From ffac48456c35b6efc98c738fef5271abc4edd98d Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 25 Oct 2016 13:21:45 -0600 Subject: [PATCH] add cumulative sum --- CHANGELOG.md | 1 + integrations/batcher_test.go | 48 ++++++++++++++ .../data/TestBatch_CumulativeSum.0.brpl | 50 ++++++++++++++ .../data/TestStream_CumulativeSum.srpl | 33 ++++++++++ integrations/streamer_test.go | 66 +++++++++++++++++++ pipeline/influxql.go | 18 +++++ 6 files changed, 216 insertions(+) create mode 100644 integrations/data/TestBatch_CumulativeSum.0.brpl create mode 100644 integrations/data/TestStream_CumulativeSum.srpl diff --git a/CHANGELOG.md b/CHANGELOG.md index 31b62ea24b..7466bae5f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ See the API docs for more details. - [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7 - [#957](https://github.com/influxdata/kapacitor/issues/957): Add API endpoints for testing service integrations. - [#958](https://github.com/influxdata/kapacitor/issues/958): Add support for Slack icon emojis and custom usernames. +- [#991](https://github.com/influxdata/kapacitor/pull/991): Bring Kapacitor up to parity with available InfluxQL functions in 1.1 ### Bugfixes diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index e972b8d780..97cddc3052 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -374,6 +374,54 @@ batch testBatcherWithOutput(t, "TestBatch_MovingAverage", script, 21*time.Second, er, false) } +func TestBatch_CumulativeSum(t *testing.T) { + + var script = ` +batch + |query(''' + SELECT "value" + FROM "telegraf"."default".packets +''') + .period(10s) + .every(10s) + |cumulativeSum('value') + |httpOut('TestBatch_CumulativeSum') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "packets", + Tags: nil, + Columns: []string{"time", "cumulativeSum"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 10.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + 30.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC), + 60.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC), + 100.0, + }, + }, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_CumulativeSum", script, 31*time.Second, er, false) +} func TestBatch_SimpleMR(t *testing.T) { diff --git a/integrations/data/TestBatch_CumulativeSum.0.brpl b/integrations/data/TestBatch_CumulativeSum.0.brpl new file mode 100644 index 0000000000..dd83801e2f --- /dev/null +++ b/integrations/data/TestBatch_CumulativeSum.0.brpl @@ -0,0 +1,50 @@ +{ + "name":"packets", + "points":[ + { + "fields":{"value":1000}, + "time":"2015-10-18T00:00:00Z" + }, + { + "fields":{"value":1005}, + "time":"2015-10-18T00:00:02Z" + }, + { + "fields":{"value":1008}, + "time":"2015-10-18T00:00:04Z" + }, + { + "fields":{"value":1009}, + "time":"2015-10-18T00:00:06Z" + }, + { + "fields":{"value":1004}, + "time":"2015-10-18T00:00:08Z" + } + ] +} +{ + "name":"packets", + "points":[ + { + "fields":{"value":0}, + "time":"2015-10-18T00:00:10Z" + }, + { + "fields":{"value":10}, + "time":"2015-10-18T00:00:12Z" + }, + { + "fields":{"value":20}, + "time":"2015-10-18T00:00:14Z" + }, + { + "fields":{"value":30}, + "time":"2015-10-18T00:00:16Z" + }, + { + "fields":{"value":40}, + "time":"2015-10-18T00:00:18Z" + } + ] +} diff --git a/integrations/data/TestStream_CumulativeSum.srpl b/integrations/data/TestStream_CumulativeSum.srpl new file mode 100644 index 0000000000..3284e88c6e --- /dev/null +++ b/integrations/data/TestStream_CumulativeSum.srpl @@ -0,0 +1,33 @@ +dbname +rpname +packets value=00 0000000000 +dbname +rpname +packets value=00 0000000001 +dbname +rpname +packets value=01 0000000002 +dbname +rpname +packets value=02 0000000003 +dbname +rpname +packets value=03 0000000004 +dbname +rpname +packets value=04 0000000005 +dbname +rpname +packets value=05 0000000006 +dbname +rpname +packets value=06 0000000007 +dbname +rpname +packets value=07 0000000008 +dbname +rpname +packets value=08 0000000009 +dbname +rpname +packets value=09 0000000010 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index e7ea57d0f2..cd56c6b4c4 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -457,6 +457,72 @@ stream testStreamerWithOutput(t, "TestStream_MovingAverage", script, 16*time.Second, er, false, nil) } +func TestStream_CumulativeSum(t *testing.T) { + var script = ` +stream + |from() + .measurement('packets') + |cumulativeSum('value') + |window() + .period(10s) + .every(10s) + |httpOut('TestStream_CumulativeSum') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "packets", + Tags: nil, + Columns: []string{"time", "cumulativeSum"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 3.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 6.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 10.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 15.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + 21.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 28.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 36.0, + }, + }, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_CumulativeSum", script, 13*time.Second, er, false, nil) +} + func TestStream_WindowMissing(t *testing.T) { var script = ` diff --git a/pipeline/influxql.go b/pipeline/influxql.go index 6770c29070..962cbdab82 100644 --- a/pipeline/influxql.go +++ b/pipeline/influxql.go @@ -480,3 +480,21 @@ func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration n.linkChild(i) return i } + +// Compute a cumulative sum of each point that is received. +// A point is emitted for every point collected. +func (n *chainnode) CumulativeSum(field string) *InfluxQLNode { + i := newInfluxQLNode("cumulativeSum", field, n.Provides(), n.Provides(), ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatCumulativeSumReducer() + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerCumulativeSumReducer() + return fn, fn + }, + IsStreamTransformation: true, + }) + n.linkChild(i) + return i +}