forked from pool/telegraf
392 lines
12 KiB
Diff
392 lines
12 KiB
Diff
|
diff -urN telegraf-1.8.3/plugins/outputs/all/all.go telegraf-1.8.3-patch/plugins/outputs/all/all.go
|
||
|
--- telegraf-1.8.3/plugins/outputs/all/all.go 2018-10-30 23:13:52.000000000 +0200
|
||
|
+++ telegraf-1.8.3-patch/plugins/outputs/all/all.go 2018-11-01 10:00:53.949179483 +0200
|
||
|
@@ -28,5 +28,6 @@
|
||
|
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
|
||
|
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
|
||
|
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
|
||
|
+ _ "github.com/influxdata/telegraf/plugins/outputs/sql"
|
||
|
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
|
||
|
)
|
||
|
diff -urN telegraf-1.8.3/plugins/outputs/sql/README.md telegraf-1.8.3-patch/plugins/outputs/sql/README.md
|
||
|
--- telegraf-1.8.3/plugins/outputs/sql/README.md 1970-01-01 02:00:00.000000000 +0200
|
||
|
+++ telegraf-1.8.3-patch/plugins/outputs/sql/README.md 2018-11-01 09:59:15.296965030 +0200
|
||
|
@@ -0,0 +1,73 @@
|
||
|
+# SQL plugin
|
||
|
+
|
||
|
+The plugin inserts values to SQL various database.
|
||
|
+Supported/integrated drivers are mssql (SQLServer), mysql (MySQL), postgres (Postgres)
|
||
|
+Activable drivers (read below) are all golang SQL compliant drivers (see https://github.com/golang/go/wiki/SQLDrivers): for instance oci8 for Oracle or sqlite3 (SQLite)
|
||
|
+
|
||
|
+## Getting started :
|
||
|
+First you need to grant insert (if auto create table create) privileges to the database user you use for the connection
|
||
|
+
|
||
|
+## Configuration:
|
||
|
+
|
||
|
+```
|
||
|
+# Send metrics to SQL-Database (Example configuration for MySQL/MariaDB)
|
||
|
+[[outputs.sql]]
|
||
|
+ ## Database Driver, required.
|
||
|
+ ## Valid options: mssql (SQLServer), mysql (MySQL), postgres (Postgres), sqlite3 (SQLite), [oci8 ora.v4 (Oracle)]
|
||
|
+ driver = "mysql"
|
||
|
+
|
||
|
+ ## specify address via a url matching:
|
||
|
+ ## postgres://[pqgotest[:password]]@localhost[/dbname]\
|
||
|
+ ## ?sslmode=[disable|verify-ca|verify-full]
|
||
|
+ ## or a simple string:
|
||
|
+ ## host=localhost user=pqotest password=... sslmode=... dbname=app_production
|
||
|
+ ##
|
||
|
+ ## All connection parameters are optional.
|
||
|
+ ##
|
||
|
+ ## Without the dbname parameter, the driver will default to a database
|
||
|
+ ## with the same name as the user. This dbname is just for instantiating a
|
||
|
+ ## connection with the server and doesn't restrict the databases we are trying
|
||
|
+ ## to grab metrics for.
|
||
|
+ ##
|
||
|
+ address = "username:password@tcp(server:port)/table"
|
||
|
+
|
||
|
+ ## Available Variables:
|
||
|
+ ## {TABLE} - tablename as identifier
|
||
|
+ ## {TABLELITERAL} - tablename as string literal
|
||
|
+ ## {COLUMNS} - column definitions
|
||
|
+ ## {KEY_COLUMNS} - comma-separated list of key columns (time + tags)
|
||
|
+ ##
|
||
|
+
|
||
|
+ ## Check with this is table exists
|
||
|
+ ##
|
||
|
+ ## Template for MySQL is "SELECT 1 FROM {TABLE} LIMIT 1"
|
||
|
+ ##
|
||
|
+ table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
|
||
|
+
|
||
|
+ ## Template to use for generating tables
|
||
|
+
|
||
|
+ ## Default template
|
||
|
+ ##
|
||
|
+ # table_template = "CREATE TABLE {TABLE}({COLUMNS})"
|
||
|
+
|
||
|
+ ## Convert Telegraf datatypes to these types
|
||
|
+ [[outputs.sql.convert]]
|
||
|
+ integer = "INT"
|
||
|
+ real = "DOUBLE"
|
||
|
+ text = "TEXT"
|
||
|
+ timestamp = "TIMESTAMP"
|
||
|
+ defaultvalue = "TEXT"
|
||
|
+ unsigned = "UNSIGNED"
|
||
|
+```
|
||
|
+sql_script is read only once, if you change the script you need to reload telegraf
|
||
|
+
|
||
|
+## Field names
|
||
|
+If database table is not pre-created tries driver to create database. There can be errors as
|
||
|
+SQL has strict scheming.
|
||
|
+
|
||
|
+## Tested Databases
|
||
|
+Actually I run the plugin using MySQL
|
||
|
+
|
||
|
+## TODO
|
||
|
+1) Test with other databases
|
||
|
+2) More sane testing
|
||
|
diff -urN telegraf-1.8.3/plugins/outputs/sql/sql.go telegraf-1.8.3-patch/plugins/outputs/sql/sql.go
|
||
|
--- telegraf-1.8.3/plugins/outputs/sql/sql.go 1970-01-01 02:00:00.000000000 +0200
|
||
|
+++ telegraf-1.8.3-patch/plugins/outputs/sql/sql.go 2018-11-01 09:59:15.300965038 +0200
|
||
|
@@ -0,0 +1,256 @@
|
||
|
+package sql
|
||
|
+
|
||
|
+import (
|
||
|
+ "database/sql"
|
||
|
+ "fmt"
|
||
|
+ "log"
|
||
|
+ "strings"
|
||
|
+
|
||
|
+ _ "github.com/go-sql-driver/mysql"
|
||
|
+ _ "github.com/jackc/pgx"
|
||
|
+ // These SQL drivers can be enabled if
|
||
|
+ // they are added to depencies
|
||
|
+ // _ "github.com/lib/pq"
|
||
|
+ // _ "github.com/mattn/go-sqlite3"
|
||
|
+ // _ "github.com/zensqlmonitor/go-mssqldb"
|
||
|
+
|
||
|
+ "github.com/influxdata/telegraf"
|
||
|
+ "github.com/influxdata/telegraf/plugins/outputs"
|
||
|
+)
|
||
|
+
|
||
|
+type ConvertStruct struct {
|
||
|
+ Integer string
|
||
|
+ Real string
|
||
|
+ Text string
|
||
|
+ Timestamp string
|
||
|
+ Defaultvalue string
|
||
|
+ Unsigned string
|
||
|
+}
|
||
|
+
|
||
|
+type Sql struct {
|
||
|
+ db *sql.DB
|
||
|
+ Driver string
|
||
|
+ Address string
|
||
|
+ TableTemplate string
|
||
|
+ TableExistsTemplate string
|
||
|
+ TagTableSuffix string
|
||
|
+ Tables map[string]bool
|
||
|
+ Convert []ConvertStruct
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) Connect() error {
|
||
|
+ db, err := sql.Open(p.Driver, p.Address)
|
||
|
+ if err != nil {
|
||
|
+ return err
|
||
|
+ }
|
||
|
+ p.db = db
|
||
|
+ p.Tables = make(map[string]bool)
|
||
|
+
|
||
|
+ return nil
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) Close() error {
|
||
|
+ return p.db.Close()
|
||
|
+}
|
||
|
+
|
||
|
+func contains(haystack []string, needle string) bool {
|
||
|
+ for _, key := range haystack {
|
||
|
+ if key == needle {
|
||
|
+ return true
|
||
|
+ }
|
||
|
+ }
|
||
|
+ return false
|
||
|
+}
|
||
|
+
|
||
|
+func quoteIdent(name string) string {
|
||
|
+ return name
|
||
|
+}
|
||
|
+
|
||
|
+func quoteLiteral(name string) string {
|
||
|
+ return "'" + strings.Replace(name, "'", "''", -1) + "'"
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) deriveDatatype(value interface{}) string {
|
||
|
+ var datatype string
|
||
|
+
|
||
|
+ switch value.(type) {
|
||
|
+ case int64:
|
||
|
+ datatype = p.Convert[0].Integer
|
||
|
+ case uint64:
|
||
|
+ datatype = fmt.Sprintf("%s %s", p.Convert[0].Integer, p.Convert[0].Unsigned)
|
||
|
+ case float64:
|
||
|
+ datatype = p.Convert[0].Real
|
||
|
+ case string:
|
||
|
+ datatype = p.Convert[0].Text
|
||
|
+ default:
|
||
|
+ datatype = p.Convert[0].Defaultvalue
|
||
|
+ log.Printf("E! Unknown datatype: '%T' %v", value, value)
|
||
|
+ }
|
||
|
+ return datatype
|
||
|
+}
|
||
|
+
|
||
|
+var sampleConfig = `
|
||
|
+# Send metrics to SQL-Database (Example configuration for MySQL/MariaDB)
|
||
|
+[[outputs.sql]]
|
||
|
+ ## Database Driver, required.
|
||
|
+ ## Valid options: mssql (SQLServer), mysql (MySQL), postgres (Postgres), sqlite3 (SQLite), [oci8 ora.v4 (Oracle)]
|
||
|
+ driver = "mysql"
|
||
|
+
|
||
|
+ ## specify address via a url matching:
|
||
|
+ ## postgres://[pqgotest[:password]]@localhost[/dbname]\
|
||
|
+ ## ?sslmode=[disable|verify-ca|verify-full]
|
||
|
+ ## or a simple string:
|
||
|
+ ## host=localhost user=pqotest password=... sslmode=... dbname=app_production
|
||
|
+ ##
|
||
|
+ ## All connection parameters are optional.
|
||
|
+ ##
|
||
|
+ ## Without the dbname parameter, the driver will default to a database
|
||
|
+ ## with the same name as the user. This dbname is just for instantiating a
|
||
|
+ ## connection with the server and doesn't restrict the databases we are trying
|
||
|
+ ## to grab metrics for.
|
||
|
+ ##
|
||
|
+ address = "username:password@tcp(server:port)/table"
|
||
|
+
|
||
|
+ ## Available Variables:
|
||
|
+ ## {TABLE} - tablename as identifier
|
||
|
+ ## {TABLELITERAL} - tablename as string literal
|
||
|
+ ## {COLUMNS} - column definitions
|
||
|
+ ## {KEY_COLUMNS} - comma-separated list of key columns (time + tags)
|
||
|
+ ##
|
||
|
+
|
||
|
+ ## Check with this is table exists
|
||
|
+ ##
|
||
|
+ ## Template for MySQL is "SELECT 1 FROM {TABLE} LIMIT 1"
|
||
|
+ ##
|
||
|
+ table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
|
||
|
+
|
||
|
+ ## Template to use for generating tables
|
||
|
+
|
||
|
+ ## Default template
|
||
|
+ ##
|
||
|
+ # table_template = "CREATE TABLE {TABLE}({COLUMNS})"
|
||
|
+
|
||
|
+ ## Convert Telegraf datatypes to these types
|
||
|
+ [[outputs.sql.convert]]
|
||
|
+ integer = "INT"
|
||
|
+ real = "DOUBLE"
|
||
|
+ text = "TEXT"
|
||
|
+ timestamp = "TIMESTAMP"
|
||
|
+ defaultvalue = "TEXT"
|
||
|
+ unsigned = "UNSIGNED"
|
||
|
+`
|
||
|
+
|
||
|
+func (p *Sql) SampleConfig() string { return sampleConfig }
|
||
|
+func (p *Sql) Description() string { return "Send metrics to SQL Database" }
|
||
|
+
|
||
|
+func (p *Sql) generateCreateTable(metric telegraf.Metric) string {
|
||
|
+ var columns []string
|
||
|
+ var pk []string
|
||
|
+ var sql []string
|
||
|
+
|
||
|
+ pk = append(pk, quoteIdent("timestamp"))
|
||
|
+ columns = append(columns, fmt.Sprintf("timestamp %s", p.Convert[0].Timestamp))
|
||
|
+
|
||
|
+ // handle tags if necessary
|
||
|
+ if len(metric.Tags()) > 0 {
|
||
|
+ // tags in measurement table
|
||
|
+ for column := range metric.Tags() {
|
||
|
+ pk = append(pk, quoteIdent(column))
|
||
|
+ columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(column), p.Convert[0].Text))
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ var datatype string
|
||
|
+ for column, v := range metric.Fields() {
|
||
|
+ datatype = p.deriveDatatype(v)
|
||
|
+ columns = append(columns, fmt.Sprintf("%s %s", quoteIdent(column), datatype))
|
||
|
+ }
|
||
|
+
|
||
|
+ query := strings.Replace(p.TableTemplate, "{TABLE}", quoteIdent(metric.Name()), -1)
|
||
|
+ query = strings.Replace(query, "{TABLELITERAL}", quoteLiteral(metric.Name()), -1)
|
||
|
+ query = strings.Replace(query, "{COLUMNS}", strings.Join(columns, ","), -1)
|
||
|
+ query = strings.Replace(query, "{KEY_COLUMNS}", strings.Join(pk, ","), -1)
|
||
|
+
|
||
|
+ sql = append(sql, query)
|
||
|
+ return strings.Join(sql, ";")
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) generateInsert(tablename string, columns []string) string {
|
||
|
+
|
||
|
+ var placeholder, quoted []string
|
||
|
+ for _, column := range columns {
|
||
|
+ placeholder = append(placeholder, fmt.Sprintf("?"))
|
||
|
+ quoted = append(quoted, quoteIdent(column))
|
||
|
+ }
|
||
|
+
|
||
|
+ sql := fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)", quoteIdent(tablename), strings.Join(quoted, ","), strings.Join(placeholder, ","))
|
||
|
+ return sql
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) tableExists(tableName string) bool {
|
||
|
+ stmt := strings.Replace(p.TableExistsTemplate, "{TABLE}", quoteIdent(tableName), -1)
|
||
|
+
|
||
|
+ _, err := p.db.Exec(stmt)
|
||
|
+ if err != nil {
|
||
|
+ return false
|
||
|
+ }
|
||
|
+ return true
|
||
|
+}
|
||
|
+
|
||
|
+func (p *Sql) Write(metrics []telegraf.Metric) error {
|
||
|
+ for _, metric := range metrics {
|
||
|
+ tablename := metric.Name()
|
||
|
+
|
||
|
+ // create table if needed
|
||
|
+ if p.Tables[tablename] == false && p.tableExists(tablename) == false {
|
||
|
+ createStmt := p.generateCreateTable(metric)
|
||
|
+ _, err := p.db.Exec(createStmt)
|
||
|
+ if err != nil {
|
||
|
+ return err
|
||
|
+ }
|
||
|
+ p.Tables[tablename] = true
|
||
|
+ }
|
||
|
+
|
||
|
+ var columns []string
|
||
|
+ var values []interface{}
|
||
|
+
|
||
|
+ // We assume that SQL is making auto timestamp
|
||
|
+ //columns = append(columns, "timestamp")
|
||
|
+ //values = append(values, metric.Time())
|
||
|
+
|
||
|
+ if len(metric.Tags()) > 0 {
|
||
|
+ // tags in measurement table
|
||
|
+ for column, value := range metric.Tags() {
|
||
|
+ columns = append(columns, column)
|
||
|
+ values = append(values, value)
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ for column, value := range metric.Fields() {
|
||
|
+ columns = append(columns, column)
|
||
|
+ values = append(values, value)
|
||
|
+ }
|
||
|
+
|
||
|
+ sql := p.generateInsert(tablename, columns)
|
||
|
+ _, err := p.db.Exec(sql, values...)
|
||
|
+
|
||
|
+ if err != nil {
|
||
|
+ // check if insert error was caused by column mismatch
|
||
|
+ log.Printf("E! Error during insert: %v", err)
|
||
|
+ return err
|
||
|
+ }
|
||
|
+ }
|
||
|
+ return nil
|
||
|
+}
|
||
|
+
|
||
|
+func init() {
|
||
|
+ outputs.Add("sql", func() telegraf.Output { return newSql() })
|
||
|
+}
|
||
|
+
|
||
|
+func newSql() *Sql {
|
||
|
+ return &Sql{
|
||
|
+ TableTemplate: "CREATE TABLE {TABLE}({COLUMNS})",
|
||
|
+ TableExistsTemplate: "SELECT 1 FROM {TABLE} LIMIT 1",
|
||
|
+ TagTableSuffix: "_tag",
|
||
|
+ }
|
||
|
+}
|
||
|
diff -urN telegraf-1.8.3/plugins/outputs/sql/sql_test.go telegraf-1.8.3-patch/plugins/outputs/sql/sql_test.go
|
||
|
--- telegraf-1.8.3/plugins/outputs/sql/sql_test.go 1970-01-01 02:00:00.000000000 +0200
|
||
|
+++ telegraf-1.8.3-patch/plugins/outputs/sql/sql_test.go 2018-11-01 09:59:15.300965038 +0200
|
||
|
@@ -0,0 +1,29 @@
|
||
|
+package sql
|
||
|
+
|
||
|
+import (
|
||
|
+ "testing"
|
||
|
+ // "time"
|
||
|
+ // "github.com/influxdata/telegraf"
|
||
|
+ // "github.com/influxdata/telegraf/metric"
|
||
|
+ // "github.com/stretchr/testify/assert"
|
||
|
+)
|
||
|
+
|
||
|
+func TestSqlQuote(t *testing.T) {
|
||
|
+ if testing.Short() {
|
||
|
+ t.Skip("Skipping integration test in short mode")
|
||
|
+ }
|
||
|
+
|
||
|
+}
|
||
|
+
|
||
|
+func TestSqlCreateStatement(t *testing.T) {
|
||
|
+ if testing.Short() {
|
||
|
+ t.Skip("Skipping integration test in short mode")
|
||
|
+ }
|
||
|
+
|
||
|
+}
|
||
|
+
|
||
|
+func TestSqlInsertStatement(t *testing.T) {
|
||
|
+ if testing.Short() {
|
||
|
+ t.Skip("Skipping integration test in short mode")
|
||
|
+ }
|
||
|
+}
|
||
|
diff -urN telegraf-1.8.3/README.md telegraf-1.8.3-patch/README.md
|
||
|
--- telegraf-1.8.3/README.md 2018-10-30 23:13:52.000000000 +0200
|
||
|
+++ telegraf-1.8.3-patch/README.md 2018-11-01 10:00:11.129086395 +0200
|
||
|
@@ -237,6 +237,7 @@
|
||
|
* [snmp](./plugins/inputs/snmp)
|
||
|
* [socket_listener](./plugins/inputs/socket_listener)
|
||
|
* [solr](./plugins/inputs/solr)
|
||
|
+* [sql](./plugins/outputs/sql) (sql generic output)
|
||
|
* [sql server](./plugins/inputs/sqlserver) (microsoft)
|
||
|
* [statsd](./plugins/inputs/statsd)
|
||
|
* [swap](./plugins/inputs/swap)
|