diff --git a/accumulator.go b/accumulator.go index f7fb7c1e421d0..cc92cefb69d7b 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,15 +7,167 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + oldclient "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) +type Accumulator interface { + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) + + BatchPoints() client.BatchPoints + + SetDefaultTags(tags map[string]string) + AddDefaultTag(key, value string) + + Prefix() string + SetPrefix(prefix string) + + Debug() bool + SetDebug(enabled bool) + + Database() string + SetDatabase(database string) +} + +func NewAccumulator( + precision string, + config *ConfiguredPlugin, +) (Accumulator, error) { + var err error + acc := accumulator{} + bpconfig := client.BatchPointsConfig{ + Precision: precision, + } + if acc.batch, err = client.NewBatchPoints(bpconfig); err != nil { + return nil, err + } + acc.config = config + return &acc, nil +} + +type accumulator struct { + sync.Mutex + + batch client.BatchPoints + + defaultTags map[string]string + + debug bool + + config *ConfiguredPlugin + + prefix string +} + +func (ac *accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { + fields := make(map[string]interface{}) + fields["value"] = value + ac.AddFields(measurement, fields, tags, t...) +} + +func (ac *accumulator) AddFields( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + ac.Lock() + defer ac.Unlock() + + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + + if ac.config != nil { + if !ac.config.ShouldPass(measurement, tags) { + return + } + } + + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + pt := client.NewPoint(measurement, tags, fields, timestamp) + if ac.debug { + fmt.Println(pt.PrecisionString(ac.batch.Precision())) + } + ac.batch.AddPoint(pt) +} + +func (ac *accumulator) BatchPoints() client.BatchPoints { + ac.Lock() + defer ac.Unlock() + return ac.batch +} + +func (ac *accumulator) SetDefaultTags(tags map[string]string) { + ac.Lock() + defer ac.Unlock() + ac.defaultTags = tags +} + +func (ac *accumulator) AddDefaultTag(key, value string) { + ac.Lock() + defer ac.Unlock() + ac.defaultTags[key] = value +} + +func (ac *accumulator) Prefix() string { + ac.Lock() + defer ac.Unlock() + return ac.prefix +} + +func (ac *accumulator) SetPrefix(prefix string) { + ac.Lock() + defer ac.Unlock() + ac.prefix = prefix +} + +func (ac *accumulator) Debug() bool { + ac.Lock() + defer ac.Unlock() + return ac.debug +} + +func (ac *accumulator) SetDebug(enabled bool) { + ac.Lock() + defer ac.Unlock() + ac.debug = enabled +} + +func (ac *accumulator) Database() string { + ac.Lock() + defer ac.Unlock() + return ac.batch.Database() +} + +func (ac *accumulator) SetDatabase(database string) { + ac.Lock() + defer ac.Unlock() + ac.batch.SetDatabase(database) +} + // BatchPoints is used to send a batch of data in a single write from telegraf // to influx type BatchPoints struct { sync.Mutex - client.BatchPoints + oldclient.BatchPoints Debug bool @@ -39,9 +191,9 @@ func (bp *BatchPoints) deepcopy() *BatchPoints { bpc.Tags[k] = v } - var pts []client.Point + var pts []oldclient.Point for _, pt := range bp.Points { - var ptc client.Point + var ptc oldclient.Point ptc.Measurement = pt.Measurement ptc.Time = pt.Time @@ -70,66 +222,20 @@ func (bp *BatchPoints) Add( measurement string, val interface{}, tags map[string]string, + timestamp ...time.Time, ) { fields := make(map[string]interface{}) fields["value"] = val bp.AddFields(measurement, fields, tags) } -// AddFieldsWithTime adds a measurement with a provided timestamp -func (bp *BatchPoints) AddFieldsWithTime( - measurement string, - fields map[string]interface{}, - tags map[string]string, - timestamp time.Time, -) { - // TODO this function should add the fields with the timestamp, but that will - // need to wait for the InfluxDB point precision/unit to be fixed - bp.AddFields(measurement, fields, tags) - // bp.Lock() - // defer bp.Unlock() - - // measurement = bp.Prefix + measurement - - // if bp.Config != nil { - // if !bp.Config.ShouldPass(measurement, tags) { - // return - // } - // } - - // if bp.Debug { - // var tg []string - - // for k, v := range tags { - // tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v)) - // } - - // var vals []string - - // for k, v := range fields { - // vals = append(vals, fmt.Sprintf("%s=%v", k, v)) - // } - - // sort.Strings(tg) - // sort.Strings(vals) - - // fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) - // } - - // bp.Points = append(bp.Points, client.Point{ - // Measurement: measurement, - // Tags: tags, - // Fields: fields, - // Time: timestamp, - // }) -} - // AddFields will eventually replace the Add function, once we move to having a // single plugin as a single measurement with multiple fields func (bp *BatchPoints) AddFields( measurement string, fields map[string]interface{}, tags map[string]string, + timestamp ...time.Time, ) { bp.Lock() defer bp.Unlock() @@ -171,7 +277,7 @@ func (bp *BatchPoints) AddFields( fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) } - bp.Points = append(bp.Points, client.Point{ + bp.Points = append(bp.Points, oldclient.Point{ Measurement: measurement, Tags: tags, Fields: fields, diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go index c492352591604..7c1258944d74b 100644 --- a/plugins/kafka_consumer/kafka_consumer.go +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte } for _, point := range points { - acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time()) + acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) } case <-timeout: return nil diff --git a/plugins/mongodb/mongodb_data.go b/plugins/mongodb/mongodb_data.go index bb9b7b2a4e889..fda1843bb8e15 100644 --- a/plugins/mongodb/mongodb_data.go +++ b/plugins/mongodb/mongodb_data.go @@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s } func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { - acc.AddFieldsWithTime( + acc.AddFields( key, map[string]interface{}{ "value": val, diff --git a/plugins/registry.go b/plugins/registry.go index 88a24097b3a87..f061a3e7c20ad 100644 --- a/plugins/registry.go +++ b/plugins/registry.go @@ -6,17 +6,11 @@ type Accumulator interface { // Create a point with a value, decorating it with tags // NOTE: tags is expected to be owned by the caller, don't mutate // it after passing to Add. - Add(measurement string, value interface{}, tags map[string]string) - - // Create a point with a set of values, decorating it with tags - // NOTE: tags and values are expected to be owned by the caller, don't mutate - // them after passing to AddFieldsWithTime. - AddFieldsWithTime( - measurement string, - values map[string]interface{}, - tags map[string]string, - timestamp time.Time, - ) + Add(measurement string, value interface{}, + tags map[string]string, t ...time.Time) + + AddFields(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) } type Plugin interface { diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 01d9393b43299..06fc54f41921a 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -22,7 +22,12 @@ type Accumulator struct { } // Add adds a measurement point to the accumulator -func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) { +func (a *Accumulator) Add( + measurement string, + value interface{}, + tags map[string]string, + t ...time.Time, +) { a.Lock() defer a.Unlock() if tags == nil { @@ -38,20 +43,26 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string ) } -// AddFieldsWithTime adds a measurement point with a specified timestamp. -func (a *Accumulator) AddFieldsWithTime( +// AddFields adds a measurement point with a specified timestamp. +func (a *Accumulator) AddFields( measurement string, values map[string]interface{}, tags map[string]string, - timestamp time.Time, + timestamp ...time.Time, ) { + var t time.Time + if len(timestamp) > 0 { + t = timestamp[0] + } else { + t = time.Now() + } a.Points = append( a.Points, &Point{ Measurement: measurement, Values: values, Tags: tags, - Time: timestamp, + Time: t, }, ) }