From c10ac0a32e27166095f0339bc63e674601a5438a Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 12 Jan 2022 23:54:42 +0100 Subject: [PATCH] feat: Parser plugin restructuring (#8791) --- agent/agent.go | 7 + cmd/telegraf/telegraf.go | 1 + config/config.go | 225 +++++-- config/config_test.go | 376 +++++++++++- config/testdata/addressbook.proto | 28 + config/testdata/parsers_new.toml | 60 ++ config/testdata/parsers_old.toml | 60 ++ models/running_parsers.go | 97 +++ parser.go | 39 ++ .../directory_monitor_test.go | 55 +- plugins/inputs/file/file_test.go | 14 +- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 6 +- plugins/inputs/tail/tail_test.go | 12 +- plugins/parsers/all/all.go | 6 + plugins/parsers/csv/parser.go | 99 ++-- plugins/parsers/csv/parser_test.go | 553 +++++++++--------- plugins/parsers/registry.go | 55 +- plugins/parsers/registry_test.go | 70 +++ plugins/processors/parser/parser.go | 2 +- 19 files changed, 1320 insertions(+), 445 deletions(-) create mode 100644 config/testdata/addressbook.proto create mode 100644 config/testdata/parsers_new.toml create mode 100644 config/testdata/parsers_old.toml create mode 100644 models/running_parsers.go create mode 100644 parser.go create mode 100644 plugins/parsers/all/all.go create mode 100644 plugins/parsers/registry_test.go diff --git a/agent/agent.go b/agent/agent.go index 8dbbffa826e85..b5e44c03a6164 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -192,6 +192,13 @@ func (a *Agent) initPlugins() error { input.LogName(), err) } } + for _, parser := range a.Config.Parsers { + err := parser.Init() + if err != nil { + return fmt.Errorf("could not initialize parser %s::%s: %v", + parser.Config.DataFormat, parser.Config.Parent, err) + } + } for _, processor := range a.Config.Processors { err := processor.Init() if err != nil { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 09c5992ef43fa..e14292dc0df8f 100644 --- a/cmd/telegraf/telegraf.go +++ 
b/cmd/telegraf/telegraf.go @@ -31,6 +31,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" + _ "github.com/influxdata/telegraf/plugins/parsers/all" _ "github.com/influxdata/telegraf/plugins/processors/all" "gopkg.in/tomb.v1" ) diff --git a/config/config.go b/config/config.go index c8c9e5312c6ff..1a5557b5c802d 100644 --- a/config/config.go +++ b/config/config.go @@ -76,6 +76,7 @@ type Config struct { Inputs []*models.RunningInput Outputs []*models.RunningOutput Aggregators []*models.RunningAggregator + Parsers []*models.RunningParser // Processors have a slice wrapper type because they need to be sorted Processors models.RunningProcessors AggProcessors models.RunningProcessors @@ -103,6 +104,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Parsers: make([]*models.RunningParser, 0), Processors: make([]*models.RunningProcessor, 0), AggProcessors: make([]*models.RunningProcessor, 0), InputFilters: make([]string, 0), @@ -233,6 +235,15 @@ func (c *Config) AggregatorNames() []string { return PluginNameCounts(name) } +// ParserNames returns a list of strings of the configured parsers. +func (c *Config) ParserNames() []string { + var name []string + for _, parser := range c.Parsers { + name = append(name, parser.Config.DataFormat) + } + return PluginNameCounts(name) +} + // ProcessorNames returns a list of strings of the configured processors. 
func (c *Config) ProcessorNames() []string { var name []string @@ -1048,6 +1059,39 @@ func (c *Config) addAggregator(name string, table *ast.Table) error { return nil } +func (c *Config) probeParser(table *ast.Table) bool { + var dataformat string + c.getFieldString(table, "data_format", &dataformat) + + _, ok := parsers.Parsers[dataformat] + return ok +} + +func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) { + var dataformat string + c.getFieldString(table, "data_format", &dataformat) + + creator, ok := parsers.Parsers[dataformat] + if !ok { + return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat) + } + parser := creator(parentname) + + conf, err := c.buildParser(parentname, table) + if err != nil { + return nil, err + } + + if err := c.toml.UnmarshalTable(table, parser); err != nil { + return nil, err + } + + running := models.NewRunningParser(parser, conf) + c.Parsers = append(c.Parsers, running) + + return running, nil +} + func (c *Config) addProcessor(name string, table *ast.Table) error { creator, ok := processors.Processors[name] if !ok { @@ -1162,6 +1206,17 @@ func (c *Config) addInput(name string, table *ast.Table) error { name = "diskio" } + // For inputs with parsers we need to compute the set of + // options that is not covered by both, the parser and the input. + // We achieve this by keeping a local book of missing entries + // that counts the number of misses. In case we have a parser + // for the input both need to miss the entry. We count the + // missing entries at the end. 
+ missThreshold := 0 + missCount := make(map[string]int) + c.setLocalMissingTomlFieldTracker(missCount) + defer c.resetMissingTomlFieldTracker() + creator, ok := inputs.Inputs[name] if !ok { // Handle removed, deprecated plugins @@ -1174,35 +1229,95 @@ func (c *Config) addInput(name string, table *ast.Table) error { } input := creator() - // If the input has a SetParser function, then this means it can accept - // arbitrary types of input, so build the parser and set it. - if t, ok := input.(parsers.ParserInput); ok { - parser, err := c.buildParser(name, table) - if err != nil { - return err + // If the input has a SetParser or SetParserFunc function, it can accept + // arbitrary data-formats, so build the requested parser and set it. + if t, ok := input.(telegraf.ParserInput); ok { + missThreshold = 1 + if parser, err := c.addParser(name, table); err == nil { + t.SetParser(parser) + } else { + missThreshold = 0 + // Fallback to the old way of instantiating the parsers. + config, err := c.getParserConfig(name, table) + if err != nil { + return err + } + parser, err := c.buildParserOld(name, config) + if err != nil { + return err + } + t.SetParser(parser) } - t.SetParser(parser) } - if t, ok := input.(parsers.ParserFuncInput); ok { - config, err := c.getParserConfig(name, table) - if err != nil { - return err + // Keep the old interface for backward compatibility + if t, ok := input.(parsers.ParserInput); ok { + // DEPRECATED: Please switch your plugin to telegraf.ParserInput. + missThreshold = 1 + if parser, err := c.addParser(name, table); err == nil { + t.SetParser(parser) + } else { + missThreshold = 0 + // Fallback to the old way of instantiating the parsers. 
+ config, err := c.getParserConfig(name, table) + if err != nil { + return err + } + parser, err := c.buildParserOld(name, config) + if err != nil { + return err + } + t.SetParser(parser) } - t.SetParserFunc(func() (parsers.Parser, error) { - parser, err := parsers.NewParser(config) + } + + if t, ok := input.(telegraf.ParserFuncInput); ok { + missThreshold = 1 + if c.probeParser(table) { + t.SetParserFunc(func() (telegraf.Parser, error) { + parser, err := c.addParser(name, table) + if err != nil { + return nil, err + } + err = parser.Init() + return parser, err + }) + } else { + missThreshold = 0 + // Fallback to the old way + config, err := c.getParserConfig(name, table) if err != nil { - return nil, err + return err } - logger := models.NewLogger("parsers", config.DataFormat, name) - models.SetLoggerOnPlugin(parser, logger) - if initializer, ok := parser.(telegraf.Initializer); ok { - if err := initializer.Init(); err != nil { + t.SetParserFunc(func() (telegraf.Parser, error) { + return c.buildParserOld(name, config) + }) + } + } + + if t, ok := input.(parsers.ParserFuncInput); ok { + // DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput. 
+ missThreshold = 1 + if c.probeParser(table) { + t.SetParserFunc(func() (parsers.Parser, error) { + parser, err := c.addParser(name, table) + if err != nil { return nil, err } + err = parser.Init() + return parser, err + }) + } else { + missThreshold = 0 + // Fallback to the old way + config, err := c.getParserConfig(name, table) + if err != nil { + return err } - return parser, nil - }) + t.SetParserFunc(func() (parsers.Parser, error) { + return c.buildParserOld(name, config) + }) + } } pluginConfig, err := c.buildInput(name, table) @@ -1221,6 +1336,17 @@ func (c *Config) addInput(name string, table *ast.Table) error { rp := models.NewRunningInput(input, pluginConfig) rp.SetDefaultTags(c.Tags) c.Inputs = append(c.Inputs, rp) + + // Check the number of misses against the threshold + for key, count := range missCount { + if count <= missThreshold { + continue + } + if err := c.missingTomlField(nil, key); err != nil { + return err + } + } + return nil } @@ -1265,6 +1391,21 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato return conf, nil } +// buildParser parses Parser specific items from the ast.Table, +// builds the filter and returns a +// models.ParserConfig to be inserted into models.RunningParser +func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) { + var dataformat string + c.getFieldString(tbl, "data_format", &dataformat) + + conf := &models.ParserConfig{ + Parent: name, + DataFormat: dataformat, + } + + return conf, nil +} + // buildProcessor parses Processor specific items from the ast.Table, // builds the filter and returns a // models.ProcessorConfig to be inserted into models.RunningProcessor @@ -1353,14 +1494,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e return cp, nil } -// buildParser grabs the necessary entries from the ast.Table for creating +// buildParserOld grabs the necessary entries from the ast.Table for creating // a 
parsers.Parser object, and creates it, which can then be added onto // an Input object. -func (c *Config) buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { - config, err := c.getParserConfig(name, tbl) - if err != nil { - return nil, err - } +func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) { parser, err := parsers.NewParser(config) if err != nil { return nil, err @@ -1422,23 +1559,6 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config, c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone) c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp) - //for csv parser - c.getFieldStringSlice(tbl, "csv_column_names", &pc.CSVColumnNames) - c.getFieldStringSlice(tbl, "csv_column_types", &pc.CSVColumnTypes) - c.getFieldStringSlice(tbl, "csv_tag_columns", &pc.CSVTagColumns) - c.getFieldString(tbl, "csv_timezone", &pc.CSVTimezone) - c.getFieldString(tbl, "csv_delimiter", &pc.CSVDelimiter) - c.getFieldString(tbl, "csv_comment", &pc.CSVComment) - c.getFieldString(tbl, "csv_measurement_column", &pc.CSVMeasurementColumn) - c.getFieldString(tbl, "csv_timestamp_column", &pc.CSVTimestampColumn) - c.getFieldString(tbl, "csv_timestamp_format", &pc.CSVTimestampFormat) - c.getFieldInt(tbl, "csv_header_row_count", &pc.CSVHeaderRowCount) - c.getFieldInt(tbl, "csv_skip_rows", &pc.CSVSkipRows) - c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns) - c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace) - c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues) - c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors) - c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys) c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName) @@ -1652,9 +1772,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { switch key { case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", 
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", - "csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count", - "csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors", - "csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", "dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path", "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", @@ -1679,6 +1796,22 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { return nil } +func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) { + f := func(_ reflect.Type, key string) error { + if c, ok := counter[key]; ok { + counter[key] = c + 1 + } else { + counter[key] = 1 + } + return nil + } + c.toml.MissingField = f +} + +func (c *Config) resetMissingTomlFieldTracker() { + c.toml.MissingField = c.missingTomlField +} + func (c *Config) getFieldString(tbl *ast.Table, fieldName string, target *string) { if node, ok := tbl.Fields[fieldName]; ok { if kv, ok := node.(*ast.KeyValue); ok { diff --git a/config/config_test.go b/config/config_test.go index 546b752f3a383..743a911da0eda 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" "runtime" "strings" "testing" @@ -18,6 +19,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + _ "github.com/influxdata/telegraf/plugins/parsers/all" // Blank import to have all parsers for testing ) func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { @@ -359,6 +361,370 @@ func TestConfig_URLLikeFileName(t *testing.T) { } } +func 
TestConfig_ParserInterfaceNewFormat(t *testing.T) { + formats := []string{ + "collectd", + "csv", + "dropwizard", + "form_urlencoded", + "graphite", + "grok", + "influx", + "json", + "json_v2", + "logfmt", + "nagios", + "prometheus", + "prometheusremotewrite", + "value", + "wavefront", + "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf", + } + + c := NewConfig() + require.NoError(t, c.LoadConfig("./testdata/parsers_new.toml")) + require.Len(t, c.Inputs, len(formats)) + + cfg := parsers.Config{ + CSVHeaderRowCount: 42, + DropwizardTagPathsMap: make(map[string]string), + GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"}, + JSONStrict: true, + MetricName: "parser_test_new", + } + + override := map[string]struct { + cfg *parsers.Config + param map[string]interface{} + mask []string + }{ + "csv": { + param: map[string]interface{}{ + "HeaderRowCount": cfg.CSVHeaderRowCount, + }, + mask: []string{"TimeFunc", "Log"}, + }, + "json_v2": { + mask: []string{"Log"}, + }, + "logfmt": { + mask: []string{"Now"}, + }, + "xml": { + mask: []string{"Log"}, + }, + "xpath_json": { + mask: []string{"Log"}, + }, + "xpath_msgpack": { + mask: []string{"Log"}, + }, + "xpath_protobuf": { + cfg: &parsers.Config{ + XPathProtobufFile: "testdata/addressbook.proto", + XPathProtobufType: "addressbook.AddressBook", + }, + param: map[string]interface{}{ + "ProtobufMessageDef": "testdata/addressbook.proto", + "ProtobufMessageType": "addressbook.AddressBook", + }, + mask: []string{"Log"}, + }, + } + + expected := make([]telegraf.Parser, 0, len(formats)) + for _, format := range formats { + formatCfg := &cfg + settings, hasOverride := override[format] + if hasOverride && settings.cfg != nil { + formatCfg = settings.cfg + } + formatCfg.DataFormat = format + + logger := models.NewLogger("parsers", format, cfg.MetricName) + + // Try with the new format + if creator, found := parsers.Parsers[format]; found { + t.Logf("using new format parser for %q...", format) + parserNew := 
creator(formatCfg.MetricName) + if settings, found := override[format]; found { + s := reflect.Indirect(reflect.ValueOf(parserNew)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + models.SetLoggerOnPlugin(parserNew, logger) + if p, ok := parserNew.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + expected = append(expected, parserNew) + continue + } + + // Try with the old format + parserOld, err := parsers.NewParser(formatCfg) + if err == nil { + t.Logf("using old format parser for %q...", format) + models.SetLoggerOnPlugin(parserOld, logger) + if p, ok := parserOld.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + expected = append(expected, parserOld) + continue + } + require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err) + require.Failf(t, "%q neither found in old nor new format", format) + } + require.Len(t, expected, len(formats)) + + actual := make([]interface{}, 0) + generated := make([]interface{}, 0) + for _, plugin := range c.Inputs { + input, ok := plugin.Input.(*MockupInputPluginParserNew) + require.True(t, ok) + // Get the parser set with 'SetParser()' + if p, ok := input.Parser.(*models.RunningParser); ok { + actual = append(actual, p.Parser) + } else { + actual = append(actual, input.Parser) + } + // Get the parser set with 'SetParserFunc()' + g, err := input.ParserFunc() + require.NoError(t, err) + if rp, ok := g.(*models.RunningParser); ok { + generated = append(generated, rp.Parser) + } else { + generated = append(generated, g) + } + } + require.Len(t, actual, len(formats)) + + for i, format := range formats { + if settings, found := override[format]; found { + a := reflect.Indirect(reflect.ValueOf(actual[i])) + e := reflect.Indirect(reflect.ValueOf(expected[i])) + g := reflect.Indirect(reflect.ValueOf(generated[i])) + for _, key := range settings.mask { + af := a.FieldByName(key) + ef := e.FieldByName(key) + gf := 
g.FieldByName(key) + + v := reflect.Zero(ef.Type()) + af.Set(v) + ef.Set(v) + gf.Set(v) + } + } + + // We need special handling for same parsers as they internally contain pointers + // to other structs that inherently differ between instances + switch format { + case "dropwizard", "grok", "influx", "wavefront": + // At least check if we have the same type + require.IsType(t, expected[i], actual[i]) + require.IsType(t, expected[i], generated[i]) + continue + } + require.EqualValuesf(t, expected[i], actual[i], "in SetParser() for %q", format) + require.EqualValuesf(t, expected[i], generated[i], "in SetParserFunc() for %q", format) + } +} + +func TestConfig_ParserInterfaceOldFormat(t *testing.T) { + formats := []string{ + "collectd", + "csv", + "dropwizard", + "form_urlencoded", + "graphite", + "grok", + "influx", + "json", + "json_v2", + "logfmt", + "nagios", + "prometheus", + "prometheusremotewrite", + "value", + "wavefront", + "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf", + } + + c := NewConfig() + require.NoError(t, c.LoadConfig("./testdata/parsers_old.toml")) + require.Len(t, c.Inputs, len(formats)) + + cfg := parsers.Config{ + CSVHeaderRowCount: 42, + DropwizardTagPathsMap: make(map[string]string), + GrokPatterns: []string{"%{COMBINED_LOG_FORMAT}"}, + JSONStrict: true, + MetricName: "parser_test_old", + } + + override := map[string]struct { + cfg *parsers.Config + param map[string]interface{} + mask []string + }{ + "csv": { + param: map[string]interface{}{ + "HeaderRowCount": cfg.CSVHeaderRowCount, + }, + mask: []string{"TimeFunc", "Log"}, + }, + "json_v2": { + mask: []string{"Log"}, + }, + "logfmt": { + mask: []string{"Now"}, + }, + "xml": { + mask: []string{"Log"}, + }, + "xpath_json": { + mask: []string{"Log"}, + }, + "xpath_msgpack": { + mask: []string{"Log"}, + }, + "xpath_protobuf": { + cfg: &parsers.Config{ + XPathProtobufFile: "testdata/addressbook.proto", + XPathProtobufType: "addressbook.AddressBook", + }, + param: map[string]interface{}{ + 
"ProtobufMessageDef": "testdata/addressbook.proto", + "ProtobufMessageType": "addressbook.AddressBook", + }, + mask: []string{"Log"}, + }, + } + + expected := make([]telegraf.Parser, 0, len(formats)) + for _, format := range formats { + formatCfg := &cfg + settings, hasOverride := override[format] + if hasOverride && settings.cfg != nil { + formatCfg = settings.cfg + } + formatCfg.DataFormat = format + + logger := models.NewLogger("parsers", format, cfg.MetricName) + + // Try with the new format + if creator, found := parsers.Parsers[format]; found { + t.Logf("using new format parser for %q...", format) + parserNew := creator(formatCfg.MetricName) + if settings, found := override[format]; found { + s := reflect.Indirect(reflect.ValueOf(parserNew)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + models.SetLoggerOnPlugin(parserNew, logger) + if p, ok := parserNew.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + expected = append(expected, parserNew) + continue + } + + // Try with the old format + parserOld, err := parsers.NewParser(formatCfg) + if err == nil { + t.Logf("using old format parser for %q...", format) + models.SetLoggerOnPlugin(parserOld, logger) + if p, ok := parserOld.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + expected = append(expected, parserOld) + continue + } + require.Containsf(t, err.Error(), "invalid data format:", "setup %q failed: %v", format, err) + require.Failf(t, "%q neither found in old nor new format", format) + } + require.Len(t, expected, len(formats)) + + actual := make([]interface{}, 0) + generated := make([]interface{}, 0) + for _, plugin := range c.Inputs { + input, ok := plugin.Input.(*MockupInputPluginParserOld) + require.True(t, ok) + // Get the parser set with 'SetParser()' + if p, ok := input.Parser.(*models.RunningParser); ok { + actual = append(actual, p.Parser) + } else { + actual = append(actual, input.Parser) + } + // Get 
the parser set with 'SetParserFunc()' + g, err := input.ParserFunc() + require.NoError(t, err) + if rp, ok := g.(*models.RunningParser); ok { + generated = append(generated, rp.Parser) + } else { + generated = append(generated, g) + } + } + require.Len(t, actual, len(formats)) + + for i, format := range formats { + if settings, found := override[format]; found { + a := reflect.Indirect(reflect.ValueOf(actual[i])) + e := reflect.Indirect(reflect.ValueOf(expected[i])) + g := reflect.Indirect(reflect.ValueOf(generated[i])) + for _, key := range settings.mask { + af := a.FieldByName(key) + ef := e.FieldByName(key) + gf := g.FieldByName(key) + + v := reflect.Zero(ef.Type()) + af.Set(v) + ef.Set(v) + gf.Set(v) + } + } + + // We need special handling for same parsers as they internally contain pointers + // to other structs that inherently differ between instances + switch format { + case "dropwizard", "grok", "influx", "wavefront": + // At least check if we have the same type + require.IsType(t, expected[i], actual[i]) + require.IsType(t, expected[i], generated[i]) + continue + } + require.EqualValuesf(t, expected[i], actual[i], "in SetParser() for %q", format) + require.EqualValuesf(t, expected[i], generated[i], "in SetParserFunc() for %q", format) + } +} + +/*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/ +type MockupInputPluginParserOld struct { + Parser parsers.Parser + ParserFunc parsers.ParserFunc +} + +func (m *MockupInputPluginParserOld) SampleConfig() string { return "Mockup old parser test plugin" } +func (m *MockupInputPluginParserOld) Description() string { return "Mockup old parser test plugin" } +func (m *MockupInputPluginParserOld) Gather(acc telegraf.Accumulator) error { return nil } +func (m *MockupInputPluginParserOld) SetParser(parser parsers.Parser) { m.Parser = parser } +func (m *MockupInputPluginParserOld) SetParserFunc(f parsers.ParserFunc) { m.ParserFunc = f } + +/*** Mockup INPUT plugin for (new) parser testing 
to avoid cyclic dependencies ***/ +type MockupInputPluginParserNew struct { + Parser telegraf.Parser + ParserFunc telegraf.ParserFunc +} + +func (m *MockupInputPluginParserNew) SampleConfig() string { return "Mockup old parser test plugin" } +func (m *MockupInputPluginParserNew) Description() string { return "Mockup old parser test plugin" } +func (m *MockupInputPluginParserNew) Gather(acc telegraf.Accumulator) error { return nil } +func (m *MockupInputPluginParserNew) SetParser(parser telegraf.Parser) { m.Parser = parser } +func (m *MockupInputPluginParserNew) SetParserFunc(f telegraf.ParserFunc) { m.ParserFunc = f } + /*** Mockup INPUT plugin for testing to avoid cyclic dependencies ***/ type MockupInputPlugin struct { Servers []string `toml:"servers"` @@ -373,13 +739,13 @@ type MockupInputPlugin struct { Log telegraf.Logger `toml:"-"` tls.ServerConfig - parser parsers.Parser + parser telegraf.Parser } -func (m *MockupInputPlugin) SampleConfig() string { return "Mockup test intput plugin" } -func (m *MockupInputPlugin) Description() string { return "Mockup test intput plugin" } +func (m *MockupInputPlugin) SampleConfig() string { return "Mockup test input plugin" } +func (m *MockupInputPlugin) Description() string { return "Mockup test input plugin" } func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { return nil } -func (m *MockupInputPlugin) SetParser(parser parsers.Parser) { m.parser = parser } +func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { m.parser = parser } /*** Mockup OUTPUT plugin for testing to avoid cyclic dependencies ***/ type MockupOuputPlugin struct { @@ -400,6 +766,8 @@ func (m *MockupOuputPlugin) Write(metrics []telegraf.Metric) error { return nil // Register the mockup plugin on loading func init() { // Register the mockup input plugin for the required names + inputs.Add("parser_test_new", func() telegraf.Input { return &MockupInputPluginParserNew{} }) + inputs.Add("parser_test_old", func() telegraf.Input { 
return &MockupInputPluginParserOld{} }) inputs.Add("exec", func() telegraf.Input { return &MockupInputPlugin{Timeout: Duration(time.Second * 5)} }) inputs.Add("http_listener_v2", func() telegraf.Input { return &MockupInputPlugin{} }) inputs.Add("memcached", func() telegraf.Input { return &MockupInputPlugin{} }) diff --git a/config/testdata/addressbook.proto b/config/testdata/addressbook.proto new file mode 100644 index 0000000000000..3ed0eb566a987 --- /dev/null +++ b/config/testdata/addressbook.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package addressbook; + +message Person { + string name = 1; + int32 id = 2; // Unique ID number for this person. + string email = 3; + uint32 age = 4; + + enum PhoneType { + MOBILE = 0; + HOME = 1; + WORK = 2; + } + + message PhoneNumber { + string number = 1; + PhoneType type = 2; + } + + repeated PhoneNumber phones = 5; +} + +message AddressBook { + repeated Person people = 1; + repeated string tags = 2; +} diff --git a/config/testdata/parsers_new.toml b/config/testdata/parsers_new.toml new file mode 100644 index 0000000000000..515d6924339c5 --- /dev/null +++ b/config/testdata/parsers_new.toml @@ -0,0 +1,60 @@ +[[inputs.parser_test_new]] + data_format = "collectd" + +[[inputs.parser_test_new]] + data_format = "csv" + csv_header_row_count = 42 + +[[inputs.parser_test_new]] + data_format = "dropwizard" + +[[inputs.parser_test_new]] + data_format = "form_urlencoded" + +[[inputs.parser_test_new]] + data_format = "graphite" + +[[inputs.parser_test_new]] + data_format = "grok" + grok_patterns = ["%{COMBINED_LOG_FORMAT}"] + +[[inputs.parser_test_new]] + data_format = "influx" + +[[inputs.parser_test_new]] + data_format = "json" + +[[inputs.parser_test_new]] + data_format = "json_v2" + +[[inputs.parser_test_new]] + data_format = "logfmt" + +[[inputs.parser_test_new]] + data_format = "nagios" + +[[inputs.parser_test_new]] + data_format = "prometheus" + +[[inputs.parser_test_new]] + data_format = "prometheusremotewrite" + 
+[[inputs.parser_test_new]] + data_format = "value" + +[[inputs.parser_test_new]] + data_format = "wavefront" + +[[inputs.parser_test_new]] + data_format = "xml" + +[[inputs.parser_test_new]] + data_format = "xpath_json" + +[[inputs.parser_test_new]] + data_format = "xpath_msgpack" + +[[inputs.parser_test_new]] + data_format = "xpath_protobuf" + xpath_protobuf_file = "testdata/addressbook.proto" + xpath_protobuf_type = "addressbook.AddressBook" diff --git a/config/testdata/parsers_old.toml b/config/testdata/parsers_old.toml new file mode 100644 index 0000000000000..6a0b946a7ee51 --- /dev/null +++ b/config/testdata/parsers_old.toml @@ -0,0 +1,60 @@ +[[inputs.parser_test_old]] + data_format = "collectd" + +[[inputs.parser_test_old]] + data_format = "csv" + csv_header_row_count = 42 + +[[inputs.parser_test_old]] + data_format = "dropwizard" + +[[inputs.parser_test_old]] + data_format = "form_urlencoded" + +[[inputs.parser_test_old]] + data_format = "graphite" + +[[inputs.parser_test_old]] + data_format = "grok" + grok_patterns = ["%{COMBINED_LOG_FORMAT}"] + +[[inputs.parser_test_old]] + data_format = "influx" + +[[inputs.parser_test_old]] + data_format = "json" + +[[inputs.parser_test_old]] + data_format = "json_v2" + +[[inputs.parser_test_old]] + data_format = "logfmt" + +[[inputs.parser_test_old]] + data_format = "nagios" + +[[inputs.parser_test_old]] + data_format = "prometheus" + +[[inputs.parser_test_old]] + data_format = "prometheusremotewrite" + +[[inputs.parser_test_old]] + data_format = "value" + +[[inputs.parser_test_old]] + data_format = "wavefront" + +[[inputs.parser_test_old]] + data_format = "xml" + +[[inputs.parser_test_old]] + data_format = "xpath_json" + +[[inputs.parser_test_old]] + data_format = "xpath_msgpack" + +[[inputs.parser_test_old]] + data_format = "xpath_protobuf" + xpath_protobuf_file = "testdata/addressbook.proto" + xpath_protobuf_type = "addressbook.AddressBook" diff --git a/models/running_parsers.go b/models/running_parsers.go new file 
mode 100644 index 0000000000000..a7d98bbf8b291 --- /dev/null +++ b/models/running_parsers.go @@ -0,0 +1,97 @@ +package models + +import ( + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +type RunningParser struct { + Parser telegraf.Parser + Config *ParserConfig + log telegraf.Logger + + MetricsParsed selfstat.Stat + ParseTime selfstat.Stat +} + +func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningParser { + tags := map[string]string{"type": config.DataFormat} + if config.Alias != "" { + tags["alias"] = config.Alias + } + + parserErrorsRegister := selfstat.Register("parser", "errors", tags) + logger := NewLogger("parsers", config.DataFormat+"::"+config.Parent, config.Alias) + logger.OnErr(func() { + parserErrorsRegister.Incr(1) + }) + SetLoggerOnPlugin(parser, logger) + + return &RunningParser{ + Parser: parser, + Config: config, + MetricsParsed: selfstat.Register( + "parser", + "metrics_parsed", + tags, + ), + ParseTime: selfstat.Register( + "parser", + "parse_time_ns", + tags, + ), + log: logger, + } +} + +// ParserConfig is the common config for all parsers. 
+type ParserConfig struct { + Parent string + Alias string + DataFormat string + DefaultTags map[string]string +} + +func (r *RunningParser) LogName() string { + return logName("parsers", r.Config.DataFormat+"::"+r.Config.Parent, r.Config.Alias) +} + +func (r *RunningParser) Init() error { + if p, ok := r.Parser.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return err + } + } + return nil +} + +func (r *RunningParser) Parse(buf []byte) ([]telegraf.Metric, error) { + start := time.Now() + m, err := r.Parser.Parse(buf) + elapsed := time.Since(start) + r.ParseTime.Incr(elapsed.Nanoseconds()) + r.MetricsParsed.Incr(int64(len(m))) + + return m, err +} + +func (r *RunningParser) ParseLine(line string) (telegraf.Metric, error) { + start := time.Now() + m, err := r.Parser.ParseLine(line) + elapsed := time.Since(start) + r.ParseTime.Incr(elapsed.Nanoseconds()) + r.MetricsParsed.Incr(1) + + return m, err +} + +func (r *RunningParser) SetDefaultTags(tags map[string]string) { + r.Parser.SetDefaultTags(tags) +} + +func (r *RunningParser) Log() telegraf.Logger { + return r.log +} diff --git a/parser.go b/parser.go new file mode 100644 index 0000000000000..1112fa2118d35 --- /dev/null +++ b/parser.go @@ -0,0 +1,39 @@ +package telegraf + +// Parser is an interface defining functions that a parser plugin must satisfy. +type Parser interface { + // Parse takes a byte buffer separated by newlines + // ie, `cpu.usage.idle 90\ncpu.usage.busy 10` + // and parses it into telegraf metrics + // + // Must be thread-safe. + Parse(buf []byte) ([]Metric, error) + + // ParseLine takes a single string metric + // ie, "cpu.usage.idle 90" + // and parses it into a telegraf metric. + // + // Must be thread-safe. + ParseLine(line string) (Metric, error) + + // SetDefaultTags tells the parser to add all of the given tags + // to each parsed metric. + // NOTE: do _not_ modify the map after you've passed it here!! 
+ SetDefaultTags(tags map[string]string) +} + +type ParserFunc func() (Parser, error) + +// ParserInput is an interface for input plugins that are able to parse +// arbitrary data formats. +type ParserInput interface { + // SetParser sets the parser to be used by the input plugin + SetParser(parser Parser) +} + +// ParserFuncInput is an interface for input plugins that are able to parse +// arbitrary data formats. +type ParserFuncInput interface { + // SetParserFunc sets the function used to create a new parser. + SetParserFunc(fn ParserFunc) +} diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 3245074711fb2..17cddc5f5fd20 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/testutil" ) @@ -35,13 +36,12 @@ func TestCSVGZImport(t *testing.T) { err = r.Init() require.NoError(t, err) - parserConfig := parsers.Config{ - DataFormat: "csv", - CSVHeaderRowCount: 1, - } - require.NoError(t, err) r.SetParserFunc(func() (parsers.Parser, error) { - return parsers.NewParser(&parserConfig) + parser := csv.Parser{ + HeaderRowCount: 1, + } + err := parser.Init() + return &parser, err }) r.Log = testutil.Logger{} @@ -215,15 +215,14 @@ func TestCSVNoSkipRows(t *testing.T) { err = r.Init() require.NoError(t, err) - parserConfig := parsers.Config{ - DataFormat: "csv", - CSVHeaderRowCount: 1, - CSVSkipRows: 0, - CSVTagColumns: []string{"line1"}, - } - require.NoError(t, err) r.SetParserFunc(func() (parsers.Parser, error) { - return parsers.NewParser(&parserConfig) + parser := csv.Parser{ + HeaderRowCount: 1, + SkipRows: 0, + TagColumns: []string{"line1"}, + } + err := parser.Init() + return &parser, err }) r.Log = testutil.Logger{} @@ -288,15 +287,14 @@ func TestCSVSkipRows(t 
*testing.T) { err = r.Init() require.NoError(t, err) - parserConfig := parsers.Config{ - DataFormat: "csv", - CSVHeaderRowCount: 1, - CSVSkipRows: 2, - CSVTagColumns: []string{"line1"}, - } - require.NoError(t, err) r.SetParserFunc(func() (parsers.Parser, error) { - return parsers.NewParser(&parserConfig) + parser := csv.Parser{ + HeaderRowCount: 1, + SkipRows: 2, + TagColumns: []string{"line1"}, + } + err := parser.Init() + return &parser, err }) r.Log = testutil.Logger{} @@ -363,14 +361,13 @@ func TestCSVMultiHeader(t *testing.T) { err = r.Init() require.NoError(t, err) - parserConfig := parsers.Config{ - DataFormat: "csv", - CSVHeaderRowCount: 2, - CSVTagColumns: []string{"line1"}, - } - require.NoError(t, err) r.SetParserFunc(func() (parsers.Parser, error) { - return parsers.NewParser(&parserConfig) + parser := csv.Parser{ + HeaderRowCount: 2, + TagColumns: []string{"line1"}, + } + err := parser.Init() + return &parser, err }) r.Log = testutil.Logger{} diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index ab09753ca1145..d34cef1759249 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -183,7 +183,7 @@ func TestCharacterEncoding(t *testing.T) { tests := []struct { name string plugin *File - csv *csv.Config + csv *csv.Parser file string }{ { @@ -192,7 +192,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "", }, - csv: &csv.Config{ + csv: &csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -205,7 +205,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "utf-8", }, - csv: &csv.Config{ + csv: &csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -218,7 
+218,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-16le.csv"}, CharacterEncoding: "utf-16le", }, - csv: &csv.Config{ + csv: &csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -231,7 +231,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-16be.csv"}, CharacterEncoding: "utf-16be", }, - csv: &csv.Config{ + csv: &csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -244,8 +244,8 @@ func TestCharacterEncoding(t *testing.T) { err := tt.plugin.Init() require.NoError(t, err) - parser, err := csv.NewParser(tt.csv) - require.NoError(t, err) + parser := tt.csv + require.NoError(t, parser.Init()) tt.plugin.SetParser(parser) var acc testutil.Accumulator diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index d869ccc7eb102..2e9228fef3f6f 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -96,7 +96,7 @@ var sampleConfig = ` "telegraf/+/mem", "sensors/#", ] - # topic_fields = "_/_/_/temperature" + # topic_fields = "_/_/_/temperature" ## The message topic will be stored in a tag specified by this value. If set ## to the empty string no topic tag will be created. 
# topic_tag = "topic" @@ -142,14 +142,14 @@ var sampleConfig = ` ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ## Enable extracting tag values from MQTT topics - ## _ denotes an ignored entry in the topic path + ## _ denotes an ignored entry in the topic path ## [[inputs.mqtt_consumer.topic_parsing]] ## topic = "" ## measurement = "" ## tags = "" ## fields = "" ## [inputs.mqtt_consumer.topic_parsing.types] - ## + ## ` func (m *MQTTConsumer) SampleConfig() string { diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 147f3903bcd8f..6a14cf5041521 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -301,11 +301,13 @@ cpu,42 plugin.FromBeginning = true plugin.Files = []string{tmpfile.Name()} plugin.SetParserFunc(func() (parsers.Parser, error) { - return csv.NewParser(&csv.Config{ + parser := csv.Parser{ MeasurementColumn: "measurement", HeaderRowCount: 1, TimeFunc: func() time.Time { return time.Unix(0, 0) }, - }) + } + err := parser.Init() + return &parser, err }) err = plugin.Init() @@ -360,13 +362,15 @@ skip2,mem,100 plugin.FromBeginning = true plugin.Files = []string{tmpfile.Name()} plugin.SetParserFunc(func() (parsers.Parser, error) { - return csv.NewParser(&csv.Config{ + parser := csv.Parser{ MeasurementColumn: "measurement1", HeaderRowCount: 2, SkipRows: 1, SkipColumns: 1, TimeFunc: func() time.Time { return time.Unix(0, 0) }, - }) + } + err := parser.Init() + return &parser, err }) err = plugin.Init() diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go new file mode 100644 index 0000000000000..2284bf78aa90f --- /dev/null +++ b/plugins/parsers/all/all.go @@ -0,0 +1,6 @@ +package all + +import ( + //Blank imports for plugins to register themselves + _ "github.com/influxdata/telegraf/plugins/parsers/csv" +) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 7fbcbbada25aa..4b53188eb9269 100644 
--- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -14,27 +14,29 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) type TimeFunc func() time.Time -type Config struct { - ColumnNames []string `toml:"csv_column_names"` - ColumnTypes []string `toml:"csv_column_types"` - Comment string `toml:"csv_comment"` - Delimiter string `toml:"csv_delimiter"` - HeaderRowCount int `toml:"csv_header_row_count"` - MeasurementColumn string `toml:"csv_measurement_column"` - MetricName string `toml:"metric_name"` - SkipColumns int `toml:"csv_skip_columns"` - SkipRows int `toml:"csv_skip_rows"` - TagColumns []string `toml:"csv_tag_columns"` - TimestampColumn string `toml:"csv_timestamp_column"` - TimestampFormat string `toml:"csv_timestamp_format"` - Timezone string `toml:"csv_timezone"` - TrimSpace bool `toml:"csv_trim_space"` - SkipValues []string `toml:"csv_skip_values"` - SkipErrors bool `toml:"csv_skip_errors"` +type Parser struct { + ColumnNames []string `toml:"csv_column_names"` + ColumnTypes []string `toml:"csv_column_types"` + Comment string `toml:"csv_comment"` + Delimiter string `toml:"csv_delimiter"` + HeaderRowCount int `toml:"csv_header_row_count"` + MeasurementColumn string `toml:"csv_measurement_column"` + MetricName string `toml:"metric_name"` + SkipColumns int `toml:"csv_skip_columns"` + SkipRows int `toml:"csv_skip_rows"` + TagColumns []string `toml:"csv_tag_columns"` + TimestampColumn string `toml:"csv_timestamp_column"` + TimestampFormat string `toml:"csv_timestamp_format"` + Timezone string `toml:"csv_timezone"` + TrimSpace bool `toml:"csv_trim_space"` + SkipValues []string `toml:"csv_skip_values"` + SkipErrors bool `toml:"csv_skip_errors"` + Log telegraf.Logger `toml:"-"` gotColumnNames bool @@ -42,42 +44,36 @@ type Config struct { DefaultTags map[string]string } -// Parser is a CSV parser, you should use 
NewParser to create a new instance. -type Parser struct { - *Config - Log telegraf.Logger -} - -func NewParser(c *Config) (*Parser, error) { - if c.HeaderRowCount == 0 && len(c.ColumnNames) == 0 { - return nil, fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified") +func (p *Parser) Init() error { + if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 { + return fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified") } - if c.Delimiter != "" { - runeStr := []rune(c.Delimiter) + if p.Delimiter != "" { + runeStr := []rune(p.Delimiter) if len(runeStr) > 1 { - return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Delimiter) + return fmt.Errorf("csv_delimiter must be a single character, got: %s", p.Delimiter) } } - if c.Comment != "" { - runeStr := []rune(c.Comment) + if p.Comment != "" { + runeStr := []rune(p.Comment) if len(runeStr) > 1 { - return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Comment) + return fmt.Errorf("csv_comment must be a single character, got: %s", p.Comment) } } - if len(c.ColumnNames) > 0 && len(c.ColumnTypes) > 0 && len(c.ColumnNames) != len(c.ColumnTypes) { - return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") + if len(p.ColumnNames) > 0 && len(p.ColumnTypes) > 0 && len(p.ColumnNames) != len(p.ColumnTypes) { + return fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") } - c.gotColumnNames = len(c.ColumnNames) > 0 + p.gotColumnNames = len(p.ColumnNames) > 0 - if c.TimeFunc == nil { - c.TimeFunc = time.Now + if p.TimeFunc == nil { + p.TimeFunc = time.Now } - return &Parser{Config: c}, nil + return nil } func (p *Parser) SetTimeFunc(fn TimeFunc) { @@ -322,3 +318,30 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } + +func init() { + 
parsers.Add("csv", + func(defaultMetricName string) telegraf.Parser { + return &Parser{MetricName: defaultMetricName} + }) +} + +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.HeaderRowCount = config.CSVHeaderRowCount + p.SkipRows = config.CSVSkipRows + p.SkipColumns = config.CSVSkipColumns + p.Delimiter = config.CSVDelimiter + p.Comment = config.CSVComment + p.TrimSpace = config.CSVTrimSpace + p.ColumnNames = config.CSVColumnNames + p.ColumnTypes = config.CSVColumnTypes + p.TagColumns = config.CSVTagColumns + p.MeasurementColumn = config.CSVMeasurementColumn + p.TimestampColumn = config.CSVTimestampColumn + p.TimestampFormat = config.CSVTimestampFormat + p.Timezone = config.CSVTimezone + p.DefaultTags = config.DefaultTags + p.SkipValues = config.CSVSkipValues + + return p.Init() +} diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index a0047b1ad1a20..398f61449e179 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -18,13 +18,12 @@ var DefaultTime = func() time.Time { } func TestBasicCSV(t *testing.T) { - p, err := NewParser( - &Config{ - ColumnNames: []string{"first", "second", "third"}, - TagColumns: []string{"third"}, - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + ColumnNames: []string{"first", "second", "third"}, + TagColumns: []string{"third"}, + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) _, err = p.ParseLine("1.4,true,hi") @@ -32,13 +31,12 @@ func TestBasicCSV(t *testing.T) { } func TestHeaderConcatenationCSV(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 2, - MeasurementColumn: "3", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 2, + MeasurementColumn: "3", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `first,second 1,2,3 @@ -50,14 +48,13 @@ func TestHeaderConcatenationCSV(t *testing.T) { } func TestHeaderOverride(t *testing.T) { - p, err := NewParser( - 
&Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 3.4,70,test_name` @@ -72,14 +69,13 @@ func TestHeaderOverride(t *testing.T) { testCSVRows := []string{"line1,line2,line3\r\n", "3.4,70,test_name\r\n"} - p, err = NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - }, - ) + p = &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + } + err = p.Init() require.NoError(t, err) metrics, err = p.Parse([]byte(testCSVRows[0])) require.NoError(t, err) @@ -91,16 +87,15 @@ func TestHeaderOverride(t *testing.T) { } func TestTimestamp(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "02/01/06 03:04:05 PM", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 @@ -114,16 +109,15 @@ func TestTimestamp(t *testing.T) { } func TestTimestampYYYYMMDDHHmm(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "200601021504", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", 
"second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "200601021504", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 @@ -136,15 +130,14 @@ func TestTimestampYYYYMMDDHHmm(t *testing.T) { require.Equal(t, metrics[1].Time().UnixNano(), int64(1247328300000000000)) } func TestTimestampError(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name @@ -154,16 +147,15 @@ func TestTimestampError(t *testing.T) { } func TestTimestampUnixFormat(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 1243094706,70,test_name @@ -175,16 +167,15 @@ func TestTimestampUnixFormat(t *testing.T) { } func TestTimestampUnixMSFormat(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "unix_ms", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + 
TimestampColumn: "first", + TimestampFormat: "unix_ms", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 1243094706123,70,test_name @@ -196,14 +187,13 @@ func TestTimestampUnixMSFormat(t *testing.T) { } func TestQuotedCharacter(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1,line2,line3 @@ -214,15 +204,14 @@ func TestQuotedCharacter(t *testing.T) { } func TestDelimiter(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - Delimiter: "%", - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 1, + Delimiter: "%", + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `line1%line2%line3 @@ -233,15 +222,14 @@ func TestDelimiter(t *testing.T) { } func TestValueConversion(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 0, - Delimiter: ",", - ColumnNames: []string{"first", "second", "third", "fourth"}, - MetricName: "test_value", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 0, + Delimiter: ",", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `3.3,4,true,hello` @@ -275,15 +263,14 @@ func TestValueConversion(t *testing.T) { } func TestSkipComment(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 0, - Comment: "#", - ColumnNames: []string{"first", "second", "third", "fourth"}, 
- MetricName: "test_value", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 0, + Comment: "#", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `#3.3,4,true,hello 4,9.9,true,name_this` @@ -301,15 +288,14 @@ func TestSkipComment(t *testing.T) { } func TestTrimSpace(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 0, - TrimSpace: true, - ColumnNames: []string{"first", "second", "third", "fourth"}, - MetricName: "test_value", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + HeaderRowCount: 0, + TrimSpace: true, + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := ` 3.3, 4, true,hello` @@ -324,13 +310,12 @@ func TestTrimSpace(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedFields, metrics[0].Fields()) - p, err = NewParser( - &Config{ - HeaderRowCount: 2, - TrimSpace: true, - TimeFunc: DefaultTime, - }, - ) + p = &Parser{ + HeaderRowCount: 2, + TrimSpace: true, + TimeFunc: DefaultTime, + } + err = p.Init() require.NoError(t, err) testCSV = " col , col ,col\n" + " 1 , 2 ,3\n" + @@ -342,15 +327,15 @@ func TestTrimSpace(t *testing.T) { } func TestTrimSpaceDelimitedBySpace(t *testing.T) { - p, err := NewParser( - &Config{ - Delimiter: " ", - HeaderRowCount: 1, - TrimSpace: true, - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + Delimiter: " ", + HeaderRowCount: 1, + TrimSpace: true, + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) + testCSV := ` first second third fourth abcdefgh 0 2 false abcdef 3.3 4 true @@ -369,16 +354,16 @@ abcdefgh 0 2 false } func TestSkipRows(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - SkipRows: 1, - TagColumns: []string{"line1"}, - MeasurementColumn: "line3", - TimeFunc: DefaultTime, - }, - ) + p := 
&Parser{ + HeaderRowCount: 1, + SkipRows: 1, + TagColumns: []string{"line1"}, + MeasurementColumn: "line3", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) + testCSV := `garbage nonsense line1,line2,line3 hello,80,test_name2` @@ -395,15 +380,14 @@ hello,80,test_name2` require.Equal(t, expectedFields, metrics[0].Fields()) require.Equal(t, expectedTags, metrics[0].Tags()) - p, err = NewParser( - &Config{ - HeaderRowCount: 1, - SkipRows: 1, - TagColumns: []string{"line1"}, - MeasurementColumn: "line3", - TimeFunc: DefaultTime, - }, - ) + p = &Parser{ + HeaderRowCount: 1, + SkipRows: 1, + TagColumns: []string{"line1"}, + MeasurementColumn: "line3", + TimeFunc: DefaultTime, + } + err = p.Init() require.NoError(t, err) testCSVRows := []string{"garbage nonsense\r\n", "line1,line2,line3\r\n", "hello,80,test_name2\r\n"} @@ -422,13 +406,12 @@ hello,80,test_name2` } func TestSkipColumns(t *testing.T) { - p, err := NewParser( - &Config{ - SkipColumns: 1, - ColumnNames: []string{"line1", "line2"}, - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + SkipColumns: 1, + ColumnNames: []string{"line1", "line2"}, + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) testCSV := `hello,80,test_name` @@ -442,14 +425,14 @@ func TestSkipColumns(t *testing.T) { } func TestSkipColumnsWithHeader(t *testing.T) { - p, err := NewParser( - &Config{ - SkipColumns: 1, - HeaderRowCount: 2, - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + SkipColumns: 1, + HeaderRowCount: 2, + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) + testCSV := `col,col,col 1,2,3 trash,80,test_name` @@ -461,13 +444,11 @@ trash,80,test_name` } func TestMultiHeader(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 2, - TimeFunc: DefaultTime, - }, - ) - require.NoError(t, err) + p := &Parser{ + HeaderRowCount: 2, + TimeFunc: DefaultTime, + } + require.NoError(t, p.Init()) testCSV := `col,col 1,2 80,test_name` @@ -478,12 +459,11 @@ func 
TestMultiHeader(t *testing.T) { testCSVRows := []string{"col,col\r\n", "1,2\r\n", "80,test_name\r\n"} - p, err = NewParser( - &Config{ - HeaderRowCount: 2, - TimeFunc: DefaultTime, - }, - ) + p = &Parser{ + HeaderRowCount: 2, + TimeFunc: DefaultTime, + } + err = p.Init() require.NoError(t, err) metrics, err = p.Parse([]byte(testCSVRows[0])) @@ -499,13 +479,12 @@ func TestMultiHeader(t *testing.T) { } func TestParseStream(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) csvHeader := "a,b,c" @@ -530,14 +509,12 @@ func TestParseStream(t *testing.T) { } func TestParseLineMultiMetricErrorMessage(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - TimeFunc: DefaultTime, - }, - ) - require.NoError(t, err) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + TimeFunc: DefaultTime, + } + require.NoError(t, p.Init()) csvHeader := "a,b,c" csvOneRow := "1,2,3" @@ -568,16 +545,16 @@ func TestParseLineMultiMetricErrorMessage(t *testing.T) { } func TestTimestampUnixFloatPrecision(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - ColumnNames: []string{"time", "value"}, - TimestampColumn: "time", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - }, - ) + p := &Parser{ + MetricName: "csv", + ColumnNames: []string{"time", "value"}, + TimestampColumn: "time", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + } + err := p.Init() require.NoError(t, err) + data := `1551129661.95456123352050781250,42` expected := []telegraf.Metric{ @@ -597,17 +574,17 @@ func TestTimestampUnixFloatPrecision(t *testing.T) { } func TestSkipMeasurementColumn(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - TimestampColumn: "timestamp", - TimestampFormat: "unix", - TimeFunc: 
DefaultTime, - TrimSpace: true, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + TimestampColumn: "timestamp", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + TrimSpace: true, + } + err := p.Init() require.NoError(t, err) + data := `id,value,timestamp 1,5,1551129661.954561233` @@ -629,17 +606,17 @@ func TestSkipMeasurementColumn(t *testing.T) { } func TestSkipTimestampColumn(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - TimestampColumn: "timestamp", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - TrimSpace: true, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + TimestampColumn: "timestamp", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + TrimSpace: true, + } + err := p.Init() require.NoError(t, err) + data := `id,value,timestamp 1,5,1551129661.954561233` @@ -661,18 +638,18 @@ func TestSkipTimestampColumn(t *testing.T) { } func TestTimestampTimezone(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "02/01/06 03:04:05 PM", - TimeFunc: DefaultTime, - Timezone: "Asia/Jakarta", - }, - ) + p := &Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + Timezone: "Asia/Jakarta", + } + err := p.Init() require.NoError(t, err) + testCSV := `line1,line2,line3 23/05/09 11:05:06 PM,70,test_name 07/11/09 11:05:06 PM,80,test_name2` @@ -684,15 +661,15 @@ func TestTimestampTimezone(t *testing.T) { } func TestEmptyMeasurementName(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - ColumnNames: []string{"", "b"}, - MeasurementColumn: "", - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"", "b"}, + 
MeasurementColumn: "", + } + err := p.Init() require.NoError(t, err) + testCSV := `,b 1,2` metrics, err := p.Parse([]byte(testCSV)) @@ -711,15 +688,15 @@ func TestEmptyMeasurementName(t *testing.T) { } func TestNumericMeasurementName(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - ColumnNames: []string{"a", "b"}, - MeasurementColumn: "a", - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + MeasurementColumn: "a", + } + err := p.Init() require.NoError(t, err) + testCSV := `a,b 1,2` metrics, err := p.Parse([]byte(testCSV)) @@ -738,14 +715,14 @@ func TestNumericMeasurementName(t *testing.T) { } func TestStaticMeasurementName(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - ColumnNames: []string{"a", "b"}, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + } + err := p.Init() require.NoError(t, err) + testCSV := `a,b 1,2` metrics, err := p.Parse([]byte(testCSV)) @@ -765,15 +742,15 @@ func TestStaticMeasurementName(t *testing.T) { } func TestSkipEmptyStringValue(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - ColumnNames: []string{"a", "b"}, - SkipValues: []string{""}, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + SkipValues: []string{""}, + } + err := p.Init() require.NoError(t, err) + testCSV := `a,b 1,""` metrics, err := p.Parse([]byte(testCSV)) @@ -792,15 +769,15 @@ func TestSkipEmptyStringValue(t *testing.T) { } func TestSkipSpecifiedStringValue(t *testing.T) { - p, err := NewParser( - &Config{ - MetricName: "csv", - HeaderRowCount: 1, - ColumnNames: []string{"a", "b"}, - SkipValues: []string{"MM"}, - }, - ) + p := &Parser{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + SkipValues: []string{"MM"}, + } + err := p.Init() require.NoError(t, 
err) + testCSV := `a,b 1,MM` metrics, err := p.Parse([]byte(testCSV)) @@ -819,17 +796,17 @@ func TestSkipSpecifiedStringValue(t *testing.T) { } func TestSkipErrorOnCorruptedCSVLine(t *testing.T) { - p, err := NewParser( - &Config{ - HeaderRowCount: 1, - TimestampColumn: "date", - TimestampFormat: "02/01/06 03:04:05 PM", - TimeFunc: DefaultTime, - SkipErrors: true, - }, - ) - require.NoError(t, err) - p.Log = testutil.Logger{} + p := &Parser{ + HeaderRowCount: 1, + TimestampColumn: "date", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + SkipErrors: true, + Log: testutil.Logger{}, + } + err := p.Init() + require.NoError(t, err) + testCSV := `date,a,b 23/05/09 11:05:06 PM,1,2 corrupted_line diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 0bc9fc5e4879c..50cb69cf10aa1 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -5,7 +5,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers/collectd" - "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded" "github.com/influxdata/telegraf/plugins/parsers/graphite" @@ -22,6 +21,17 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/xpath" ) +// Creator is the function to create a new parser +type Creator func(defaultMetricName string) telegraf.Parser + +// Parsers contains the registry of all known parsers (following the new style) +var Parsers = map[string]Creator{} + +// Add adds a parser to the registry. 
Usually this function is called in the plugin's init function +func Add(name string, creator Creator) { + Parsers[name] = creator +} + type ParserFunc func() (Parser, error) // ParserInput is an interface for input plugins that are able to parse @@ -62,6 +72,12 @@ type Parser interface { SetDefaultTags(tags map[string]string) } +// ParserCompatibility is an interface for backward-compatible initialization of new parsers +type ParserCompatibility interface { + // InitFromConfig sets the parser internal variables from the old-style config + InitFromConfig(config *Config) error +} + // Config is a struct that covers the data types needed for all parser types, // and can be used to instantiate _any_ of the parsers. type Config struct { @@ -152,7 +168,6 @@ type Config struct { CSVTimezone string `toml:"csv_timezone"` CSVTrimSpace bool `toml:"csv_trim_space"` CSVSkipValues []string `toml:"csv_skip_values"` - CSVSkipErrors bool `toml:"csv_skip_errors"` // FormData configuration FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"` @@ -233,28 +248,6 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatternFiles, config.GrokTimezone, config.GrokUniqueTimestamp) - case "csv": - config := &csv.Config{ - MetricName: config.MetricName, - HeaderRowCount: config.CSVHeaderRowCount, - SkipRows: config.CSVSkipRows, - SkipColumns: config.CSVSkipColumns, - Delimiter: config.CSVDelimiter, - Comment: config.CSVComment, - TrimSpace: config.CSVTrimSpace, - ColumnNames: config.CSVColumnNames, - ColumnTypes: config.CSVColumnTypes, - TagColumns: config.CSVTagColumns, - MeasurementColumn: config.CSVMeasurementColumn, - TimestampColumn: config.CSVTimestampColumn, - TimestampFormat: config.CSVTimestampFormat, - Timezone: config.CSVTimezone, - DefaultTags: config.DefaultTags, - SkipValues: config.CSVSkipValues, - SkipErrors: config.CSVSkipErrors, - } - - return csv.NewParser(config) case "logfmt": parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) 
case "form_urlencoded": @@ -282,7 +275,19 @@ func NewParser(config *Config) (Parser, error) { case "json_v2": parser, err = NewJSONPathParser(config.JSONV2Config) default: - err = fmt.Errorf("Invalid data format: %s", config.DataFormat) + creator, found := Parsers[config.DataFormat] + if !found { + return nil, fmt.Errorf("invalid data format: %s", config.DataFormat) + } + + // Try to create new-style parsers the old way... + // DEPRECATED: Please instantiate the parser directly instead of using this function. + parser = creator(config.MetricName) + p, ok := parser.(ParserCompatibility) + if !ok { + return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat) + } + err = p.InitFromConfig(config) } return parser, err } diff --git a/plugins/parsers/registry_test.go b/plugins/parsers/registry_test.go new file mode 100644 index 0000000000000..472ba92a83ffc --- /dev/null +++ b/plugins/parsers/registry_test.go @@ -0,0 +1,70 @@ +package parsers_test + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + _ "github.com/influxdata/telegraf/plugins/parsers/all" +) + +func TestRegistry_BackwardCompatibility(t *testing.T) { + cfg := &parsers.Config{ + MetricName: "parser_compatibility_test", + CSVHeaderRowCount: 42, + } + + // Some parsers need certain settings to not error. Furthermore, we + // might need to clear some (pointer) fields for comparison... 
+ override := map[string]struct { + param map[string]interface{} + mask []string + }{ + "csv": { + param: map[string]interface{}{ + "HeaderRowCount": cfg.CSVHeaderRowCount, + }, + mask: []string{"TimeFunc"}, + }, + } + + for name, creator := range parsers.Parsers { + t.Logf("testing %q...", name) + cfg.DataFormat = name + + // Create parser the new way + expected := creator(cfg.MetricName) + if settings, found := override[name]; found { + s := reflect.Indirect(reflect.ValueOf(expected)) + for key, value := range settings.param { + v := reflect.ValueOf(value) + s.FieldByName(key).Set(v) + } + } + if p, ok := expected.(telegraf.Initializer); ok { + require.NoError(t, p.Init()) + } + + // Create parser the old way + actual, err := parsers.NewParser(cfg) + require.NoError(t, err) + + // Compare with mask + if settings, found := override[name]; found { + a := reflect.Indirect(reflect.ValueOf(actual)) + e := reflect.Indirect(reflect.ValueOf(expected)) + for _, key := range settings.mask { + af := a.FieldByName(key) + ef := e.FieldByName(key) + + v := reflect.Zero(ef.Type()) + af.Set(v) + ef.Set(v) + } + } + require.EqualValuesf(t, expected, actual, "format %q", name) + } +} diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go index a7f5b47a1597c..133f173f3e5fa 100644 --- a/plugins/processors/parser/parser.go +++ b/plugins/processors/parser/parser.go @@ -13,7 +13,7 @@ type Parser struct { Merge string `toml:"merge"` ParseFields []string `toml:"parse_fields"` Log telegraf.Logger `toml:"-"` - parser parsers.Parser + parser telegraf.Parser } var SampleConfig = `