diff --git a/accumulator.go b/accumulator.go index f7fb7c1e421d0..d04cd7a918383 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,62 +7,134 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + oldclient "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) -// BatchPoints is used to send a batch of data in a single write from telegraf -// to influx -type BatchPoints struct { - sync.Mutex +type Accumulator interface { + Add(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) - client.BatchPoints + BatchPoints() client.BatchPoints - Debug bool + SetDefaultTags(tags map[string]string) + AddDefaultTag(key, value string) - Prefix string + Prefix() string + SetPrefix(prefix string) - Config *ConfiguredPlugin + Debug() bool + SetDebug(enabled bool) } -// deepcopy returns a deep copy of the BatchPoints object. This is primarily so -// we can do multithreaded output flushing (see Agent.flush) -func (bp *BatchPoints) deepcopy() *BatchPoints { - bp.Lock() - defer bp.Unlock() +func NewAccumulator( + precision string, + database string, + config *ConfiguredPlugin, +) (Accumulator, error) { + var err error + acc := accumulator{} + bpconfig := client.BatchPointsConfig{ + Precision: precision, + Database: database, + } + if acc.batch, err = client.NewBatchPoints(bpconfig); err != nil { + return nil, err + } + acc.config = config + return &acc, nil +} - var bpc BatchPoints - bpc.Time = bp.Time - bpc.Precision = bp.Precision +type accumulator struct { + sync.Mutex - bpc.Tags = make(map[string]string) - for k, v := range bp.Tags { - bpc.Tags[k] = v - } + batch client.BatchPoints + + defaultTags map[string]string - var pts []client.Point - for _, pt := range bp.Points { - var ptc client.Point + debug bool - ptc.Measurement = pt.Measurement - ptc.Time = pt.Time - ptc.Precision = pt.Precision - ptc.Raw = pt.Raw + config *ConfiguredPlugin + + prefix string +} + +func (ac *accumulator) Add( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + ac.Lock() + defer ac.Unlock() - ptc.Tags = make(map[string]string) - ptc.Fields = make(map[string]interface{}) + if len(t) > 0 { + timestamp := t[0] + } else { + timestamp := time.Now() + } - for k, v := range pt.Tags { - ptc.Tags[k] = v + if ac.config != nil { + if !ac.config.ShouldPass(measurement, tags) { + return } + } - for k, v := range pt.Fields { - ptc.Fields[k] = v + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v } - pts = append(pts, ptc) } - bpc.Points = pts - return &bpc + pt := client.NewPoint(measurement, tags, fields, timestamp) + if ac.debug { + fmt.Println(pt.PrecisionString(ac.batch.Precision())) + } + ac.batch.AddPoint(pt) +} + +func (ac *accumulator) BatchPoints() client.BatchPoints { + ac.Lock() + defer ac.Unlock() + return ac.batch +} + +func (ac *accumulator) SetDefaultTags(tags map[string]string) { + ac.defaultTags = tags +} + +func (ac *accumulator) AddDefaultTag(key, value string) { + ac.defaultTags[key] = value +} + +func (ac *accumulator) Prefix() string { + return ac.prefix +} + +func (ac *accumulator) SetPrefix(prefix string) { + ac.prefix = prefix +} + +func (ac *accumulator) Debug() bool { + return ac.debug +} + +func (ac *accumulator) SetDebug(enabled bool) { + ac.debug = enabled +} + +// BatchPoints is used to send a batch of data in a single write from telegraf +// to influx +type BatchPoints struct { + sync.Mutex + + oldclient.BatchPoints + + Debug bool + + Prefix string + + Config *ConfiguredPlugin } // Add adds a measurement @@ -116,7 +188,7 @@ func (bp *BatchPoints) AddFieldsWithTime( // fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) // } - // bp.Points = append(bp.Points, client.Point{ + // bp.Points = append(bp.Points, oldclient.Point{ // Measurement: measurement, // Tags: tags, // Fields: fields, @@ -171,7 +243,7 @@ func (bp *BatchPoints) AddFields( fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) } - bp.Points = append(bp.Points, client.Point{ + bp.Points = append(bp.Points, oldclient.Point{ Measurement: measurement, Tags: tags, Fields: fields,