From bcc3de8942000ae7a64db855829cfca099c9841a Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Wed, 29 Jun 2022 14:46:43 -0600 Subject: [PATCH] feat: migrate value parser to new style (#11407) --- plugins/inputs/exec/exec_test.go | 39 ++++-- plugins/inputs/http/http_test.go | 8 +- .../kafka_consumer/kafka_consumer_test.go | 24 +++- plugins/parsers/all/all.go | 1 + plugins/parsers/registry.go | 13 -- plugins/parsers/value/parser.go | 48 ++++--- plugins/parsers/value/parser_test.go | 126 +++++++++++++++--- 7 files changed, 188 insertions(+), 71 deletions(-) diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index ee2c4ba91b6c0..2fa77d3927c6e 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -15,8 +15,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" ) @@ -139,11 +139,15 @@ func TestCommandError(t *testing.T) { } func TestExecCommandWithGlob(t *testing.T) { - parser, err := parsers.NewValueParser("metric", "string", "", nil) - require.NoError(t, err) + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) + e := NewExec() e.Commands = []string{"/bin/ech* metric_value"} - e.SetParser(parser) + e.SetParser(&parser) var acc testutil.Accumulator require.NoError(t, acc.GatherError(e.Gather)) @@ -155,12 +159,15 @@ func TestExecCommandWithGlob(t *testing.T) { } func TestExecCommandWithoutGlob(t *testing.T) { - parser, err := parsers.NewValueParser("metric", "string", "", nil) - require.NoError(t, err) + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) e := NewExec() e.Commands = []string{"/bin/echo metric_value"} - e.SetParser(parser) + e.SetParser(&parser) var acc testutil.Accumulator require.NoError(t, acc.GatherError(e.Gather)) @@ -172,11 +179,14 @@ func TestExecCommandWithoutGlob(t *testing.T) { } func TestExecCommandWithoutGlobAndPath(t *testing.T) { - parser, err := parsers.NewValueParser("metric", "string", "", nil) - require.NoError(t, err) + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) e := NewExec() e.Commands = []string{"echo metric_value"} - e.SetParser(parser) + e.SetParser(&parser) var acc testutil.Accumulator require.NoError(t, acc.GatherError(e.Gather)) @@ -188,12 +198,15 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) { } func TestExecCommandWithEnv(t *testing.T) { - parser, err := parsers.NewValueParser("metric", "string", "", nil) - require.NoError(t, err) + parser := value.Parser{ + MetricName: "metric", + DataType: "string", + } + require.NoError(t, parser.Init()) e := NewExec() e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"} e.Environment = []string{"METRIC_NAME=metric_value"} - e.SetParser(parser) + e.SetParser(&parser) var acc testutil.Accumulator require.NoError(t, acc.GatherError(e.Gather)) diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 4a960f50665a8..eec056979b701 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" ) @@ -374,7 +375,12 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { }) tt.plugin.SetParserFunc(func() (telegraf.Parser, error) { - return parsers.NewValueParser("metric", "string", "", nil) + p := &value.Parser{ + MetricName: "metric", + DataType: "string", + } + err := p.Init() + return p, err }) err = tt.plugin.Init() diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 55769a72404df..cfc9e8b85b00a 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -271,8 +271,12 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { func TestConsumerGroupHandler_Lifecycle(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) + parser := value.Parser{ + MetricName: "cpu", + DataType: "int", + } + require.NoError(t, parser.Init()) + cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -301,8 +305,12 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) { func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) + parser := value.Parser{ + MetricName: "cpu", + DataType: "int", + } + require.NoError(t, parser.Init()) + cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -415,8 +423,12 @@ func TestConsumerGroupHandler_Handle(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { acc := &testutil.Accumulator{} - parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) + parser := value.Parser{ + MetricName: "cpu", + DataType: "int", + } + require.NoError(t, parser.Init()) + cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{}) cg.MaxMessageLen = tt.maxMessageLen cg.TopicTag = tt.topicTag diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index 715748b0a41f0..0943ded6f5e01 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded" _ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2" + _ "github.com/influxdata/telegraf/plugins/parsers/value" _ "github.com/influxdata/telegraf/plugins/parsers/wavefront" _ "github.com/influxdata/telegraf/plugins/parsers/xpath" ) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 8cc20705420f7..fd2fc497d358a 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite" "github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2" "github.com/influxdata/telegraf/plugins/parsers/temporary/xpath" - "github.com/influxdata/telegraf/plugins/parsers/value" ) // Creator is the function to create a new parser @@ -202,9 +201,6 @@ func NewParser(config *Config) (Parser, error) { var err error var parser Parser switch config.DataFormat { - case "value": - parser, err = NewValueParser(config.MetricName, - config.DataType, config.ValueFieldName, config.DefaultTags) case "influx": if config.InfluxParserType == "upstream" { parser, err = NewInfluxUpstreamParser() @@ -301,15 +297,6 @@ func NewGraphiteParser( return graphite.NewGraphiteParser(separator, templates, defaultTags) } -func NewValueParser( - metricName string, - dataType string, - fieldName string, - defaultTags map[string]string, -) (Parser, error) { - return value.NewValueParser(metricName, dataType, fieldName, defaultTags), nil -} - func NewDropwizardParser( metricRegistryPath string, timePath string, diff --git a/plugins/parsers/value/parser.go b/plugins/parsers/value/parser.go index dc496663e98d9..1640974d03850 100644 --- a/plugins/parsers/value/parser.go +++ b/plugins/parsers/value/parser.go @@ -9,16 +9,17 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) -type ValueParser struct { - MetricName string - DataType string - DefaultTags map[string]string - FieldName string +type Parser struct { + DataType string `toml:"data_type"` + FieldName string `toml:"value_field_name"` + MetricName string `toml:"-"` + DefaultTags map[string]string `toml:"-"` } -func (v *ValueParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (v *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { vStr := string(bytes.TrimSpace(bytes.Trim(buf, "\x00"))) // unless it's a string, separate out any fields in the buffer, @@ -54,7 +55,7 @@ func (v *ValueParser) Parse(buf []byte) ([]telegraf.Metric, error) { return []telegraf.Metric{m}, nil } -func (v *ValueParser) ParseLine(line string) (telegraf.Metric, error) { +func (v *Parser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := v.Parse([]byte(line)) if err != nil { @@ -68,19 +69,32 @@ func (v *ValueParser) ParseLine(line string) (telegraf.Metric, error) { return metrics[0], nil } -func (v *ValueParser) SetDefaultTags(tags map[string]string) { +func (v *Parser) SetDefaultTags(tags map[string]string) { v.DefaultTags = tags } -func NewValueParser(metricName, dataType, fieldName string, defaultTags map[string]string) *ValueParser { - if fieldName == "" { - fieldName = "value" - } +// InitFromConfig is a compatibility function to construct the parser the old way +func (v *Parser) InitFromConfig(config *parsers.Config) error { + v.MetricName = config.MetricName + v.DefaultTags = config.DefaultTags + return v.Init() +} - return &ValueParser{ - MetricName: metricName, - DataType: dataType, - DefaultTags: defaultTags, - FieldName: fieldName, +func (v *Parser) Init() error { + if v.FieldName == "" { + v.FieldName = "value" } + + return nil +} + +func init() { + parsers.Add("value", + func(defaultMetricName string) telegraf.Parser { + return &Parser{ + FieldName: "value", + MetricName: defaultMetricName, + } + }, + ) } diff --git a/plugins/parsers/value/parser_test.go b/plugins/parsers/value/parser_test.go index 6d8184fef1e02..1a1cf7a61a9a2 100644 --- a/plugins/parsers/value/parser_test.go +++ b/plugins/parsers/value/parser_test.go @@ -7,7 +7,11 @@ import ( ) func TestParseValidValues(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte("55")) require.NoError(t, err) require.Len(t, metrics, 1) @@ -17,7 +21,11 @@ func TestParseValidValues(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{}, metrics[0].Tags()) - parser = NewValueParser("value_test", "float", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "float", + } + require.NoError(t, parser.Init()) metrics, err = parser.Parse([]byte("64")) require.NoError(t, err) require.Len(t, metrics, 1) @@ -27,7 +35,11 @@ func TestParseValidValues(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{}, metrics[0].Tags()) - parser = NewValueParser("value_test", "string", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "string", + } + require.NoError(t, parser.Init()) metrics, err = parser.Parse([]byte("foobar")) require.NoError(t, err) require.Len(t, metrics, 1) @@ -37,7 +49,11 @@ func TestParseValidValues(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{}, metrics[0].Tags()) - parser = NewValueParser("value_test", "boolean", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "boolean", + } + require.NoError(t, parser.Init()) metrics, err = parser.Parse([]byte("true")) require.NoError(t, err) require.Len(t, metrics, 1) @@ -49,7 +65,11 @@ func TestParseValidValues(t *testing.T) { } func TestParseMultipleValues(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte(`55 45 223 @@ -66,7 +86,11 @@ func TestParseMultipleValues(t *testing.T) { } func TestParseCustomFieldName(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) parser.FieldName = "penguin" metrics, err := parser.Parse([]byte(`55`)) @@ -77,7 +101,11 @@ func TestParseCustomFieldName(t *testing.T) { } func TestParseLineValidValues(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) metric, err := parser.ParseLine("55") require.NoError(t, err) require.Equal(t, "value_test", metric.Name()) @@ -86,7 +114,11 @@ func TestParseLineValidValues(t *testing.T) { }, metric.Fields()) require.Equal(t, map[string]string{}, metric.Tags()) - parser = NewValueParser("value_test", "float", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "float", + } + require.NoError(t, parser.Init()) metric, err = parser.ParseLine("64") require.NoError(t, err) require.Equal(t, "value_test", metric.Name()) @@ -95,7 +127,11 @@ func TestParseLineValidValues(t *testing.T) { }, metric.Fields()) require.Equal(t, map[string]string{}, metric.Tags()) - parser = NewValueParser("value_test", "string", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "string", + } + require.NoError(t, parser.Init()) metric, err = parser.ParseLine("foobar") require.NoError(t, err) require.Equal(t, "value_test", metric.Name()) @@ -104,7 +140,11 @@ func TestParseLineValidValues(t *testing.T) { }, metric.Fields()) require.Equal(t, map[string]string{}, metric.Tags()) - parser = NewValueParser("value_test", "boolean", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "boolean", + } + require.NoError(t, parser.Init()) metric, err = parser.ParseLine("true") require.NoError(t, err) require.Equal(t, "value_test", metric.Name()) @@ -115,38 +155,66 @@ func TestParseLineValidValues(t *testing.T) { } func TestParseInvalidValues(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte("55.0")) require.Error(t, err) require.Len(t, metrics, 0) - parser = NewValueParser("value_test", "float", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "float", + } + require.NoError(t, parser.Init()) metrics, err = parser.Parse([]byte("foobar")) require.Error(t, err) require.Len(t, metrics, 0) - parser = NewValueParser("value_test", "boolean", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "boolean", + } + require.NoError(t, parser.Init()) metrics, err = parser.Parse([]byte("213")) require.Error(t, err) require.Len(t, metrics, 0) } func TestParseLineInvalidValues(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) _, err := parser.ParseLine("55.0") require.Error(t, err) - parser = NewValueParser("value_test", "float", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "float", + } + require.NoError(t, parser.Init()) _, err = parser.ParseLine("foobar") require.Error(t, err) - parser = NewValueParser("value_test", "boolean", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "boolean", + } + require.NoError(t, parser.Init()) _, err = parser.ParseLine("213") require.Error(t, err) } func TestParseValidValuesDefaultTags(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) parser.SetDefaultTags(map[string]string{"test": "tag"}) metrics, err := parser.Parse([]byte("55")) require.NoError(t, err) @@ -157,7 +225,11 @@ func TestParseValidValuesDefaultTags(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags()) - parser = NewValueParser("value_test", "float", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "float", + } + require.NoError(t, parser.Init()) parser.SetDefaultTags(map[string]string{"test": "tag"}) metrics, err = parser.Parse([]byte("64")) require.NoError(t, err) @@ -168,7 +240,11 @@ func TestParseValidValuesDefaultTags(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags()) - parser = NewValueParser("value_test", "string", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "string", + } + require.NoError(t, parser.Init()) parser.SetDefaultTags(map[string]string{"test": "tag"}) metrics, err = parser.Parse([]byte("foobar")) require.NoError(t, err) @@ -179,7 +255,11 @@ func TestParseValidValuesDefaultTags(t *testing.T) { }, metrics[0].Fields()) require.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags()) - parser = NewValueParser("value_test", "boolean", "", nil) + parser = Parser{ + MetricName: "value_test", + DataType: "boolean", + } + require.NoError(t, parser.Init()) parser.SetDefaultTags(map[string]string{"test": "tag"}) metrics, err = parser.Parse([]byte("true")) require.NoError(t, err) @@ -192,7 +272,11 @@ func TestParseValidValuesDefaultTags(t *testing.T) { } func TestParseValuesWithNullCharacter(t *testing.T) { - parser := NewValueParser("value_test", "integer", "", nil) + parser := Parser{ + MetricName: "value_test", + DataType: "integer", + } + require.NoError(t, parser.Init()) metrics, err := parser.Parse([]byte("55\x00")) require.NoError(t, err) require.Len(t, metrics, 1)