Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elapsed function #477

Merged
merged 1 commit into from
Apr 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#451](https://github.com/influxdata/kapacitor/issues/451): StreamNode supports `|groupBy` and `|where` methods.
- [#93](https://github.com/influxdata/kapacitor/issues/93): AlertNode now outputs data to child nodes. The output data can have either a tag or field indicating the alert level.
- [#281](https://github.com/influxdata/kapacitor/issues/281): AlertNode now has an `.all()` property that specifies that all points in a batch must match the criteria in order to trigger an alert.
- [#384](https://github.com/influxdata/kapacitor/issues/384): Add `elapsed` function to compute the time difference between subsequent points.


### Bugfixes
Expand Down
5 changes: 4 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (s *Server) Write(db, rp, body string, params url.Values) (results string,
resp, err := http.Post(s.URL()+"/write?"+params.Encode(), "", strings.NewReader(body))
if err != nil {
return "", err
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
}
return string(MustReadAll(resp.Body)), nil
Expand All @@ -118,6 +120,7 @@ func (s *Server) HTTPGetRetry(url, exp string, retries int, sleep time.Duration)
if err != nil {
return err
}
defer resp.Body.Close()
r = string(MustReadAll(resp.Body))
if r == exp {
break
Expand Down
20 changes: 10 additions & 10 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestServer_StreamTask(t *testing.T) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -380,7 +380,7 @@ test value=1 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Err":null}`
exp := `{"Series":[{"name":"test","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestServer_StreamTask_AllMeasurements(t *testing.T) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -447,7 +447,7 @@ test0 value=1 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test0","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Err":null}`
exp := `{"Series":[{"name":"test0","columns":["time","count"],"values":[["1970-01-01T00:00:10Z",15]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestServer_BatchTask(t *testing.T) {

endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

exp := `{"Series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}],"Err":null}`
exp := `{"Series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -955,7 +955,7 @@ func testStreamAgent(t *testing.T, c *run.Config) {
endpoint := fmt.Sprintf("%s/task/%s/moving_avg", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -990,7 +990,7 @@ test,group=b value=0 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",0.9]]},{"name":"test","tags":{"group":"b"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",1.9]]}],"Err":null}`
exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",0.9]]},{"name":"test","tags":{"group":"b"},"columns":["time","mean"],"values":[["1970-01-01T00:00:11Z",1.9]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func testStreamAgentSocket(t *testing.T, c *run.Config) {
endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)

// Request data before any writes and expect null responses
nullResponse := `{"Series":null,"Err":null}`
nullResponse := `{"Series":null,"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, nullResponse, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand All @@ -1149,7 +1149,7 @@ test,group=a value=0 0000000011
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", points, v)

exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","count"],"values":[["1970-01-01T00:00:10Z",10]]}],"Err":null}`
exp := `{"Series":[{"name":"test","tags":{"group":"a"},"columns":["time","count"],"values":[["1970-01-01T00:00:10Z",10]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func testBatchAgent(t *testing.T, c *run.Config) {
}

endpoint := fmt.Sprintf("%s/task/%s/count", s.URL(), name)
exp := `{"Series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]}],"Err":null}`
exp := `{"Series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]}],"Messages":null,"Err":null}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*50)
if err != nil {
t.Error(err)
Expand Down
4 changes: 2 additions & 2 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type floatPointEmitter struct {
func (e *floatPointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down Expand Up @@ -301,7 +301,7 @@ type integerPointEmitter struct {
func (e *integerPointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down
2 changes: 1 addition & 1 deletion influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ type {{.name}}PointEmitter struct {
func (e *{{.name}}PointEmitter) EmitPoint() (models.Point, error) {
slice := e.emitter.Emit()
if len(slice) != 1 {
return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
return models.Point{}, ErrEmptyEmit
}
ap := slice[0]
var t time.Time
Expand Down
41 changes: 28 additions & 13 deletions influxql.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"errors"
"fmt"
"log"
"time"
Expand All @@ -14,16 +15,20 @@ import (

type createReduceContextFunc func(c baseReduceContext) reduceContext

// ErrEmptyEmit is returned when a call to emit produces no results.
var ErrEmptyEmit = errors.New("call to emit produced no results")

type InfluxQLNode struct {
node
n *pipeline.InfluxQLNode
createFn createReduceContextFunc
n *pipeline.InfluxQLNode
createFn createReduceContextFunc
isStreamTransformation bool
}

func newInfluxQLNode(et *ExecutingTask, n *pipeline.InfluxQLNode, l *log.Logger) (*InfluxQLNode, error) {
m := &InfluxQLNode{
node: node{Node: n, et: et, logger: l},
n: n,
isStreamTransformation: n.ReduceCreater.IsStreamTransformation,
}
m.node.runF = m.runInfluxQLs
return m, nil
Expand Down Expand Up @@ -79,7 +84,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
dimensions: p.Dimensions,
tags: p.PointTags(),
time: p.Time,
pointTimes: n.n.PointTimes,
pointTimes: n.n.PointTimes || n.isStreamTransformation,
}

createFn, err := n.getCreateFn(p.Fields[c.field])
Expand All @@ -89,22 +94,32 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {

context = createFn(c)
contexts[p.Group] = context
context.AggregatePoint(&p)

} else if p.Time.Equal(context.Time()) {
}
if n.isStreamTransformation {
context.AggregatePoint(&p)
// advance to next point
p, ok = n.ins[0].NextPoint()
} else {

err := n.emit(context)
if err != nil {
if err != nil && err != ErrEmptyEmit {
return err
}

// Nil out reduced point
contexts[p.Group] = nil
// do not advance,
// go through loop again to initialize new iterator.
} else {
if p.Time.Equal(context.Time()) {
context.AggregatePoint(&p)
// advance to next point
p, ok = n.ins[0].NextPoint()
} else {
err := n.emit(context)
if err != nil {
return err
}

// Nil out reduced point
contexts[p.Group] = nil
// do not advance,
// go through loop again to initialize new iterator.
}
}
}
return nil
Expand Down
33 changes: 33 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@ batch
testBatcherWithOutput(t, "TestBatch_DerivativeNN", script, 21*time.Second, er)
}

func TestBatch_Elapsed(t *testing.T) {
	// TICKscript under test: query a batch of "packets" points and
	// compute the elapsed time between consecutive points in 1ms units.
	script := `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|elapsed('value', 1ms)
|httpOut('TestBatch_Elapsed')
`

	// The recorded batch data has points 2s apart, so every elapsed
	// value should be 2000 (ms); httpOut exposes the last one.
	expected := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "packets",
				Tags:    nil,
				Columns: []string{"time", "elapsed"},
				Values: [][]interface{}{
					{
						time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
						2000.0,
					},
				},
			},
		},
	}

	testBatcherWithOutput(t, "TestBatch_Elapsed", script, 21*time.Second, expected)
}

func TestBatch_SimpleMR(t *testing.T) {

var script = `
Expand Down
1 change: 1 addition & 0 deletions integrations/data/TestBatch_Elapsed.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
27 changes: 27 additions & 0 deletions integrations/data/TestStream_Elapsed.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
dbname
rpname
packets value=1000 0000000001
dbname
rpname
packets value=1001 0000000002
dbname
rpname
packets value=1002 0000000003
dbname
rpname
packets value=1003 0000000004
dbname
rpname
packets value=1004 0000000005
dbname
rpname
packets value=1006 0000000006
dbname
rpname
packets value=1009 0000000010
dbname
rpname
packets value=1010 0000000011
dbname
rpname
packets value=1011 0000000012
30 changes: 30 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,36 @@ stream
testStreamerWithOutput(t, "TestStream_DerivativeNN", script, 15*time.Second, er, nil, false)
}

func TestStream_Elapsed(t *testing.T) {
	// TICKscript under test: compute elapsed time (in 1s units) between
	// consecutive "packets" points, then take the max over 10s windows.
	script := `
stream
|from()
.measurement('packets')
|elapsed('value', 1s)
|window()
.period(10s)
.every(10s)
|max('elapsed')
|httpOut('TestStream_Elapsed')
`

	// Largest gap between consecutive points in the recorded stream is
	// 4s, so the windowed max of the elapsed values is 4.
	expected := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "packets",
				Tags:    nil,
				Columns: []string{"time", "max"},
				Values: [][]interface{}{
					{
						time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
						4.0,
					},
				},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_Elapsed", script, 15*time.Second, expected, nil, false)
}

func TestStream_WindowMissing(t *testing.T) {

var script = `
Expand Down
5 changes: 3 additions & 2 deletions pipeline/influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type ReduceCreater struct {
CreateIntegerReducer func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter)
CreateIntegerBulkReducer func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter)

TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
IsStreamTransformation bool
}

type FloatBulkPointAggregator interface {
Expand Down
1 change: 1 addition & 0 deletions pipeline/influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ReduceCreater struct {

TopBottomCallInfo *TopBottomCallInfo
IsSimpleSelector bool
IsStreamTransformation bool
}

{{range .}}
Expand Down
Loading