diff --git a/agent/agent.go b/agent/agent.go index d86037e79edeb..6f1dd95ea12fc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -259,24 +259,54 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) - ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + // create an output metric channel and a gorouting that continously passes + // each metric onto the output plugins & aggregators. + outMetricC := make(chan telegraf.Metric, 100) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-shutdown: + // TODO aggregators should get stopped here + if len(outMetricC) > 0 { + // keep going until outMetricC is flushed + continue + } + return + case m := <-outMetricC: + // TODO send metrics to aggregators (copy all) + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } + } + } + } + }() + ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) for { select { case <-shutdown: log.Println("Hang on, flushing any cached metrics before shutdown") + // wait for outMetricC to get flushed before flushing outputs + wg.Wait() a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: - for i, o := range a.Config.Outputs { - if i == len(a.Config.Outputs)-1 { - o.AddMetric(m) - } else { - o.AddMetric(copyMetric(m)) - } + case metric := <-metricC: + mS := []telegraf.Metric{metric} + for _, filter := range a.Config.Filters { + mS = filter.Apply(mS...) + } + for _, m := range mS { + outMetricC <- m } } } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 022280d6bbbe4..81f5bbbde7093 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..83e5e2542a367 --- /dev/null +++ b/filter.go @@ -0,0 +1,12 @@ +package telegraf + +type FilterPlugin interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Apply the filter to the given metric + Apply(in ...Metric) []Metric +} diff --git a/internal/config/config.go b/internal/config/config.go index 30e62789023ec..74b3aedb52ba9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -50,6 +51,7 @@ type Config struct { Agent *AgentConfig Inputs []*models.RunningInput Outputs []*models.RunningOutput + Filters []*models.RunningFilterPlugin } func NewConfig() *Config { @@ -64,6 +66,7 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*models.RunningInput, 0), Outputs: make([]*models.RunningOutput, 0), + Filters: make([]*models.RunningFilterPlugin, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -499,6 +502,7 @@ func (c *Config) LoadConfig(path string) error { case "outputs": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [outputs.influxdb] support case *ast.Table: if err = c.addOutput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -517,6 +521,7 @@ func (c *Config) LoadConfig(path string) error { case "inputs", "plugins": for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { + // legacy [inputs.cpu] support case *ast.Table: if err = c.addInput(pluginName, pluginSubTable); err != nil { return fmt.Errorf("Error parsing %s, %s", path, err) @@ -532,6 +537,20 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filters": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilterPlugin(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -572,6 +591,32 @@ func parseFile(fpath string) (*ast.Table, error) { return toml.Parse(contents) } +func (c *Config) addFilterPlugin(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + filterConfig, err := buildFilterPlugin(name, table) + if err != nil { + return err + } + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + rf := &models.RunningFilterPlugin{ + Name: name, + FilterPlugin: filter, + Config: filterConfig, + } + + c.Filters = append(c.Filters, rf) + return nil +} + func (c *Config) addOutput(name string, table *ast.Table) error { if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { return nil @@ -652,6 +697,25 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } +// buildFilterPlugin TODO doc +func buildFilterPlugin(name string, tbl *ast.Table) (*models.FilterPluginConfig, error) { + conf := &models.FilterPluginConfig{Name: name} + unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop", + "tagexclude", "taginclude"} + for _, field := range unsupportedFields { + if _, ok := tbl.Fields[field]; ok { + // TODO raise error because field is not supported + } + } + + var err error + conf.Filter, err = buildFilter(tbl) + if err != nil { + return conf, err + } + return conf, nil +} + // buildFilter builds a Filter // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the models.OutputConfig/models.InputConfig diff --git a/internal/models/running_filter.go b/internal/models/running_filter.go new file mode 100644 index 0000000000000..f0d57615a4ae7 --- /dev/null +++ b/internal/models/running_filter.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/influxdata/telegraf" +) + +type RunningFilterPlugin struct { + Name string + FilterPlugin telegraf.FilterPlugin + Config *FilterPluginConfig +} + +// FilterConfig containing a name and filter +type FilterPluginConfig struct { + Name string + Filter Filter +} + +func (rf *RunningFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric { + ret := []telegraf.Metric{} + + for _, metric := range in { + if rf.Config.Filter.IsActive() { + // check if the filter should be applied to this metric + if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok { + // this means filter should not be applied + ret = append(ret, metric) + continue + } + } + // This metric should pass through the filter, so call the filter Apply + // function and append results to the output slice. + ret = append(ret, rf.FilterPlugin.Apply(metric)...) + } + + return ret +} diff --git a/internal/models/running_filter_test.go b/internal/models/running_filter_test.go new file mode 100644 index 0000000000000..47075ad7b96b0 --- /dev/null +++ b/internal/models/running_filter_test.go @@ -0,0 +1,117 @@ +package models + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +type TestFilterPlugin struct { +} + +func (f *TestFilterPlugin) SampleConfig() string { return "" } +func (f *TestFilterPlugin) Description() string { return "" } + +// Apply renames: +// "foo" to "fuz" +// "bar" to "baz" +// And it also drops measurements named "dropme" +func (f *TestFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0) + for _, m := range in { + switch m.Name() { + case "foo": + out = append(out, testutil.TestMetric(1, "fuz")) + case "bar": + out = append(out, testutil.TestMetric(1, "baz")) + case "dropme": + // drop the metric! + default: + out = append(out, m) + } + } + return out +} + +func NewTestRunningFilterPlugin() *RunningFilterPlugin { + out := &RunningFilterPlugin{ + Name: "test", + FilterPlugin: &TestFilterPlugin{}, + Config: &FilterPluginConfig{Filter: Filter{}}, + } + return out +} + +func TestRunningFilterPlugin(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "fuz", + "baz", + "baz", + } + rfp := NewTestRunningFilterPlugin() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningFilterPlugin_WithNameDrop(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + testutil.TestMetric(1, "baz"), + } + + expectedNames := []string{ + "foo", + "baz", + "baz", + } + rfp := NewTestRunningFilterPlugin() + + rfp.Config.Filter.NameDrop = []string{"foo"} + assert.NoError(t, rfp.Config.Filter.Compile()) + + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + filteredMetrics[2].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} + +func TestRunningFilterPlugin_DroppedMetric(t *testing.T) { + inmetrics := []telegraf.Metric{ + testutil.TestMetric(1, "dropme"), + testutil.TestMetric(1, "foo"), + testutil.TestMetric(1, "bar"), + } + + expectedNames := []string{ + "fuz", + "baz", + } + rfp := NewTestRunningFilterPlugin() + filteredMetrics := rfp.Apply(inmetrics...) + + actualNames := []string{ + filteredMetrics[0].Name(), + filteredMetrics[1].Name(), + } + assert.Equal(t, expectedNames, actualNames) +} diff --git a/internal/models/running_output_test.go b/internal/models/running_output_test.go index a42d6fc7e6ff2..2bca79a067b01 100644 --- a/internal/models/running_output_test.go +++ b/internal/models/running_output_test.go @@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) { func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"nothing*"}, }, } @@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) { func TestRunningOutput_TagExcludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"tag*"}, }, } @@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) { func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagExclude: []string{"nothing*"}, }, } @@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) { func TestRunningOutput_TagIncludeMatch(t *testing.T) { conf := &OutputConfig{ Filter: Filter{ - TagInclude: []string{"tag*"}, }, } diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..aaeb9f2de5ab0 --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/printer" +) diff --git a/plugins/filters/printer/printer.go b/plugins/filters/printer/printer.go new file mode 100644 index 0000000000000..b4adfb5adc46f --- /dev/null +++ b/plugins/filters/printer/printer.go @@ -0,0 +1,35 @@ +package printer + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/filters" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + fmt.Println(metric.String()) + } + return in +} + +func init() { + filters.Add("printer", func() telegraf.FilterPlugin { + return &Printer{} + }) +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..53f0174ddc603 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.FilterPlugin + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}