Skip to content

Commit

Permalink
feat: Parser plugin restructuring (influxdata#8791)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and powersj committed Jan 21, 2022
1 parent dc26b2c commit c10ac0a
Show file tree
Hide file tree
Showing 19 changed files with 1,320 additions and 445 deletions.
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (a *Agent) initPlugins() error {
input.LogName(), err)
}
}
for _, parser := range a.Config.Parsers {
err := parser.Init()
if err != nil {
return fmt.Errorf("could not initialize parser %s::%s: %v",
parser.Config.DataFormat, parser.Config.Parent, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"gopkg.in/tomb.v1"
)
Expand Down
225 changes: 179 additions & 46 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Config struct {
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
Parsers []*models.RunningParser
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Parsers: make([]*models.RunningParser, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
Expand Down Expand Up @@ -233,6 +235,15 @@ func (c *Config) AggregatorNames() []string {
return PluginNameCounts(name)
}

// ParserNames returns a list of strings of the configured parsers.
func (c *Config) ParserNames() []string {
var name []string
for _, parser := range c.Parsers {
name = append(name, parser.Config.DataFormat)
}
return PluginNameCounts(name)
}

// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
var name []string
Expand Down Expand Up @@ -1048,6 +1059,39 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
return nil
}

func (c *Config) probeParser(table *ast.Table) bool {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

_, ok := parsers.Parsers[dataformat]
return ok
}

func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

creator, ok := parsers.Parsers[dataformat]
if !ok {
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
}
parser := creator(parentname)

conf, err := c.buildParser(parentname, table)
if err != nil {
return nil, err
}

if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}

running := models.NewRunningParser(parser, conf)
c.Parsers = append(c.Parsers, running)

return running, nil
}

func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
Expand Down Expand Up @@ -1162,6 +1206,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
name = "diskio"
}

// For inputs with parsers we need to compute the set of
// options that is not covered by both, the parser and the input.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()

creator, ok := inputs.Inputs[name]
if !ok {
// Handle removed, deprecated plugins
Expand All @@ -1174,35 +1229,95 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
input := creator()

// If the input has a SetParser function, then this means it can accept
// arbitrary types of input, so build the parser and set it.
if t, ok := input.(parsers.ParserInput); ok {
parser, err := c.buildParser(name, table)
if err != nil {
return err
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
t.SetParser(parser)
}

if t, ok := input.(parsers.ParserFuncInput); ok {
config, err := c.getParserConfig(name, table)
if err != nil {
return err
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := parsers.NewParser(config)
}

if t, ok := input.(telegraf.ParserFuncInput); ok {
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return nil, err
return err
}
logger := models.NewLogger("parsers", config.DataFormat, name)
models.SetLoggerOnPlugin(parser, logger)
if initializer, ok := parser.(telegraf.Initializer); ok {
if err := initializer.Init(); err != nil {
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.buildParserOld(name, config)
})
}
}

if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
return parser, nil
})
t.SetParserFunc(func() (parsers.Parser, error) {
return c.buildParserOld(name, config)
})
}
}

pluginConfig, err := c.buildInput(name, table)
Expand All @@ -1221,6 +1336,17 @@ func (c *Config) addInput(name string, table *ast.Table) error {
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)

// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -1265,6 +1391,21 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
return conf, nil
}

// buildParser parses Parser specific items from the ast.Table,
// builds the filter and returns a
// models.ParserConfig to be inserted into models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) {
var dataformat string
c.getFieldString(tbl, "data_format", &dataformat)

conf := &models.ParserConfig{
Parent: name,
DataFormat: dataformat,
}

return conf, nil
}

// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
Expand Down Expand Up @@ -1353,14 +1494,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
return cp, nil
}

// buildParser grabs the necessary entries from the ast.Table for creating
// buildParserOld grabs the necessary entries from the ast.Table for creating
// a parsers.Parser object, and creates it, which can then be added onto
// an Input object.
func (c *Config) buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
config, err := c.getParserConfig(name, tbl)
if err != nil {
return nil, err
}
func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) {
parser, err := parsers.NewParser(config)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1422,23 +1559,6 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone)
c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp)

//for csv parser
c.getFieldStringSlice(tbl, "csv_column_names", &pc.CSVColumnNames)
c.getFieldStringSlice(tbl, "csv_column_types", &pc.CSVColumnTypes)
c.getFieldStringSlice(tbl, "csv_tag_columns", &pc.CSVTagColumns)
c.getFieldString(tbl, "csv_timezone", &pc.CSVTimezone)
c.getFieldString(tbl, "csv_delimiter", &pc.CSVDelimiter)
c.getFieldString(tbl, "csv_comment", &pc.CSVComment)
c.getFieldString(tbl, "csv_measurement_column", &pc.CSVMeasurementColumn)
c.getFieldString(tbl, "csv_timestamp_column", &pc.CSVTimestampColumn)
c.getFieldString(tbl, "csv_timestamp_format", &pc.CSVTimestampFormat)
c.getFieldInt(tbl, "csv_header_row_count", &pc.CSVHeaderRowCount)
c.getFieldInt(tbl, "csv_skip_rows", &pc.CSVSkipRows)
c.getFieldInt(tbl, "csv_skip_columns", &pc.CSVSkipColumns)
c.getFieldBool(tbl, "csv_trim_space", &pc.CSVTrimSpace)
c.getFieldStringSlice(tbl, "csv_skip_values", &pc.CSVSkipValues)
c.getFieldBool(tbl, "csv_skip_errors", &pc.CSVSkipErrors)

c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)

c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
Expand Down Expand Up @@ -1652,9 +1772,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_skip_errors",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
Expand All @@ -1679,6 +1796,22 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
return nil
}

func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
f := func(_ reflect.Type, key string) error {
if c, ok := counter[key]; ok {
counter[key] = c + 1
} else {
counter[key] = 1
}
return nil
}
c.toml.MissingField = f
}

func (c *Config) resetMissingTomlFieldTracker() {
c.toml.MissingField = c.missingTomlField
}

func (c *Config) getFieldString(tbl *ast.Table, fieldName string, target *string) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
Expand Down
Loading

0 comments on commit c10ac0a

Please sign in to comment.