Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate logfmt parser to new style #11366

Merged
merged 3 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,6 @@ func TestConfig_ParserInterfaceNewFormat(t *testing.T) {
},
mask: []string{"TimeFunc"},
},
"logfmt": {
mask: []string{"Now"},
},
"xpath_protobuf": {
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
Expand Down Expand Up @@ -555,9 +552,6 @@ func TestConfig_ParserInterfaceOldFormat(t *testing.T) {
},
mask: []string{"TimeFunc"},
},
"logfmt": {
mask: []string{"Now"},
},
"xpath_protobuf": {
param: map[string]interface{}{
"ProtobufMessageDef": "testdata/addressbook.proto",
Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/graphite"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/logfmt"
_ "github.com/influxdata/telegraf/plugins/parsers/value"
_ "github.com/influxdata/telegraf/plugins/parsers/wavefront"
_ "github.com/influxdata/telegraf/plugins/parsers/xpath"
Expand Down
46 changes: 26 additions & 20 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logfmt

import (
"bytes"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -10,31 +11,18 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

var (
ErrNoMetric = fmt.Errorf("no metric in line")
)
var ErrNoMetric = errors.New("no metric in line")

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
TagKeys []string `toml:"logfmt_tag_keys"`

MetricName string
DefaultTags map[string]string
Now func() time.Time
srebhan marked this conversation as resolved.
Show resolved Hide resolved
TagKeys []string `toml:"logfmt_tag_keys"`
DefaultTags map[string]string `toml:"-"`

tagFilter filter.Filter
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string, tagKeys []string) *Parser {
return &Parser{
MetricName: metricName,
DefaultTags: defaultTags,
Now: time.Now,
TagKeys: tagKeys,
}
metricName string
tagFilter filter.Filter
}

// Parse converts a slice of bytes in logfmt format to metrics.
Expand Down Expand Up @@ -76,7 +64,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
continue
}

m := metric.New(p.MetricName, tags, fields, p.Now())
m := metric.New(p.metricName, tags, fields, time.Now())

metrics = append(metrics, m)
}
Expand Down Expand Up @@ -126,3 +114,21 @@ func (p *Parser) Init() error {

return nil
}

func init() {
// Register parser
parsers.Add("logfmt",
func(defaultMetricName string) telegraf.Parser {
return &Parser{metricName: defaultMetricName}
},
)
}

// InitFromConfig is a compatibility function to construct the parser the old way
func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.metricName = config.MetricName
p.DefaultTags = config.DefaultTags
p.TagKeys = append(p.TagKeys, config.LogFmtTagKeys...)

return p.Init()
}
39 changes: 13 additions & 26 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,24 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParse(t *testing.T) {
tests := []struct {
name string
measurement string
now func() time.Time
bytes []byte
want []telegraf.Metric
wantErr bool
}{
{
name: "no bytes returns no metrics",
now: func() time.Time { return time.Unix(0, 0) },
want: []telegraf.Metric{},
},
{
name: "test without trailing end",
bytes: []byte("foo=\"bar\""),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []telegraf.Metric{
testutil.MustMetric(
Expand All @@ -42,7 +39,6 @@ func TestParse(t *testing.T) {
{
name: "test with trailing end",
bytes: []byte("foo=\"bar\"\n"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []telegraf.Metric{
testutil.MustMetric(
Expand All @@ -58,7 +54,6 @@ func TestParse(t *testing.T) {
{
name: "logfmt parser returns all the fields",
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []telegraf.Metric{
testutil.MustMetric(
Expand All @@ -77,7 +72,6 @@ func TestParse(t *testing.T) {
{
name: "logfmt parser parses every line",
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []telegraf.Metric{
testutil.MustMetric(
Expand Down Expand Up @@ -105,29 +99,25 @@ func TestParse(t *testing.T) {
},
{
name: "keys without = or values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`i am no data.`),
want: []telegraf.Metric{},
wantErr: false,
},
{
name: "keys without values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`foo="" bar=`),
want: []telegraf.Metric{},
wantErr: false,
},
{
name: "unterminated quote produces error",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`bar=baz foo="bar`),
want: []telegraf.Metric{},
wantErr: true,
},
{
name: "malformed key",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`"foo=" bar=baz`),
want: []telegraf.Metric{},
Expand All @@ -137,16 +127,15 @@ func TestParse(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
metricName: tt.measurement,
}
got, err := l.Parse(tt.bytes)
if (err != nil) != tt.wantErr {
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
return
}

testutil.RequireMetricsEqual(t, tt.want, got)
testutil.RequireMetricsEqual(t, tt.want, got, testutil.IgnoreTime())
})
}
}
Expand All @@ -156,19 +145,16 @@ func TestParseLine(t *testing.T) {
name string
s string
measurement string
now func() time.Time
want telegraf.Metric
wantErr bool
}{
{
name: "No Metric In line",
now: func() time.Time { return time.Unix(0, 0) },
want: nil,
wantErr: true,
},
{
name: "Log parser fmt returns all fields",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
want: testutil.MustMetric(
Expand All @@ -185,7 +171,6 @@ func TestParseLine(t *testing.T) {
},
{
name: "ParseLine only returns metrics from first string",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000",
want: testutil.MustMetric(
Expand All @@ -204,14 +189,13 @@ func TestParseLine(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
metricName: tt.measurement,
}
got, err := l.ParseLine(tt.s)
if (err != nil) != tt.wantErr {
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
}
testutil.RequireMetricEqual(t, tt.want, got)
testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime())
})
}
}
Expand Down Expand Up @@ -277,15 +261,18 @@ func TestTags(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := NewParser(tt.measurement, map[string]string{}, tt.tagKeys)
assert.NoError(t, l.Init())
l := &Parser{
metricName: tt.measurement,
DefaultTags: map[string]string{},
TagKeys: tt.tagKeys,
}
require.NoError(t, l.Init())

got, err := l.ParseLine(tt.s)

if tt.wantErr {
assert.Error(t, err)
require.Error(t, err)
} else {
assert.NoError(t, err)
require.NoError(t, err)
}
testutil.RequireMetricEqual(t, tt.want, got, testutil.IgnoreTime())
})
Expand Down
10 changes: 0 additions & 10 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
Expand Down Expand Up @@ -227,8 +226,6 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatternFiles,
config.GrokTimezone,
config.GrokUniqueTimestamp)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags, config.LogFmtTagKeys)
case "prometheus":
parser, err = NewPrometheusParser(
config.DefaultTags,
Expand Down Expand Up @@ -310,13 +307,6 @@ func NewDropwizardParser(
return parser, err
}

// NewLogFmtParser returns a logfmt parser with the default options.
func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys []string) (Parser, error) {
parser := logfmt.NewParser(metricName, defaultTags, tagKeys)
err := parser.Init()
return parser, err
}

func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) {
return &prometheus.Parser{
DefaultTags: defaultTags,
Expand Down