Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate collectd parser to new style #11367

Merged
merged 2 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
// Parser options to ignore
case "data_type", "separator", "tag_keys",
// "templates", // shared with serializers
"collectd_auth_file", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb",
"dropwizard_metric_registry_path", "dropwizard_tags_path", "dropwizard_tag_paths",
"dropwizard_time_format", "dropwizard_time_path",
"form_urlencoded_tag_keys",
Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package all

import (
//Blank imports for plugins to register themselves
_ "github.com/influxdata/telegraf/plugins/parsers/collectd"
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
Expand Down
84 changes: 47 additions & 37 deletions plugins/parsers/collectd/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,70 +10,62 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

const (
DefaultAuthFile = "/etc/collectd/auth_file"
)

type CollectdParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string
type Parser struct {
DefaultTags map[string]string `toml:"-"`

//whether or not to split multi value metric into multiple metrics
//default value is split
ParseMultiValue string
Log telegraf.Logger `toml:"-"`
popts network.ParseOpts
}
ParseMultiValue string `toml:"collectd_parse_multivalue"`

func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
p.popts = *popts
}
popts network.ParseOpts
AuthFile string `toml:"collectd_auth_file"`
SecurityLevel string `toml:"collectd_security_level"`
TypesDB []string `toml:"collectd_typesdb"`

func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (*CollectdParser, error) {
popts := network.ParseOpts{}
Log telegraf.Logger `toml:"-"`
}

switch securityLevel {
func (p *Parser) Init() error {
switch p.SecurityLevel {
case "none":
popts.SecurityLevel = network.None
p.popts.SecurityLevel = network.None
case "sign":
popts.SecurityLevel = network.Sign
p.popts.SecurityLevel = network.Sign
case "encrypt":
popts.SecurityLevel = network.Encrypt
p.popts.SecurityLevel = network.Encrypt
default:
popts.SecurityLevel = network.None
p.popts.SecurityLevel = network.None
}

if authFile == "" {
authFile = DefaultAuthFile
if p.AuthFile == "" {
p.AuthFile = DefaultAuthFile
}
popts.PasswordLookup = network.NewAuthFile(authFile)
p.popts.PasswordLookup = network.NewAuthFile(p.AuthFile)

for _, path := range typesDB {
for _, path := range p.TypesDB {
db, err := LoadTypesDB(path)
if err != nil {
return nil, err
return err
}

if popts.TypesDB != nil {
popts.TypesDB.Merge(db)
if p.popts.TypesDB != nil {
p.popts.TypesDB.Merge(db)
} else {
popts.TypesDB = db
p.popts.TypesDB = db
}
}

parser := CollectdParser{popts: popts,
ParseMultiValue: split}
return &parser, nil
return nil
}

func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
valueLists, err := network.Parse(buf, p.popts)
if err != nil {
return nil, fmt.Errorf("collectd parser error: %s", err)
Expand All @@ -98,7 +90,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
Expand All @@ -111,12 +103,12 @@ func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
return metrics[0], nil
}

func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

// unmarshalValueList translates a ValueList into a Telegraf metric.
func (p *CollectdParser) unmarshalValueList(vl *api.ValueList) []telegraf.Metric {
func (p *Parser) unmarshalValueList(vl *api.ValueList) []telegraf.Metric {
timestamp := vl.Time.UTC()

var metrics []telegraf.Metric
Expand Down Expand Up @@ -205,3 +197,21 @@ func LoadTypesDB(path string) (*api.TypesDB, error) {
}
return api.NewTypesDB(reader)
}

func init() {
parsers.Add("collectd",
func(_ string) telegraf.Parser {
return &Parser{
AuthFile: DefaultAuthFile,
}
})
}

func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.AuthFile = config.CollectdAuthFile
p.SecurityLevel = config.CollectdSecurityLevel
p.TypesDB = config.CollectdTypesDB
p.ParseMultiValue = config.CollectdSplit

return p.Init()
}
63 changes: 36 additions & 27 deletions plugins/parsers/collectd/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ var multiMetric = testCase{
}

func TestNewCollectdParser(t *testing.T) {
parser, err := NewCollectdParser("", "", []string{}, "join")
require.NoError(t, err)
parser := Parser{
ParseMultiValue: "join",
}
require.NoError(t, parser.Init())
require.Equal(t, parser.popts.SecurityLevel, network.None)
require.NotNil(t, parser.popts.PasswordLookup)
require.Nil(t, parser.popts.TypesDB)
Expand All @@ -124,8 +126,8 @@ func TestParse(t *testing.T) {
bytes, err := buf.Bytes()
require.NoError(t, err)

parser := &CollectdParser{}
require.NoError(t, err)
parser := &Parser{}
require.NoError(t, parser.Init())
metrics, err := parser.Parse(bytes)
require.NoError(t, err)

Expand All @@ -139,20 +141,36 @@ func TestParseMultiValueSplit(t *testing.T) {
bytes, err := buf.Bytes()
require.NoError(t, err)

parser := &CollectdParser{ParseMultiValue: "split"}
parser := &Parser{ParseMultiValue: "split"}
require.NoError(t, parser.Init())
metrics, err := parser.Parse(bytes)
require.NoError(t, err)

require.Equal(t, 2, len(metrics))
}

func TestParseMultiValueJoin(t *testing.T) {
buf, err := writeValueList(multiMetric.vl)
require.NoError(t, err)
bytes, err := buf.Bytes()
require.NoError(t, err)

parser := &Parser{ParseMultiValue: "join"}
require.NoError(t, parser.Init())
metrics, err := parser.Parse(bytes)
require.NoError(t, err)

require.Equal(t, 1, len(metrics))
}

func TestParse_DefaultTags(t *testing.T) {
buf, err := writeValueList(singleMetric.vl)
require.NoError(t, err)
bytes, err := buf.Bytes()
require.NoError(t, err)

parser := &CollectdParser{}
parser := &Parser{}
require.NoError(t, parser.Init())
parser.SetDefaultTags(map[string]string{
"foo": "bar",
})
Expand All @@ -164,16 +182,11 @@ func TestParse_DefaultTags(t *testing.T) {
}

func TestParse_SignSecurityLevel(t *testing.T) {
parser := &CollectdParser{}
popts := &network.ParseOpts{
SecurityLevel: network.Sign,
PasswordLookup: &AuthMap{
map[string]string{
"user0": "bar",
},
},
parser := &Parser{
SecurityLevel: "sign",
AuthFile: "testdata/authfile",
}
parser.SetParseOpts(popts)
require.NoError(t, parser.Init())

// Signed data
buf, err := writeValueList(singleMetric.vl)
Expand Down Expand Up @@ -219,16 +232,11 @@ func TestParse_SignSecurityLevel(t *testing.T) {
}

func TestParse_EncryptSecurityLevel(t *testing.T) {
parser := &CollectdParser{}
popts := &network.ParseOpts{
SecurityLevel: network.Encrypt,
PasswordLookup: &AuthMap{
map[string]string{
"user0": "bar",
},
},
parser := &Parser{
SecurityLevel: "encrypt",
AuthFile: "testdata/authfile",
}
parser.SetParseOpts(popts)
require.NoError(t, parser.Init())

// Signed data skipped
buf, err := writeValueList(singleMetric.vl)
Expand Down Expand Up @@ -278,9 +286,10 @@ func TestParseLine(t *testing.T) {
require.NoError(t, err)
bytes, err := buf.Bytes()
require.NoError(t, err)

parser, err := NewCollectdParser("", "", []string{}, "split")
require.NoError(t, err)
parser := Parser{
ParseMultiValue: "split",
}
require.NoError(t, parser.Init())
metric, err := parser.ParseLine(string(bytes))
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/collectd/testdata/authfile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
user0: bar
13 changes: 0 additions & 13 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
Expand Down Expand Up @@ -219,9 +218,6 @@ func NewParser(config *Config) (Parser, error) {
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
case "dropwizard":
parser, err = NewDropwizardParser(
config.DropwizardMetricRegistryPath,
Expand Down Expand Up @@ -324,15 +320,6 @@ func NewValueParser(
return value.NewValueParser(metricName, dataType, fieldName, defaultTags), nil
}

func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
}

func NewDropwizardParser(
metricRegistryPath string,
timePath string,
Expand Down