diff --git a/accumulator.go b/accumulator.go index d1f8adf614e56..852ad37e00088 100644 --- a/accumulator.go +++ b/accumulator.go @@ -2,188 +2,138 @@ package telegraf import ( "fmt" - "sort" - "strings" "sync" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) -// BatchPoints is used to send a batch of data in a single write from telegraf -// to influx -type BatchPoints struct { - sync.Mutex - - client.BatchPoints +type Accumulator interface { + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) - Debug bool + SetDefaultTags(tags map[string]string) + AddDefaultTag(key, value string) - Prefix string + Prefix() string + SetPrefix(prefix string) - Config *ConfiguredPlugin + Debug() bool + SetDebug(enabled bool) } -// deepcopy returns a deep copy of the BatchPoints object. This is primarily so -// we can do multithreaded output flushing (see Agent.flush) -func (bp *BatchPoints) deepcopy() *BatchPoints { - bp.Lock() - defer bp.Unlock() - - var bpc BatchPoints - bpc.Time = bp.Time - bpc.Precision = bp.Precision - - bpc.Tags = make(map[string]string) - for k, v := range bp.Tags { - bpc.Tags[k] = v - } +func NewAccumulator( + plugin *ConfiguredPlugin, + points chan *client.Point, +) Accumulator { + acc := accumulator{} + acc.points = points + acc.plugin = plugin + return &acc +} - var pts []client.Point - for _, pt := range bp.Points { - var ptc client.Point +type accumulator struct { + sync.Mutex - ptc.Measurement = pt.Measurement - ptc.Time = pt.Time - ptc.Precision = pt.Precision - ptc.Raw = pt.Raw + points chan *client.Point - ptc.Tags = make(map[string]string) - ptc.Fields = make(map[string]interface{}) + defaultTags map[string]string - for k, v := range pt.Tags { - ptc.Tags[k] = v - } + debug bool - for k, v := range pt.Fields { - ptc.Fields[k] = v - } - pts = append(pts, ptc) - } + plugin *ConfiguredPlugin - bpc.Points = pts - return &bpc + prefix string } -// Add adds a measurement -func (bp *BatchPoints) Add( +func (ac *accumulator) Add( measurement string, - val interface{}, + value interface{}, tags map[string]string, + t ...time.Time, ) { fields := make(map[string]interface{}) - fields["value"] = val - bp.AddFields(measurement, fields, tags) + fields["value"] = value + ac.AddFields(measurement, fields, tags, t...) } -// AddFieldsWithTime adds a measurement with a provided timestamp -func (bp *BatchPoints) AddFieldsWithTime( +func (ac *accumulator) AddFields( measurement string, fields map[string]interface{}, tags map[string]string, - timestamp time.Time, + t ...time.Time, ) { - // TODO this function should add the fields with the timestamp, but that will - // need to wait for the InfluxDB point precision/unit to be fixed - bp.AddFields(measurement, fields, tags) - // bp.Lock() - // defer bp.Unlock() - - // measurement = bp.Prefix + measurement - - // if bp.Config != nil { - // if !bp.Config.ShouldPass(measurement, tags) { - // return - // } - // } - - // if bp.Debug { - // var tg []string - - // for k, v := range tags { - // tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v)) - // } - - // var vals []string - - // for k, v := range fields { - // vals = append(vals, fmt.Sprintf("%s=%v", k, v)) - // } - - // sort.Strings(tg) - // sort.Strings(vals) - - // fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) - // } - - // bp.Points = append(bp.Points, client.Point{ - // Measurement: measurement, - // Tags: tags, - // Fields: fields, - // Time: timestamp, - // }) -} -// AddFields will eventually replace the Add function, once we move to having a -// single plugin as a single measurement with multiple fields -func (bp *BatchPoints) AddFields( - measurement string, - fields map[string]interface{}, - tags map[string]string, -) { - bp.Lock() - defer bp.Unlock() + if tags == nil { + tags = make(map[string]string) + } - // InfluxDB does not support writing uint64 + // InfluxDB client/points does not support writing uint64 + // TODO fix when it does + // https://github.com/influxdb/influxdb/pull/4508 for k, v := range fields { switch val := v.(type) { case uint64: if val < uint64(9223372036854775808) { fields[k] = int64(val) + } else { + fields[k] = int64(9223372036854775807) } } } - measurement = bp.Prefix + measurement + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } - if bp.Config != nil { - if !bp.Config.ShouldPass(measurement, tags) { + if ac.plugin != nil { + if !ac.plugin.ShouldPass(measurement, tags) { return } } - // Apply BatchPoints tags to tags passed in, giving precedence to those - // passed in. This is so that plugins have the ability to override global - // tags. - for k, v := range bp.Tags { - _, ok := tags[k] - if !ok { + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { tags[k] = v } } - if bp.Debug { - var tg []string + if ac.prefix != "" { + measurement = ac.prefix + measurement + } - for k, v := range tags { - tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v)) - } + pt := client.NewPoint(measurement, tags, fields, timestamp) + if ac.debug { + fmt.Println("> " + pt.String()) + } + ac.points <- pt +} - var vals []string +func (ac *accumulator) SetDefaultTags(tags map[string]string) { + ac.defaultTags = tags +} - for k, v := range fields { - vals = append(vals, fmt.Sprintf("%s=%v", k, v)) - } +func (ac *accumulator) AddDefaultTag(key, value string) { + ac.defaultTags[key] = value +} - sort.Strings(tg) - sort.Strings(vals) +func (ac *accumulator) Prefix() string { + return ac.prefix +} - fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) - } +func (ac *accumulator) SetPrefix(prefix string) { + ac.prefix = prefix +} + +func (ac *accumulator) Debug() bool { + return ac.debug +} - bp.Points = append(bp.Points, client.Point{ - Measurement: measurement, - Tags: tags, - Fields: fields, - }) +func (ac *accumulator) SetDebug(debug bool) { + ac.debug = debug } diff --git a/agent.go b/agent.go index 1ae7e0f943808..fcb32aa72fbb8 100644 --- a/agent.go +++ b/agent.go @@ -11,6 +11,8 @@ import ( "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" + + "github.com/influxdb/influxdb/client/v2" ) type runningOutput struct { @@ -30,6 +32,9 @@ type Agent struct { // Interval at which to gather information Interval Duration + // Interval at which to flush data + FlushInterval Duration + // Option for outputting data in UTC UTC bool `toml:"utc"` @@ -50,10 +55,11 @@ type Agent struct { // NewAgent returns an Agent struct based off the given Config func NewAgent(config *Config) (*Agent, error) { agent := &Agent{ - Config: config, - Interval: Duration{10 * time.Second}, - UTC: true, - Precision: "s", + Config: config, + Interval: Duration{10 * time.Second}, + FlushInterval: Duration{10 * time.Second}, + UTC: true, + Precision: "s", } // Apply the toml table to the agent config, overriding defaults @@ -170,11 +176,9 @@ func (a *Agent) LoadPlugins(filters []string) ([]string, error) { return names, nil } -// crankParallel runs the plugins that are using the same reporting interval +// gatherParallel runs the plugins that are using the same reporting interval // as the telegraf agent. -func (a *Agent) crankParallel() error { - points := make(chan *BatchPoints, len(a.plugins)) - +func (a *Agent) gatherParallel(pointChan chan *client.Point) error { var wg sync.WaitGroup start := time.Now() @@ -189,100 +193,51 @@ func (a *Agent) crankParallel() error { go func(plugin *runningPlugin) { defer wg.Done() - var bp BatchPoints - bp.Debug = a.Debug - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - bp.Precision = a.Precision - bp.Tags = a.Config.Tags + acc := NewAccumulator(plugin.config, pointChan) + acc.SetDebug(a.Debug) + acc.SetPrefix(plugin.name + "_") + acc.SetDefaultTags(a.Config.Tags) - if err := plugin.plugin.Gather(&bp); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) } - points <- &bp }(plugin) } wg.Wait() - close(points) - - var bp BatchPoints - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() - } - bp.Precision = a.Precision - - for sub := range points { - bp.Points = append(bp.Points, sub.Points...) - } - elapsed := time.Since(start) - log.Printf("Cranking default (%s) interval, gathered %d metrics from %d plugins in %s\n", - a.Interval, len(bp.Points), counter, elapsed) - return a.flush(&bp) -} - -// crank is mostly for test purposes. -func (a *Agent) crank() error { - var bp BatchPoints - - bp.Debug = a.Debug - bp.Precision = a.Precision - - for _, plugin := range a.plugins { - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - err := plugin.plugin.Gather(&bp) - if err != nil { - return err - } - } - - bp.Tags = a.Config.Tags - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() - } - - return a.flush(&bp) + log.Printf("Default (%s) interval, gathered metrics from %d plugins in %s\n", + a.Interval, counter, elapsed) + return nil } -// crankSeparate runs the plugins that have been configured with their own +// gatherSeparate runs the plugins that have been configured with their own // reporting interval. -func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { +func (a *Agent) gatherSeparate( + shutdown chan struct{}, + plugin *runningPlugin, + pointChan chan *client.Point, +) error { ticker := time.NewTicker(plugin.config.Interval) for { - var bp BatchPoints var outerr error start := time.Now() - bp.Debug = a.Debug - - bp.Prefix = plugin.name + "_" - bp.Config = plugin.config - bp.Precision = a.Precision - bp.Tags = a.Config.Tags + acc := NewAccumulator(plugin.config, pointChan) + acc.SetDebug(a.Debug) + acc.SetPrefix(plugin.name + "_") + acc.SetDefaultTags(a.Config.Tags) - if err := plugin.plugin.Gather(&bp); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) - outerr = errors.New("Error encountered processing plugins & outputs") - } - - bp.Time = time.Now() - if a.UTC { - bp.Time = bp.Time.UTC() } elapsed := time.Since(start) - log.Printf("Cranking separate (%s) interval, gathered %d metrics from %s in %s\n", - plugin.config.Interval, len(bp.Points), plugin.name, elapsed) - if err := a.flush(&bp); err != nil { - outerr = errors.New("Error encountered processing plugins & outputs") - } + log.Printf("Separate (%s) interval, gathered metrics from %s in %s\n", + plugin.config.Interval, plugin.name, elapsed) if outerr != nil { return outerr @@ -297,47 +252,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } -func (a *Agent) flush(bp *BatchPoints) error { - var wg sync.WaitGroup - var outerr error - - for _, o := range a.outputs { - wg.Add(1) - - // Copy BatchPoints - bpc := bp.deepcopy() - - go func(ro *runningOutput) { - defer wg.Done() - // Log all output errors: - if err := ro.output.Write(bpc.BatchPoints); err != nil { - log.Printf("Error in output [%s]: %s", ro.name, err) - outerr = errors.New("Error encountered flushing outputs") - } - }(o) - } - - wg.Wait() - return outerr -} - // Test verifies that we can 'Gather' from all plugins with their configured // Config struct func (a *Agent) Test() error { - var acc BatchPoints - - acc.Debug = true + pointChan := make(chan *client.Point) for _, plugin := range a.plugins { - acc.Prefix = plugin.name + "_" - acc.Config = plugin.config + acc := NewAccumulator(plugin.config, pointChan) + acc.SetDebug(true) + acc.SetPrefix(plugin.name + "_") fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name) if plugin.config.Interval != 0 { fmt.Printf("* Internal: %s\n", plugin.config.Interval) } - if err := plugin.plugin.Gather(&acc); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { return err } @@ -347,7 +277,7 @@ func (a *Agent) Test() error { case "cpu": time.Sleep(500 * time.Millisecond) fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name) - if err := plugin.plugin.Gather(&acc); err != nil { + if err := plugin.plugin.Gather(acc); err != nil { return err } } @@ -356,10 +286,65 @@ func (a *Agent) Test() error { return nil } +func (a *Agent) flush(points []*client.Point) error { + var wg sync.WaitGroup + var outerr error + + for _, o := range a.outputs { + wg.Add(1) + + go func(ro *runningOutput) { + defer wg.Done() + // Log all output errors: + if err := ro.output.Write(points); err != nil { + log.Printf("Error in output [%s]: %s", ro.name, err) + outerr = errors.New("Error encountered flushing outputs") + } + }(o) + } + + wg.Wait() + return outerr +} + +// flusher monitors the points input channel and flushes on the minimum interval +func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { + ticker := time.NewTicker(a.FlushInterval.Duration) + points := make([]*client.Point, 0) + for { + select { + case <-shutdown: + return nil + case <-ticker.C: + start := time.Now() + if err := a.flush(points); err != nil { + log.Printf(err.Error()) + } + elapsed := time.Since(start) + log.Printf("Flushed %d metrics in %s\n", len(points), elapsed) + points = make([]*client.Point, 0) + case pt := <-pointChan: + points = append(points, pt) + } + } +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup + // channel shared between all plugin threads for accumulating points + pointChan := make(chan *client.Point, 1000) + + wg.Add(1) + go func() { + defer wg.Done() + if err := a.flusher(shutdown, pointChan); err != nil { + log.Printf("Flusher routine failed, exiting: %s\n", err.Error()) + close(shutdown) + } + }() + for _, plugin := range a.plugins { // Start service of any ServicePlugins @@ -374,12 +359,12 @@ func (a *Agent) Run(shutdown chan struct{}) error { } // Special handling for plugins that have their own collection interval - // configured. Default intervals are handled below with crankParallel + // configured. Default intervals are handled below with gatherParallel if plugin.config.Interval != 0 { wg.Add(1) go func(plugin *runningPlugin) { defer wg.Done() - if err := a.crankSeparate(shutdown, plugin); err != nil { + if err := a.gatherSeparate(shutdown, plugin, pointChan); err != nil { log.Printf(err.Error()) } }(plugin) @@ -391,7 +376,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { ticker := time.NewTicker(a.Interval.Duration) for { - if err := a.crankParallel(); err != nil { + if err := a.gatherParallel(pointChan); err != nil { log.Printf(err.Error()) } diff --git a/agent_test.go b/agent_test.go index 3fc384d7e4e0e..cb9847bb0cec8 100644 --- a/agent_test.go +++ b/agent_test.go @@ -74,7 +74,7 @@ func TestAgent_DrivesMetrics(t *testing.T) { plugin.On("Add", "foo", 1.2, nil).Return(nil) plugin.On("Add", "bar", 888, nil).Return(nil) - err := a.crank() + err := a.gather() require.NoError(t, err) } @@ -112,7 +112,7 @@ func TestAgent_AppliesTags(t *testing.T) { plugin.On("Read").Return(msgs, nil) metrics.On("Receive", m2).Return(nil) - err := a.crank() + err := a.gather() require.NoError(t, err) } */ diff --git a/config.go b/config.go index e930c950586d9..f452a989908cf 100644 --- a/config.go +++ b/config.go @@ -355,6 +355,8 @@ var header = `# Telegraf configuration [agent] # Default data collection interval for all plugins interval = "10s" + # Default data flushing interval + flush_interval = "10s" # If utc = false, uses local time (utc is highly recommended) utc = true # Precision of writes, valid values are n, u, ms, s, m, and h diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index ec7e7332dc350..b8ae0501ddf4e 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" "github.com/streadway/amqp" ) @@ -82,39 +82,21 @@ func (q *AMQP) Description() string { return "Configuration for the AMQP server to send metrics to" } -func (q *AMQP) Write(bp client.BatchPoints) error { +func (q *AMQP) Write(points []*client.Point) error { q.Lock() defer q.Unlock() - if len(bp.Points) == 0 { + if len(points) == 0 { return nil } - var zero_time time.Time - for _, p := range bp.Points { + for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to AMQP var value, key string - if p.Raw != "" { - value = p.Raw - } else { - for k, v := range bp.Tags { - if p.Tags == nil { - p.Tags = make(map[string]string, len(bp.Tags)) - } - p.Tags[k] = v - } - if p.Time == zero_time { - if bp.Time == zero_time { - p.Time = time.Now() - } else { - p.Time = bp.Time - } - } - value = p.MarshalString() - } + value = p.String() if q.RoutingTag != "" { - if h, ok := p.Tags[q.RoutingTag]; ok { + if h, ok := p.Tags()[q.RoutingTag]; ok { key = h } } diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go index 2190ea4b326ed..2538b7d404c18 100644 --- a/outputs/datadog/datadog.go +++ b/outputs/datadog/datadog.go @@ -8,7 +8,7 @@ import ( "net/url" "sort" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -59,19 +59,19 @@ func (d *Datadog) Connect() error { return nil } -func (d *Datadog) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (d *Datadog) Write(points []*client.Point) error { + if len(points) == 0 { return nil } ts := TimeSeries{ - Series: make([]*Metric, len(bp.Points)), + Series: make([]*Metric, len(points)), } - for index, pt := range bp.Points { + for index, pt := range points { metric := &Metric{ - Metric: pt.Measurement, - Tags: buildTags(bp.Tags, pt.Tags), + Metric: pt.Name(), + Tags: buildTags(pt.Tags()), } - if p, err := buildPoint(bp, pt); err == nil { + if p, err := buildPoint(pt); err == nil { metric.Points[0] = p } ts.Series[index] = metric @@ -114,13 +114,18 @@ func (d *Datadog) authenticatedUrl() string { return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) } -func buildTags(bpTags map[string]string, ptTags map[string]string) []string { - tags := make([]string, (len(bpTags) + len(ptTags))) - index := 0 - for k, v := range bpTags { - tags[index] = fmt.Sprintf("%s:%s", k, v) - index += 1 +func buildPoint(pt *client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields()["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) } + p[0] = float64(pt.Time().Unix()) + return p, nil +} + +func buildTags(ptTags map[string]string) []string { + tags := make([]string, len(ptTags)) + index := 0 for k, v := range ptTags { tags[index] = fmt.Sprintf("%s:%s", k, v) index += 1 @@ -129,19 +134,6 @@ func buildTags(bpTags map[string]string, ptTags map[string]string) []string { return tags } -func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) { - var p Point - if err := p.setValue(pt.Fields["value"]); err != nil { - return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) - } - if pt.Time.IsZero() { - p[0] = float64(bp.Time.Unix()) - } else { - p[0] = float64(pt.Time.Unix()) - } - return p, nil -} - func (p *Point) setValue(v interface{}) error { switch d := v.(type) { case int: diff --git a/outputs/datadog/datadog_test.go b/outputs/datadog/datadog_test.go index b5a7d356529d3..af5cf97f12c59 100644 --- a/outputs/datadog/datadog_test.go +++ b/outputs/datadog/datadog_test.go @@ -11,7 +11,7 @@ import ( "github.com/influxdb/telegraf/testutil" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -38,7 +38,7 @@ func TestUriOverride(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testutil.MockBatchPoints()) + err = d.Write(testutil.MockBatchPoints().Points()) require.NoError(t, err) } @@ -57,7 +57,7 @@ func TestBadStatusCode(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testutil.MockBatchPoints()) + err = d.Write(testutil.MockBatchPoints().Points()) if err == nil { t.Errorf("error expected but none returned") } else { @@ -74,28 +74,24 @@ func TestAuthenticatedUrl(t *testing.T) { func TestBuildTags(t *testing.T) { var tagtests = []struct { - bpIn map[string]string ptIn map[string]string outTags []string }{ { - map[string]string{"one": "two"}, - map[string]string{"three": "four"}, + map[string]string{"one": "two", "three": "four"}, []string{"one:two", "three:four"}, }, { map[string]string{"aaa": "bbb"}, - map[string]string{}, []string{"aaa:bbb"}, }, { - map[string]string{}, map[string]string{}, []string{}, }, } for _, tt := range tagtests { - tags := buildTags(tt.bpIn, tt.ptIn) + tags := buildTags(tt.ptIn) if !reflect.DeepEqual(tags, tt.outTags) { t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) } @@ -103,92 +99,114 @@ func TestBuildTags(t *testing.T) { } func TestBuildPoint(t *testing.T) { + tags := make(map[string]string) var tagtests = []struct { - bpIn client.BatchPoints - ptIn client.Point + ptIn *client.Point outPt Point err error }{ { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + client.NewPoint( + "test1", + tags, + map[string]interface{}{"value": 0.0}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 0.0, }, - client.Point{ - Fields: map[string]interface{}{"value": 0.0}, - }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0}, nil, }, { - client.BatchPoints{}, - client.Point{ - Fields: map[string]interface{}{"value": 1.0}, - Time: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + client.NewPoint( + "test2", + tags, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), + 1.0, }, - Point{float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0}, nil, }, { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), - }, - client.Point{ - Fields: map[string]interface{}{"value": 10}, + client.NewPoint( + "test3", + tags, + map[string]interface{}{"value": 10}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 10.0, }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0}, nil, }, { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + client.NewPoint( + "test4", + tags, + map[string]interface{}{"value": int32(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, }, - client.Point{ - Fields: map[string]interface{}{"value": int32(112345)}, - }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, nil, }, { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), - }, - client.Point{ - Fields: map[string]interface{}{"value": int64(112345)}, + client.NewPoint( + "test5", + tags, + map[string]interface{}{"value": int64(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0}, nil, }, { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + client.NewPoint( + "test6", + tags, + map[string]interface{}{"value": float32(11234.5)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, }, - client.Point{ - Fields: map[string]interface{}{"value": float32(11234.5)}, - }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, nil, }, { - client.BatchPoints{ - Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), - }, - client.Point{ - Fields: map[string]interface{}{"value": "11234.5"}, + client.NewPoint( + "test7", + tags, + map[string]interface{}{"value": "11234.5"}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, }, - Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5}, fmt.Errorf("unable to extract value from Fields, undeterminable type"), }, } for _, tt := range tagtests { - pt, err := buildPoint(tt.bpIn, tt.ptIn) + pt, err := buildPoint(tt.ptIn) if err != nil && tt.err == nil { - t.Errorf("unexpected error, %+v\n", err) + t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err) } if tt.err != nil && err == nil { - t.Errorf("expected an error (%s) but none returned", tt.err.Error()) + t.Errorf("%s: expected an error (%s) but none returned", tt.ptIn.Name(), tt.err.Error()) } if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil { - t.Errorf("\nexpected %+v\ngot %+v\n", tt.outPt, pt) + t.Errorf("%s: \nexpected %+v\ngot %+v\n", tt.ptIn.Name(), tt.outPt, pt) } } } diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index f49b58c458ad3..0cd3ca6e0eb9c 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -8,7 +8,7 @@ import ( "net/url" "strings" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -21,9 +21,10 @@ type InfluxDB struct { Password string Database string UserAgent string + Precision string Timeout t.Duration - conns []*client.Client + conns []client.Client } var sampleConfig = ` @@ -32,6 +33,7 @@ var sampleConfig = ` urls = ["http://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required + precision = "s" # Connection timeout (for the connection with InfluxDB), formatted as a string. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". @@ -63,18 +65,15 @@ func (i *InfluxDB) Connect() error { urls = append(urls, u) } - var conns []*client.Client + var conns []client.Client for _, parsed_url := range urls { - c, err := client.NewClient(client.Config{ - URL: *parsed_url, + c := client.NewClient(client.Config{ + URL: parsed_url, Username: i.Username, Password: i.Password, UserAgent: i.UserAgent, Timeout: i.Timeout.Duration, }) - if err != nil { - return err - } conns = append(conns, c) } @@ -113,15 +112,22 @@ func (i *InfluxDB) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. -func (i *InfluxDB) Write(bp client.BatchPoints) error { - bp.Database = i.Database +func (i *InfluxDB) Write(points []*client.Point) error { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: i.Database, + Precision: i.Precision, + }) + + for _, point := range points { + bp.AddPoint(point) + } // This will get set to nil if a successful write occurs err := errors.New("Could not write to any InfluxDB server in cluster") p := rand.Perm(len(i.conns)) for _, n := range p { - if _, e := i.conns[n].Write(bp); e != nil { + if e := i.conns[n].Write(bp); e != nil { log.Println("ERROR: " + e.Error()) } else { err = nil diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go index d0b98c42d78e3..fae9552104a67 100644 --- a/outputs/kafka/kafka.go +++ b/outputs/kafka/kafka.go @@ -3,10 +3,9 @@ package kafka import ( "errors" "fmt" - "time" "github.com/Shopify/sarama" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" ) @@ -52,40 +51,21 @@ func (k *Kafka) Description() string { return "Configuration for the Kafka server to send metrics to" } -func (k *Kafka) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (k *Kafka) Write(points []*client.Point) error { + if len(points) == 0 { return nil } - var zero_time time.Time - for _, p := range bp.Points { + for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting // line-protocol output string to write to Kafka - var value string - if p.Raw != "" { - value = p.Raw - } else { - for k, v := range bp.Tags { - if p.Tags == nil { - p.Tags = make(map[string]string, len(bp.Tags)) - } - p.Tags[k] = v - } - if p.Time == zero_time { - if bp.Time == zero_time { - p.Time = time.Now() - } else { - p.Time = bp.Time - } - } - value = p.MarshalString() - } + value := p.String() m := &sarama.ProducerMessage{ Topic: k.Topic, Value: sarama.StringEncoder(value), } - if h, ok := p.Tags[k.RoutingTag]; ok { + if h, ok := p.Tags()[k.RoutingTag]; ok { m.Key = sarama.StringEncoder(h) } diff --git a/outputs/mqtt/mqtt.go b/outputs/mqtt/mqtt.go index 6d00cab68f738..e7b7f9ac87847 100644 --- a/outputs/mqtt/mqtt.go +++ b/outputs/mqtt/mqtt.go @@ -10,7 +10,7 @@ import ( "sync" paho "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" t "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/outputs" ) @@ -78,35 +78,31 @@ func (m *MQTT) Description() string { return "Configuration for MQTT server to send metrics to" } -func (m *MQTT) Write(bp client.BatchPoints) error { +func (m *MQTT) Write(points []*client.Point) error { m.Lock() defer m.Unlock() - if len(bp.Points) == 0 { + if len(points) == 0 { return nil } - hostname, ok := bp.Tags["host"] + hostname, ok := points[0].Tags()["host"] if !ok { hostname = "" } - for _, p := range bp.Points { + for _, p := range points { var t []string if m.TopicPrefix != "" { t = append(t, m.TopicPrefix) } - tm := strings.Split(p.Measurement, "_") + tm := strings.Split(p.Name(), "_") if len(tm) < 2 { - tm = []string{p.Measurement, "stat"} + tm = []string{p.Name(), "stat"} } + t = append(t, "host", hostname, tm[0], tm[1]) topic := strings.Join(t, "/") - var value string - if p.Raw != "" { - value = p.Raw - } else { - value = getValue(p.Fields["value"]) - } + value := p.String() err := m.publish(topic, value) if err != nil { return fmt.Errorf("Could not write to MQTT server, %s", err) @@ -116,23 +112,6 @@ func (m *MQTT) Write(bp client.BatchPoints) error { return nil } -func getValue(v interface{}) string { - var ret string - switch v.(type) { - default: - ret = fmt.Sprintf("%v", v) - case bool: - ret = fmt.Sprintf("%t", v) - case float32, float64: - ret = fmt.Sprintf("%f", v) - case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: - ret = fmt.Sprintf("%d", v) - case string, []byte: - ret = fmt.Sprintf("%s", v) - } - return ret -} - func (m *MQTT) publish(topic, body string) error { token := m.Client.Publish(topic, 0, false, body) token.Wait() diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index a05fd2623b546..edc388f976aa9 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/outputs" ) @@ -58,8 +58,8 @@ func (o *OpenTSDB) Connect() error { return nil } -func (o *OpenTSDB) Write(bp client.BatchPoints) error { - if len(bp.Points) == 0 { +func (o *OpenTSDB) Write(points []*client.Point) error { + if len(points) == 0 { return nil } var timeNow = time.Now() @@ -70,19 +70,20 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { if err != nil { return fmt.Errorf("OpenTSDB: Telnet connect fail") } - for _, pt := range bp.Points { + for _, pt := range points { metric := &MetricLine{ - Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), + Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()), Timestamp: timeNow.Unix(), } - metricValue, buildError := buildValue(bp, pt) + + metricValue, buildError := buildValue(pt) if buildError != nil { fmt.Printf("OpenTSDB: %s\n", buildError.Error()) continue } metric.Value = metricValue - tagsSlice := buildTags(bp.Tags, pt.Tags) + tagsSlice := buildTags(pt.Tags()) metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) @@ -99,13 +100,9 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { return nil } -func buildTags(bpTags map[string]string, ptTags map[string]string) []string { - tags := make([]string, (len(bpTags) + len(ptTags))) +func buildTags(ptTags map[string]string) []string { + tags := make([]string, len(ptTags)) index := 0 - for k, v := range bpTags { - tags[index] = fmt.Sprintf("%s=%s", k, v) - index += 1 - } for k, v := range ptTags { tags[index] = fmt.Sprintf("%s=%s", k, v) index += 1 @@ -114,9 +111,9 @@ func buildTags(bpTags map[string]string, ptTags map[string]string) []string { return tags } -func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { +func buildValue(pt *client.Point) (string, error) { var retv string - var v = pt.Fields["value"] + var v = pt.Fields()["value"] switch p := v.(type) { case int64: retv = IntToString(int64(p)) diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go index e73b1ae2b8869..b7eb313bc3bb0 100644 --- a/outputs/opentsdb/opentsdb_test.go +++ b/outputs/opentsdb/opentsdb_test.go @@ -3,47 +3,42 @@ package opentsdb import ( "reflect" "testing" - "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) func TestBuildTagsTelnet(t *testing.T) { var tagtests = []struct { - bpIn map[string]string ptIn map[string]string outTags []string }{ { - map[string]string{"one": "two"}, - map[string]string{"three": "four"}, + map[string]string{"one": "two", "three": "four"}, []string{"one=two", "three=four"}, }, { map[string]string{"aaa": "bbb"}, - map[string]string{}, []string{"aaa=bbb"}, }, { - map[string]string{"one": "two"}, - map[string]string{"aaa": "bbb"}, + map[string]string{"one": "two", "aaa": "bbb"}, []string{"aaa=bbb", "one=two"}, }, { - map[string]string{}, map[string]string{}, []string{}, }, } for _, tt := range tagtests { - tags := buildTags(tt.bpIn, tt.ptIn) + tags := buildTags(tt.ptIn) if !reflect.DeepEqual(tags, tt.outTags) { t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) } } } + func TestWrite(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -60,36 +55,24 @@ func TestWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to OpenTSDB - err = o.Write(testutil.MockBatchPoints()) + err = o.Write(testutil.MockBatchPoints().Points()) require.NoError(t, err) // Verify postive and negative test cases of writing data - var bp client.BatchPoints - bp.Time = time.Now() - bp.Tags = map[string]string{"testkey": "testvalue"} - bp.Points = []client.Point{ - { - Measurement: "justametric.float", - Fields: map[string]interface{}{"value": float64(1.0)}, - }, - { - Measurement: "justametric.int", - Fields: map[string]interface{}{"value": int64(123456789)}, - }, - { - Measurement: "justametric.uint", - Fields: map[string]interface{}{"value": uint64(123456789012345)}, - }, - { - Measurement: "justametric.string", - Fields: map[string]interface{}{"value": "Lorem Ipsum"}, - }, - { - Measurement: "justametric.anotherfloat", - Fields: map[string]interface{}{"value": float64(42.0)}, - }, - } - err = o.Write(bp) + bp := testutil.MockBatchPoints() + tags := make(map[string]string) + bp.AddPoint(client.NewPoint("justametric.float", tags, + map[string]interface{}{"value": float64(1.0)})) + bp.AddPoint(client.NewPoint("justametric.int", tags, + map[string]interface{}{"value": int64(123456789)})) + bp.AddPoint(client.NewPoint("justametric.uint", tags, + map[string]interface{}{"value": uint64(123456789012345)})) + bp.AddPoint(client.NewPoint("justametric.string", tags, + map[string]interface{}{"value": "Lorem Ipsum"})) + bp.AddPoint(client.NewPoint("justametric.anotherfloat", tags, + map[string]interface{}{"value": float64(42.0)})) + + err = o.Write(bp.Points()) require.NoError(t, err) } diff --git a/outputs/registry.go b/outputs/registry.go index 92ce2b34e5aff..842164e0f1732 100644 --- a/outputs/registry.go +++ b/outputs/registry.go @@ -1,7 +1,7 @@ package outputs import ( - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) type Output interface { @@ -9,7 +9,7 @@ type Output interface { Close() error Description() string SampleConfig() string - Write(client.BatchPoints) error + Write(points []*client.Point) error } type Creator func() Output diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go index c492352591604..7c1258944d74b 100644 --- a/plugins/kafka_consumer/kafka_consumer.go +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte } for _, point := range points { - acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time()) + acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) } case <-timeout: return nil diff --git a/plugins/mongodb/mongodb_data.go b/plugins/mongodb/mongodb_data.go index bb9b7b2a4e889..fda1843bb8e15 100644 --- a/plugins/mongodb/mongodb_data.go +++ b/plugins/mongodb/mongodb_data.go @@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s } func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { - acc.AddFieldsWithTime( + acc.AddFields( key, map[string]interface{}{ "value": val, diff --git a/plugins/registry.go b/plugins/registry.go index 88a24097b3a87..f061a3e7c20ad 100644 --- a/plugins/registry.go +++ b/plugins/registry.go @@ -6,17 +6,11 @@ type Accumulator interface { // Create a point with a value, decorating it with tags // NOTE: tags is expected to be owned by the caller, don't mutate // it after passing to Add. - Add(measurement string, value interface{}, tags map[string]string) - - // Create a point with a set of values, decorating it with tags - // NOTE: tags and values are expected to be owned by the caller, don't mutate - // them after passing to AddFieldsWithTime. - AddFieldsWithTime( - measurement string, - values map[string]interface{}, - tags map[string]string, - timestamp time.Time, - ) + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) } type Plugin interface { diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 01d9393b43299..8ade867a9e4af 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -22,7 +22,12 @@ type Accumulator struct { } // Add adds a measurement point to the accumulator -func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) { +func (a *Accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { a.Lock() defer a.Unlock() if tags == nil { @@ -38,24 +43,56 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string ) } -// AddFieldsWithTime adds a measurement point with a specified timestamp. -func (a *Accumulator) AddFieldsWithTime( +// AddFields adds a measurement point with a specified timestamp. +func (a *Accumulator) AddFields( measurement string, values map[string]interface{}, tags map[string]string, - timestamp time.Time, + timestamp ...time.Time, ) { + var t time.Time + if len(timestamp) > 0 { + t = timestamp[0] + } else { + t = time.Now() + } a.Points = append( a.Points, &Point{ Measurement: measurement, Values: values, Tags: tags, - Time: timestamp, + Time: t, }, ) } +func (a *Accumulator) SetDefaultTags(tags map[string]string) { + // stub for implementing Accumulator interface. +} + +func (a *Accumulator) AddDefaultTag(key, value string) { + // stub for implementing Accumulator interface. +} + +func (a *Accumulator) Prefix() string { + // stub for implementing Accumulator interface. + return "" +} + +func (a *Accumulator) SetPrefix(prefix string) { + // stub for implementing Accumulator interface. +} + +func (a *Accumulator) Debug() bool { + // stub for implementing Accumulator interface. + return true +} + +func (a *Accumulator) SetDebug(debug bool) { + // stub for implementing Accumulator interface. +} + // Get gets the specified measurement point from the accumulator func (a *Accumulator) Get(measurement string) (*Point, bool) { for _, p := range a.Points { diff --git a/testutil/testutil.go b/testutil/testutil.go index 79a7dd54431a6..70b3a53a2deb7 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -4,9 +4,8 @@ import ( "net" "net/url" "os" - "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) var localhost = "localhost" @@ -34,13 +33,14 @@ func GetLocalHost() string { // MockBatchPoints returns a mock BatchPoints object for using in unit tests // of telegraf output sinks. func MockBatchPoints() client.BatchPoints { - var bp client.BatchPoints - bp.Time = time.Now() - bp.Tags = map[string]string{"tag1": "value1"} - bp.Points = []client.Point{ - { - Fields: map[string]interface{}{"value": 1.0}, - }, - } + // Create a new point batch + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{}) + + // Create a point and add to batch + tags := map[string]string{"tag1": "value1"} + fields := map[string]interface{}{"value": 1.0} + pt := client.NewPoint("test_point", tags, fields) + bp.AddPoint(pt) + return bp }