Skip to content

Commit

Permalink
add elapsed function from InfluxQL
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 19, 2016
1 parent 00e6f4d commit 877660e
Show file tree
Hide file tree
Showing 59 changed files with 1,481 additions and 734 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#451](https://github.com/influxdata/kapacitor/issues/451): StreamNode supports `|groupBy` and `|where` methods.
- [#93](https://github.com/influxdata/kapacitor/issues/93): AlertNode now outputs data to child nodes. The output data can have either a tag or field indicating the alert level.
- [#281](https://github.com/influxdata/kapacitor/issues/281): AlertNode now has an `.all()` property that specifies that all points in a batch must match the criteria in order to trigger an alert.
- [#384](https://github.com/influxdata/kapacitor/issues/384): Add `elapsed` function to compute the time difference between subsequent points.


### Bugfixes
Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (s *Server) Write(db, rp, body string, params url.Values) (results string,
resp, err := http.Post(s.URL()+"/write?"+params.Encode(), "", strings.NewReader(body))
if err != nil {
return "", err
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
}
return string(MustReadAll(resp.Body)), nil
Expand All @@ -118,6 +120,7 @@ func (s *Server) HTTPGetRetry(url, exp string, retries int, sleep time.Duration)
if err != nil {
return err
}
defer resp.Body.Close()
r = string(MustReadAll(resp.Body))
if r == exp {
break
Expand Down
20 changes: 10 additions & 10 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestServer_StreamTask(t *testing.T) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -380,7 +380,7 @@ test value=1 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Err":null}`
exp := `{"Series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestServer_StreamTask_AllMeasurements(t *testing.T) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -447,7 +447,7 @@ test0 value=1 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test0","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Err":null}`
exp := `{"Series":[{"name":"test0","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestServer_BatchTask(t *testing.T) {

endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

exp := `{"Series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}],"Err":null}`
exp := `{"Series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -955,7 +955,7 @@ func testStreamAgent(t *testing.T, c *run.Config) {
endpoint := fmt.Sprintf("%s/task/%s/moving_avg", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -990,7 +990,7 @@ test,group=b value=0 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",0.9]]},{"name":"test","tags":{"group":"b"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",1.9]]}],"Err":null}`
exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",0.9]]},{"name":"test","tags":{"group":"b"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",1.9]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func testStreamAgentSocket(t *testing.T, c *run.Config) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand All @@ -1149,7 +1149,7 @@ test,group=a value=0 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","count"],"values":[["1970-01-01T00:00:10Z",10]]}],"Err":null}`
exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","count"],"values":[["1970-01-01T00:00:10Z",10]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func testBatchAgent(t *testing.T, c *run.Config) {
}

endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)
exp := `{"Series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]}],"Err":null}`
exp := `{"Series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*50)
if err != nil {
t.Error(err)
Expand Down
4 changes: 2 additions & 2 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type floatPointEmitter struct {
func (e *floatPointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down Expand Up @@ -301,7 +301,7 @@ type integerPointEmitter struct {
func (e *integerPointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down
2 changes: 1 addition & 1 deletion influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type {{.name}}PointEmitter struct {
func (e *{{.name}}PointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down
43 changes: 30 additions & 13 deletions influxql.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"errors"
"fmt"
"log"
"time"
Expand All @@ -14,16 +15,20 @@ import (

type createReduceContextFunc func(c baseReduceContext) reduceContext

var ErrEmptyEmit = errors.New("error call to emit produced no results")

type InfluxQLNode struct {
node
n *pipeline.InfluxQLNode
createFn createReduceContextFunc
n *pipeline.InfluxQLNode
createFn createReduceContextFunc
isStreamTransformation bool
}

func newInfluxQLNode(et *ExecutingTask, n *pipeline.InfluxQLNode, l *log.Logger) (*InfluxQLNode, error) {
m := &InfluxQLNode{
node: node{Node: n, et: et, logger: l},
n: n,
isStreamTransformation: n.ReduceCreater.IsStreamTransformation,
}
m.node.runF = m.runInfluxQLs
return m, nil
Expand Down Expand Up @@ -79,7 +84,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
dimensions: p.Dimensions,
tags: p.PointTags(),
time: p.Time,
pointTimes: n.n.PointTimes,
pointTimes: n.n.PointTimes || n.isStreamTransformation,
}

createFn, err := n.getCreateFn(p.Fields[c.field])
Expand All @@ -89,22 +94,32 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {

context = createFn(c)
contexts[p.Group] = context
context.AggregatePoint(&p)

} else if p.Time.Equal(context.Time()) {
}
if n.isStreamTransformation {
context.AggregatePoint(&p)
// advance to next point
p, ok = n.ins[0].NextPoint()
} else {

err := n.emit(context)
if err != nil {
if err != nil && err != ErrEmptyEmit {
return err
}

// Nil out reduced point
contexts[p.Group] = nil
// do not advance,
// go through loop again to initialize new iterator.
} else {
if p.Time.Equal(context.Time()) {
context.AggregatePoint(&p)
// advance to next point
p, ok = n.ins[0].NextPoint()
} else {
err := n.emit(context)
if err != nil {
return err
}

// Nil out reduced point
contexts[p.Group] = nil
// do not advance,
// go through loop again to initialize new iterator.
}
}
}
return nil
Expand Down Expand Up @@ -159,6 +174,7 @@ func (n *InfluxQLNode) emit(context reduceContext) error {
switch n.Provides() {
case pipeline.StreamEdge:
p, err := context.EmitPoint()
n.logger.Println("D! point", p)
if err != nil {
return err
}
Expand All @@ -170,6 +186,7 @@ func (n *InfluxQLNode) emit(context reduceContext) error {
}
case pipeline.BatchEdge:
b := context.EmitBatch()
n.logger.Println("D! batch", b)
for _, out := range n.outs {
err := out.CollectBatch(b)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@ batch
testBatcherWithOutput(t, "TestBatch_DerivativeNN", script, 21*time.Second, er)
}

func TestBatch_Elapsed(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|elapsed('value', 1ms)
|httpOut('TestBatch_Elapsed')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "elapsed"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
2000.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_Elapsed", script, 21*time.Second, er)
}

func TestBatch_SimpleMR(t *testing.T) {

var script = `
Expand Down
1 change: 1 addition & 0 deletions integrations/data/TestBatch_Elapsed.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
27 changes: 27 additions & 0 deletions integrations/data/TestStream_Elapsed.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dbname
rpname
packets value=1000 0000000001
dbname
rpname
packets value=1001 0000000002
dbname
rpname
packets value=1002 0000000003
dbname
rpname
packets value=1003 0000000004
dbname
rpname
packets value=1004 0000000005
dbname
rpname
packets value=1006 0000000006
dbname
rpname
packets value=1009 0000000010
dbname
rpname
packets value=1010 0000000011
dbname
rpname
packets value=1011 0000000012
30 changes: 30 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,36 @@ stream
testStreamerWithOutput(t, "TestStream_DerivativeNN", script, 15*time.Second, er, nil, false)
}

func TestStream_Elapsed(t *testing.T) {

var script = `
stream
|from()
.measurement('packets')
|elapsed('value', 1s)
|window()
.period(10s)
.every(10s)
|max('elapsed')
|httpOut('TestStream_Elapsed')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "max"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Elapsed", script, 15*time.Second, er, nil, false)
}

func TestStream_WindowMissing(t *testing.T) {

var script = `
Expand Down
5 changes: 3 additions & 2 deletions pipeline/influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type ReduceCreater struct {
CreateIntegerReducer func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter)
CreateIntegerBulkReducer func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter)

TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
IsStreamTransformation bool
}

type FloatBulkPointAggregator interface {
Expand Down
1 change: 1 addition & 0 deletions pipeline/influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ReduceCreater struct {

TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
IsStreamTransformation bool
}

{{range .}}
Expand Down
Loading

0 comments on commit 877660e

Please sign in to comment.