From 1cbc2e6fa23f13d788fe3ba0b9dcf8e62844bc4d Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 22 Jun 2022 19:58:21 +0200 Subject: [PATCH 1/3] Cleanup the parser a bit. --- plugins/parsers/logfmt/parser.go | 9 +++------ plugins/parsers/logfmt/parser_test.go | 20 ++------------------ 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index f612c8e2e72a9..2b4593d515cd5 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -2,6 +2,7 @@ package logfmt import ( "bytes" + "errors" "fmt" "strconv" "time" @@ -12,9 +13,7 @@ import ( "github.com/influxdata/telegraf/metric" ) -var ( - ErrNoMetric = fmt.Errorf("no metric in line") -) +var ErrNoMetric = errors.New("no metric in line") // Parser decodes logfmt formatted messages into metrics. type Parser struct { @@ -22,7 +21,6 @@ type Parser struct { MetricName string DefaultTags map[string]string - Now func() time.Time tagFilter filter.Filter } @@ -32,7 +30,6 @@ func NewParser(metricName string, defaultTags map[string]string, tagKeys []strin return &Parser{ MetricName: metricName, DefaultTags: defaultTags, - Now: time.Now, TagKeys: tagKeys, } } @@ -76,7 +73,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { continue } - m := metric.New(p.MetricName, tags, fields, p.Now()) + m := metric.New(p.MetricName, tags, fields, time.Now()) metrics = append(metrics, m) } diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index a2ee8178f6072..db0d0687b4906 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -13,20 +13,17 @@ func TestParse(t *testing.T) { tests := []struct { name string measurement string - now func() time.Time bytes []byte want []telegraf.Metric wantErr bool }{ { name: "no bytes returns no metrics", - now: func() time.Time { return time.Unix(0, 0) }, want: []telegraf.Metric{}, }, { name: "test without trailing end", bytes: []byte("foo=\"bar\""), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -42,7 +39,6 @@ func TestParse(t *testing.T) { { name: "test with trailing end", bytes: []byte("foo=\"bar\"\n"), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -58,7 +54,6 @@ func TestParse(t *testing.T) { { name: "logfmt parser returns all the fields", bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -77,7 +72,6 @@ func TestParse(t *testing.T) { { name: "logfmt parser parses every line", bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", want: []telegraf.Metric{ testutil.MustMetric( @@ -105,21 +99,18 @@ func TestParse(t *testing.T) { }, { name: "keys without = or values are ignored", - now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am no data.`), want: []telegraf.Metric{}, wantErr: false, }, { name: "keys without values are ignored", - now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`foo="" bar=`), want: []telegraf.Metric{}, wantErr: false, }, { name: "unterminated quote produces error", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`bar=baz foo="bar`), want: []telegraf.Metric{}, @@ -127,7 +118,6 @@ func TestParse(t *testing.T) { }, { name: "malformed key", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", bytes: []byte(`"foo=" bar=baz`), want: []telegraf.Metric{}, @@ -138,7 +128,6 @@ func TestParse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { l := Parser{ MetricName: tt.measurement, - Now: tt.now, } got, err := l.Parse(tt.bytes) if (err != nil) != tt.wantErr { @@ -146,7 +135,7 @@ func TestParse(t *testing.T) { return } - testutil.RequireMetricsEqual(t, tt.want, got) + testutil.RequireMetricsEqual(t, tt.want, got, testutil.IgnoreTime()) }) } } @@ -156,19 +145,16 @@ func TestParseLine(t *testing.T) { name string s string measurement string - now func() time.Time want telegraf.Metric wantErr bool }{ { name: "No Metric In line", - now: func() time.Time { return time.Unix(0, 0) }, want: nil, wantErr: true, }, { name: "Log parser fmt returns all fields", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, want: testutil.MustMetric( @@ -185,7 +171,6 @@ func TestParseLine(t *testing.T) { }, { name: "ParseLine only returns metrics from first string", - now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", want: testutil.MustMetric( @@ -205,13 +190,12 @@ func TestParseLine(t *testing.T) { t.Run(tt.name, func(t *testing.T) { l := Parser{ MetricName: tt.measurement, - Now: tt.now, } got, err := l.ParseLine(tt.s) if (err != nil) != tt.wantErr { t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) } - testutil.RequireMetricEqual(t, tt.want, got) + testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime()) }) } } From d8c293e5c76343cedf7d2f3ce604d68d5cb80cae Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 22 Jun 2022 20:04:12 +0200 Subject: [PATCH 2/3] Get rid of 'now' customization. --- config/config_test.go | 6 ------ plugins/parsers/logfmt/parser.go | 13 ++++++------- plugins/parsers/logfmt/parser_test.go | 4 ++-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 9ee7aa21c903d..b8a7dfc3b6bdd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -415,9 +415,6 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) { }, mask: []string{"TimeFunc"}, }, - "logfmt": { - mask: []string{"Now"}, - }, "xpath_protobuf": { param: map[string]interface{}{ "ProtobufMessageDef": "testdata/addressbook.proto", @@ -555,9 +552,6 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) { }, mask: []string{"TimeFunc"}, }, - "logfmt": { - mask: []string{"Now"}, - }, "xpath_protobuf": { param: map[string]interface{}{ "ProtobufMessageDef": "testdata/addressbook.proto", diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 2b4593d515cd5..9ddaca785a8ca 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -17,18 +17,17 @@ var ErrNoMetric = errors.New("no metric in line") // Parser decodes logfmt formatted messages into metrics. type Parser struct { - TagKeys []string `toml:"logfmt_tag_keys"` + TagKeys []string `toml:"logfmt_tag_keys"` + DefaultTags map[string]string `toml:"-"` - MetricName string - DefaultTags map[string]string - - tagFilter filter.Filter + metricName string + tagFilter filter.Filter } // NewParser creates a parser. func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser { return &Parser{ - MetricName: metricName, + metricName: metricName, DefaultTags: defaultTags, TagKeys: tagKeys, } @@ -73,7 +72,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { continue } - m := metric.New(p.MetricName, tags, fields, time.Now()) + m := metric.New(p.metricName, tags, fields, time.Now()) metrics = append(metrics, m) } diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index db0d0687b4906..feb0fde0656d3 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -127,7 +127,7 @@ func TestParse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { l := Parser{ - MetricName: tt.measurement, + metricName: tt.measurement, } got, err := l.Parse(tt.bytes) if (err != nil) != tt.wantErr { @@ -189,7 +189,7 @@ func TestParseLine(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { l := Parser{ - MetricName: tt.measurement, + metricName: tt.measurement, } got, err := l.ParseLine(tt.s) if (err != nil) != tt.wantErr { From c536cd9f79600b6c8ec8508aa615dfd579f5f0d1 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 22 Jun 2022 21:32:56 +0200 Subject: [PATCH 3/3] Migrate logfmt parser to the new-style. --- plugins/parsers/all/all.go | 1 + plugins/parsers/logfmt/parser.go | 28 ++++++++++++++++++--------- plugins/parsers/logfmt/parser_test.go | 15 ++++++++------ plugins/parsers/registry.go | 10 ---------- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index b953730713bc3..cd9cb6c6f4775 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -8,6 +8,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/graphite" _ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2" + _ "github.com/influxdata/telegraf/plugins/parsers/logfmt" _ "github.com/influxdata/telegraf/plugins/parsers/value" _ "github.com/influxdata/telegraf/plugins/parsers/wavefront" _ "github.com/influxdata/telegraf/plugins/parsers/xpath" diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 9ddaca785a8ca..2cd518545610a 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) var ErrNoMetric = errors.New("no metric in line") @@ -24,15 +25,6 @@ type Parser struct { tagFilter filter.Filter } -// NewParser creates a parser. -func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser { - return &Parser{ - metricName: metricName, - DefaultTags: defaultTags, - TagKeys: tagKeys, - } -} - // Parse converts a slice of bytes in logfmt format to metrics. func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) @@ -122,3 +114,21 @@ func (p *Parser) Init() error { return nil } + +func init() { + // Register parser + parsers.Add("logfmt", + func(defaultMetricName string) telegraf.Parser { + return &Parser{metricName: defaultMetricName} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.metricName = config.MetricName + p.DefaultTags = config.DefaultTags + p.TagKeys = append(p.TagKeys, config.LogFmtTagKeys...) + + return p.Init() +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index feb0fde0656d3..169b238bc68a3 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -6,7 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestParse(t *testing.T) { @@ -261,15 +261,18 @@ func TestTags(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys) - assert.NoError(t, l.Init()) + l := &Parser{ + metricName: tt.measurement, + DefaultTags: map[string]string{}, + TagKeys: tt.tagKeys, + } + require.NoError(t, l.Init()) got, err := l.ParseLine(tt.s) - if tt.wantErr { - assert.Error(t, err) + require.Error(t, err) } else { - assert.NoError(t, err) + require.NoError(t, err) } testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime()) }) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 59603bdbdc984..3391288eebedf 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -8,7 +8,6 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" - "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/prometheus" "github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite" @@ -227,8 +226,6 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatternFiles, config.GrokTimezone, config.GrokUniqueTimestamp) - case "logfmt": - parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys) case "prometheus": parser, err = NewPrometheusParser( config.DefaultTags, @@ -310,13 +307,6 @@ func NewDropwizardParser( return parser, err } -// NewLogFmtParser returns a logfmt parser with the default options. -func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) { - parser := logfmt.NewParser(metricName, defaultTags, tagKeys) - err := parser.Init() - return parser, err -} - func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) { return &prometheus.Parser{ DefaultTags: defaultTags,