Skip to content

Commit

Permalink
Utilizing new client and overhauling Accumulator interface
Browse files Browse the repository at this point in the history
Fixes #280
Fixes #281
  • Loading branch information
sparrc committed Oct 17, 2015
1 parent 73f1ed4 commit bca16b7
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 71 deletions.
212 changes: 159 additions & 53 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,167 @@ import (
"sync"
"time"

"github.com/influxdb/influxdb/client"
oldclient "github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/client/v2"
)

type Accumulator interface {
Add(measurement string, value interface{},
tags map[string]string, t ...time.Time)
AddFields(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)

BatchPoints() client.BatchPoints

SetDefaultTags(tags map[string]string)
AddDefaultTag(key, value string)

Prefix() string
SetPrefix(prefix string)

Debug() bool
SetDebug(enabled bool)

Database() string
SetDatabase(database string)
}

func NewAccumulator(
precision string,
config *ConfiguredPlugin,
) (Accumulator, error) {
var err error
acc := accumulator{}
bpconfig := client.BatchPointsConfig{
Precision: precision,
}
if acc.batch, err = client.NewBatchPoints(bpconfig); err != nil {
return nil, err
}
acc.config = config
return &acc, nil
}

type accumulator struct {
sync.Mutex

batch client.BatchPoints

defaultTags map[string]string

debug bool

config *ConfiguredPlugin

prefix string
}

func (ac *accumulator) Add(
measurement string,
value interface{},
tags map[string]string,
t ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = value
ac.AddFields(measurement, fields, tags, t...)
}

func (ac *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
ac.Lock()
defer ac.Unlock()

var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}

if ac.config != nil {
if !ac.config.ShouldPass(measurement, tags) {
return
}
}

for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

pt := client.NewPoint(measurement, tags, fields, timestamp)
if ac.debug {
fmt.Println(pt.PrecisionString(ac.batch.Precision()))
}
ac.batch.AddPoint(pt)
}

func (ac *accumulator) BatchPoints() client.BatchPoints {
ac.Lock()
defer ac.Unlock()
return ac.batch
}

func (ac *accumulator) SetDefaultTags(tags map[string]string) {
ac.Lock()
defer ac.Unlock()
ac.defaultTags = tags
}

func (ac *accumulator) AddDefaultTag(key, value string) {
ac.Lock()
defer ac.Unlock()
ac.defaultTags[key] = value
}

func (ac *accumulator) Prefix() string {
ac.Lock()
defer ac.Unlock()
return ac.prefix
}

func (ac *accumulator) SetPrefix(prefix string) {
ac.Lock()
defer ac.Unlock()
ac.prefix = prefix
}

func (ac *accumulator) Debug() bool {
ac.Lock()
defer ac.Unlock()
return ac.debug
}

func (ac *accumulator) SetDebug(enabled bool) {
ac.Lock()
defer ac.Unlock()
ac.debug = enabled
}

func (ac *accumulator) Database() string {
ac.Lock()
defer ac.Unlock()
return ac.batch.Database()
}

func (ac *accumulator) SetDatabase(database string) {
ac.Lock()
defer ac.Unlock()
ac.batch.SetDatabase(database)
}

// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
sync.Mutex

client.BatchPoints
oldclient.BatchPoints

Debug bool

Expand All @@ -39,9 +191,9 @@ func (bp *BatchPoints) deepcopy() *BatchPoints {
bpc.Tags[k] = v
}

var pts []client.Point
var pts []oldclient.Point
for _, pt := range bp.Points {
var ptc client.Point
var ptc oldclient.Point

ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
Expand Down Expand Up @@ -70,66 +222,20 @@ func (bp *BatchPoints) Add(
measurement string,
val interface{},
tags map[string]string,
timestamp ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = val
bp.AddFields(measurement, fields, tags)
}

// AddFieldsWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddFieldsWithTime(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp time.Time,
) {
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.Lock()
// defer bp.Unlock()

// measurement = bp.Prefix + measurement

// if bp.Config != nil {
// if !bp.Config.ShouldPass(measurement, tags) {
// return
// }
// }

// if bp.Debug {
// var tg []string

// for k, v := range tags {
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
// }

// var vals []string

// for k, v := range fields {
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
// }

// sort.Strings(tg)
// sort.Strings(vals)

// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
// }

// bp.Points = append(bp.Points, client.Point{
// Measurement: measurement,
// Tags: tags,
// Fields: fields,
// Time: timestamp,
// })
}

// AddFields will eventually replace the Add function, once we move to having a
// single plugin as a single measurement with multiple fields
func (bp *BatchPoints) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
bp.Lock()
defer bp.Unlock()
Expand Down Expand Up @@ -171,7 +277,7 @@ func (bp *BatchPoints) AddFields(
fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
}

bp.Points = append(bp.Points, client.Point{
bp.Points = append(bp.Points, oldclient.Point{
Measurement: measurement,
Tags: tags,
Fields: fields,
Expand Down
2 changes: 1 addition & 1 deletion plugins/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte
}

for _, point := range points {
acc.AddFieldsWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
}
case <-timeout:
return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, s
}

func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddFieldsWithTime(
acc.AddFields(
key,
map[string]interface{}{
"value": val,
Expand Down
16 changes: 5 additions & 11 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@ type Accumulator interface {
// Create a point with a value, decorating it with tags
// NOTE: tags is expected to be owned by the caller, don't mutate
// it after passing to Add.
Add(measurement string, value interface{}, tags map[string]string)

// Create a point with a set of values, decorating it with tags
// NOTE: tags and values are expected to be owned by the caller, don't mutate
// them after passing to AddFieldsWithTime.
AddFieldsWithTime(
measurement string,
values map[string]interface{},
tags map[string]string,
timestamp time.Time,
)
Add(measurement string, value interface{},
tags map[string]string, t ...time.Time)

AddFields(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)
}

type Plugin interface {
Expand Down
21 changes: 16 additions & 5 deletions testutil/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ type Accumulator struct {
}

// Add adds a measurement point to the accumulator
func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) {
func (a *Accumulator) Add(
measurement string,
value interface{},
tags map[string]string,
t ...time.Time,
) {
a.Lock()
defer a.Unlock()
if tags == nil {
Expand All @@ -38,20 +43,26 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
)
}

// AddFieldsWithTime adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFieldsWithTime(
// AddFields adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFields(
measurement string,
values map[string]interface{},
tags map[string]string,
timestamp time.Time,
timestamp ...time.Time,
) {
var t time.Time
if len(timestamp) > 0 {
t = timestamp[0]
} else {
t = time.Now()
}
a.Points = append(
a.Points,
&Point{
Measurement: measurement,
Values: values,
Tags: tags,
Time: timestamp,
Time: t,
},
)
}
Expand Down

0 comments on commit bca16b7

Please sign in to comment.