Update the InfluxDB dep to 1.1 (#991)
* update influxdb dep

* git subrepo clone --force --branch=master https://github.com/influxdata/influxdb.git vendor/github.com/influxdata/influxdb

subrepo:
  subdir:   "vendor/github.com/influxdata/influxdb"
  merged:   "af72d9b"
upstream:
  origin:   "https://github.com/influxdata/influxdb.git"
  branch:   "master"
  commit:   "af72d9b"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* update for new InfluxDB interfaces

* add cumulative sum
Nathaniel Cook authored Oct 26, 2016
1 parent a9f6b46 commit 465d756
Showing 187 changed files with 13,363 additions and 4,712 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@ See the API docs for more details.
- [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7
- [#957](https://github.com/influxdata/kapacitor/issues/957): Add API endpoints for testing service integrations.
- [#958](https://github.com/influxdata/kapacitor/issues/958): Add support for Slack icon emojis and custom usernames.
- [#991](https://github.com/influxdata/kapacitor/pull/991): Bring Kapacitor up to parity with available InfluxQL functions in 1.1

### Bugfixes

2 changes: 1 addition & 1 deletion influxdb/client.go
@@ -510,7 +510,7 @@ type Point struct {

// Returns byte array of a line protocol representation of the point
func (p Point) Bytes(precision string) []byte {
-key := imodels.MakeKey([]byte(p.Name), imodels.Tags(p.Tags))
+key := imodels.MakeKey([]byte(p.Name), imodels.NewTags(p.Tags))
fields := imodels.Fields(p.Fields).MarshalBinary()
kl := len(key)
fl := len(fields)
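
For context (not part of the diff): InfluxDB 1.1 changes models.Tags from a map[string]string to a sorted slice of Tag, which is why call sites in this commit switch to NewTags, Map(), and GetString(). A minimal sketch of that usage, assuming the vendored github.com/influxdata/influxdb/models package at this commit:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
)

func main() {
	// Build the slice-based Tags value from a plain map.
	tags := models.NewTags(map[string]string{"host": "serverA", "region": "us-west"})

	// MakeKey still builds the series key from the measurement name plus tags.
	key := models.MakeKey([]byte("packets"), tags)
	fmt.Println(string(key)) // packets,host=serverA,region=us-west

	// Map() converts back to a map[string]string; GetString() reads a single tag.
	fmt.Println(tags.Map()["host"])       // serverA
	fmt.Println(tags.GetString("region")) // us-west
}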
48 changes: 48 additions & 0 deletions integrations/batcher_test.go
@@ -374,6 +374,54 @@ batch

testBatcherWithOutput(t, "TestBatch_MovingAverage", script, 21*time.Second, er, false)
}
func TestBatch_CumulativeSum(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|cumulativeSum('value')
|httpOut('TestBatch_CumulativeSum')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "cumulativeSum"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC),
10.0,
},
{
time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC),
30.0,
},
{
time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC),
60.0,
},
{
time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC),
100.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_CumulativeSum", script, 31*time.Second, er, false)
}

func TestBatch_SimpleMR(t *testing.T) {

50 changes: 50 additions & 0 deletions integrations/data/TestBatch_CumulativeSum.0.brpl
@@ -0,0 +1,50 @@
{
"name":"packets",
"points":[
{
"fields":{"value":1000},
"time":"2015-10-18T00:00:00Z"
},
{
"fields":{"value":1005},
"time":"2015-10-18T00:00:02Z"
},
{
"fields":{"value":1008},
"time":"2015-10-18T00:00:04Z"
},
{
"fields":{"value":1009},
"time":"2015-10-18T00:00:06Z"
},
{
"fields":{"value":1004},
"time":"2015-10-18T00:00:08Z"
}
]
}
{
"name":"packets",
"points":[
{
"fields":{"value":0},
"time":"2015-10-18T00:00:10Z"
},
{
"fields":{"value":10},
"time":"2015-10-18T00:00:12Z"
},
{
"fields":{"value":20},
"time":"2015-10-18T00:00:14Z"
},
{
"fields":{"value":30},
"time":"2015-10-18T00:00:16Z"
},
{
"fields":{"value":40},
"time":"2015-10-18T00:00:18Z"
}
]
}
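
The expected values in TestBatch_CumulativeSum above correspond to the second batch in this file: a running sum over 0, 10, 20, 30, 40 yields 0, 10, 30, 60, 100. A standalone sketch of that arithmetic (illustrative only, not part of the commit):

package main

import "fmt"

func main() {
	// Values from the second batch above; the running total matches the
	// expected cumulativeSum column in TestBatch_CumulativeSum.
	values := []float64{0, 10, 20, 30, 40}
	sum := 0.0
	for _, v := range values {
		sum += v
		fmt.Println(sum) // 0, 10, 30, 60, 100
	}
}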
33 changes: 33 additions & 0 deletions integrations/data/TestStream_CumulativeSum.srpl
@@ -0,0 +1,33 @@
dbname
rpname
packets value=00 0000000000
dbname
rpname
packets value=00 0000000001
dbname
rpname
packets value=01 0000000002
dbname
rpname
packets value=02 0000000003
dbname
rpname
packets value=03 0000000004
dbname
rpname
packets value=04 0000000005
dbname
rpname
packets value=05 0000000006
dbname
rpname
packets value=06 0000000007
dbname
rpname
packets value=07 0000000008
dbname
rpname
packets value=08 0000000009
dbname
rpname
packets value=09 0000000010
68 changes: 67 additions & 1 deletion integrations/streamer_test.go
@@ -457,6 +457,72 @@ stream
testStreamerWithOutput(t, "TestStream_MovingAverage", script, 16*time.Second, er, false, nil)
}

func TestStream_CumulativeSum(t *testing.T) {
var script = `
stream
|from()
.measurement('packets')
|cumulativeSum('value')
|window()
.period(10s)
.every(10s)
|httpOut('TestStream_CumulativeSum')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "cumulativeSum"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
3.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
6.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
10.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
15.0,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
21.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
28.0,
},
{
time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC),
36.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_CumulativeSum", script, 13*time.Second, er, false, nil)
}
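
These expected values follow from the replay data in TestStream_CumulativeSum.srpl above: the first 10s window covers the points at offsets 0 through 9 (values 0, 0, 1, 2, 3, 4, 5, 6, 7, 8), and the running sums of those values are 0, 0, 1, 3, 6, 10, 15, 21, 28, 36.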

func TestStream_WindowMissing(t *testing.T) {

var script = `
@@ -7087,7 +7153,7 @@ stream
if len(p.Tags()) != 1 {
t.Errorf("got %v exp %v", len(p.Tags()), 1)
}
-if got, exp := p.Tags()["key"], "value"; got != exp {
+if got, exp := p.Tags().GetString("key"), "value"; got != exp {
t.Errorf("got %s exp %s", got, exp)
}
tm := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC)
4 changes: 2 additions & 2 deletions metaclient.go
@@ -15,10 +15,10 @@ func (m *NoopMetaClient) WaitForLeader(d time.Duration) error {
func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
return nil, nil
}
-func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
+func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
return nil, nil
}
-func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
+func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}
func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo {
2 changes: 1 addition & 1 deletion models/point.go
@@ -166,7 +166,7 @@ func ToGroupID(name string, tags map[string]string, dims Dimensions) GroupID {

// Returns byte array of a line protocol representation of the point
func (p Point) Bytes(precision string) []byte {
-key := models.MakeKey([]byte(p.Name), models.Tags(p.Tags))
+key := models.MakeKey([]byte(p.Name), models.NewTags(p.Tags))
fields := models.Fields(p.Fields).MarshalBinary()
kl := len(key)
fl := len(fields)
18 changes: 18 additions & 0 deletions pipeline/influxql.go
@@ -480,3 +480,21 @@ func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration
n.linkChild(i)
return i
}

// Compute a cumulative sum of each point that is received.
// A point is emitted for every point collected.
func (n *chainnode) CumulativeSum(field string) *InfluxQLNode {
i := newInfluxQLNode("cumulativeSum", field, n.Provides(), n.Provides(), ReduceCreater{
CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) {
fn := influxql.NewFloatCumulativeSumReducer()
return fn, fn
},
CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) {
fn := influxql.NewIntegerCumulativeSumReducer()
return fn, fn
},
IsStreamTransformation: true,
})
n.linkChild(i)
return i
}
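
The reducers used here (influxql.NewFloatCumulativeSumReducer and NewIntegerCumulativeSumReducer) come from the vendored InfluxDB influxql package. Conceptually they follow the aggregate/emit pattern above: every aggregated point immediately yields an output point carrying the running total, which is why IsStreamTransformation is set. A rough, hypothetical sketch of that shape (names are illustrative, not the influxql API):

package main

import "fmt"

// point is a stand-in for an influxql float point: a timestamp and a value.
type point struct {
	Time  int64
	Value float64
}

// cumulativeSumReducer keeps a running total and records one output point
// per input point, mimicking the aggregator/emitter pair returned above.
type cumulativeSumReducer struct {
	sum float64
	out []point
}

// AggregatePoint adds the point's value to the running total and stores an
// output point carrying that total at the same timestamp.
func (r *cumulativeSumReducer) AggregatePoint(p point) {
	r.sum += p.Value
	r.out = append(r.out, point{Time: p.Time, Value: r.sum})
}

// Emit returns everything accumulated so far, one point per input point.
func (r *cumulativeSumReducer) Emit() []point {
	return r.out
}

func main() {
	r := &cumulativeSumReducer{}
	for i, v := range []float64{0, 0, 1, 2, 3} {
		r.AggregatePoint(point{Time: int64(i), Value: v})
	}
	fmt.Println(r.Emit()) // [{0 0} {1 0} {2 1} {3 3} {4 6}]
}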
2 changes: 1 addition & 1 deletion replay.go
@@ -98,7 +98,7 @@ func readPointsFromIO(data io.ReadCloser, points chan<- models.Point, precision
RetentionPolicy: rp,
Name: mp.Name(),
Group: models.NilGroup,
-Tags: models.Tags(mp.Tags()),
+Tags: models.Tags(mp.Tags().Map()),
Fields: models.Fields(mp.Fields()),
Time: mp.Time().UTC(),
}
2 changes: 1 addition & 1 deletion task_master.go
@@ -556,7 +556,7 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL
RetentionPolicy: retentionPolicy,
Name: mp.Name(),
Group: models.NilGroup,
-Tags: models.Tags(mp.Tags()),
+Tags: models.Tags(mp.Tags().Map()),
Fields: models.Fields(mp.Fields()),
Time: mp.Time(),
}
2 changes: 1 addition & 1 deletion vendor.list
@@ -6,7 +6,7 @@ github.com/dustin/go-humanize
github.com/gogo/protobuf
github.com/golang/protobuf
github.com/gorhill/cronexpr
-github.com/influxdata/influxdb 1.0
+github.com/influxdata/influxdb master
github.com/influxdata/usage-client
github.com/kimor79/gollectd
github.com/mattn/go-runewidth
21 changes: 5 additions & 16 deletions vendor/github.com/influxdata/influxdb/.gitignore


6 changes: 3 additions & 3 deletions vendor/github.com/influxdata/influxdb/.gitrepo


