Skip to content

Commit

Permalink
add cumulative sum
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 25, 2016
1 parent 74fcca3 commit ffac484
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ See the API docs for more details.
- [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7
- [#957](https://github.com/influxdata/kapacitor/issues/957): Add API endpoints for testing service integrations.
- [#958](https://github.com/influxdata/kapacitor/issues/958): Add support for Slack icon emojis and custom usernames.
- [#991](https://github.com/influxdata/kapacitor/pull/991): Bring Kapacitor up to parity with available InfluxQL functions in 1.1

### Bugfixes

Expand Down
48 changes: 48 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,54 @@ batch

testBatcherWithOutput(t, "TestBatch_MovingAverage", script, 21*time.Second, er, false)
}
func TestBatch_CumulativeSum(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|cumulativeSum('value')
|httpOut('TestBatch_CumulativeSum')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "cumulativeSum"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC),
10.0,
},
{
time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC),
30.0,
},
{
time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC),
60.0,
},
{
time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC),
100.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_CumulativeSum", script, 31*time.Second, er, false)
}

func TestBatch_SimpleMR(t *testing.T) {

Expand Down
50 changes: 50 additions & 0 deletions integrations/data/TestBatch_CumulativeSum.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"name":"packets",
"points":[
{
"fields":{"value":1000},
"time":"2015-10-18T00:00:00Z"
},
{
"fields":{"value":1005},
"time":"2015-10-18T00:00:02Z"
},
{
"fields":{"value":1008},
"time":"2015-10-18T00:00:04Z"
},
{
"fields":{"value":1009},
"time":"2015-10-18T00:00:06Z"
},
{
"fields":{"value":1004},
"time":"2015-10-18T00:00:08Z"
}
]
}
{
"name":"packets",
"points":[
{
"fields":{"value":0},
"time":"2015-10-18T00:00:10Z"
},
{
"fields":{"value":10},
"time":"2015-10-18T00:00:12Z"
},
{
"fields":{"value":20},
"time":"2015-10-18T00:00:14Z"
},
{
"fields":{"value":30},
"time":"2015-10-18T00:00:16Z"
},
{
"fields":{"value":40},
"time":"2015-10-18T00:00:18Z"
}
]
}
33 changes: 33 additions & 0 deletions integrations/data/TestStream_CumulativeSum.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
dbname
rpname
packets value=00 0000000000
dbname
rpname
packets value=00 0000000001
dbname
rpname
packets value=01 0000000002
dbname
rpname
packets value=02 0000000003
dbname
rpname
packets value=03 0000000004
dbname
rpname
packets value=04 0000000005
dbname
rpname
packets value=05 0000000006
dbname
rpname
packets value=06 0000000007
dbname
rpname
packets value=07 0000000008
dbname
rpname
packets value=08 0000000009
dbname
rpname
packets value=09 0000000010
66 changes: 66 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,72 @@ stream
testStreamerWithOutput(t, "TestStream_MovingAverage", script, 16*time.Second, er, false, nil)
}

func TestStream_CumulativeSum(t *testing.T) {
var script = `
stream
|from()
.measurement('packets')
|cumulativeSum('value')
|window()
.period(10s)
.every(10s)
|httpOut('TestStream_CumulativeSum')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "cumulativeSum"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
0.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
3.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
6.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
10.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
15.0,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
21.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
28.0,
},
{
time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC),
36.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_CumulativeSum", script, 13*time.Second, er, false, nil)
}

func TestStream_WindowMissing(t *testing.T) {

var script = `
Expand Down
18 changes: 18 additions & 0 deletions pipeline/influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,21 @@ func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration
n.linkChild(i)
return i
}

// Compute a cumulative sum of each point that is received.
// A point is emitted for every point collected.
func (n *chainnode) CumulativeSum(field string) *InfluxQLNode {
i := newInfluxQLNode("cumulativeSum", field, n.Provides(), n.Provides(), ReduceCreater{
CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) {
fn := influxql.NewFloatCumulativeSumReducer()
return fn, fn
},
CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) {
fn := influxql.NewIntegerCumulativeSumReducer()
return fn, fn
},
IsStreamTransformation: true,
})
n.linkChild(i)
return i
}

0 comments on commit ffac484

Please sign in to comment.