From 144a0bf547d557b4204a2a8e6217a4ee46023cc9 Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Fri, 9 Apr 2021 14:20:42 -0500 Subject: [PATCH 1/4] feat: tricklenode --- integrations/batcher_test.go | 55 +++++++++++----- integrations/benchmark_test.go | 2 +- integrations/streamer_test.go | 18 ++--- .../testdata/TestBatch_Trickle.0.brpl | 34 ++++++++++ pipeline/node.go | 11 ++++ pipeline/tick/ast.go | 2 + pipeline/tick/trickle.go | 26 ++++++++ pipeline/tick/trickle_test.go | 15 +++++ pipeline/trickle.go | 53 +++++++++++++++ task.go | 2 + trickle.go | 66 +++++++++++++++++++ 11 files changed, 258 insertions(+), 26 deletions(-) create mode 100644 integrations/testdata/TestBatch_Trickle.0.brpl create mode 100644 pipeline/tick/trickle.go create mode 100644 pipeline/tick/trickle_test.go create mode 100644 pipeline/trickle.go create mode 100644 trickle.go diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index b8fbfce0c..2e2c77dc6 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -16,9 +16,6 @@ import ( "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/clock" "github.com/influxdata/kapacitor/models" - alertservice "github.com/influxdata/kapacitor/services/alert" - "github.com/influxdata/kapacitor/services/httppost" - "github.com/influxdata/kapacitor/services/storage/storagetest" "github.com/influxdata/wlog" ) @@ -2605,6 +2602,42 @@ data testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false) } +func TestBatch_Trickle(t *testing.T) { + var script = ` + var data = batch + |query('SELECT value FROM "telegraf"."default"."cpu"') + .period(4s) + .every(4s) + .groupBy('host') +data + |trickle() + |window().period(10s) + |httpOut('TestBatch_Trickle')` + er := models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + {time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 90.38281469458698}, + {time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), 80.38281469458698}, + }, + }, + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + {time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 83.56930693069836}, + }, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_Trickle", script, 40*time.Second, er, false) +} + func TestBatch_StateCount(t *testing.T) { var script = ` var data = batch @@ -3689,6 +3722,7 @@ batch // Helper test function for batcher func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) { + t.Helper() if testing.Verbose() { wlog.SetLevel(wlog.DEBUG) } else { @@ -3696,20 +3730,10 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex } // Create a new execution env - d := diagService.NewKapacitorHandler() - tm := kapacitor.NewTaskMaster("testBatcher", newServerInfo(), d) - httpdService := newHTTPDService() - tm.HTTPDService = httpdService - tm.TaskStore = taskStore{} - tm.DeadmanService = deadman{} - tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler()) - as := alertservice.NewService(diagService.NewAlertServiceHandler()) - as.StorageService = storagetest.New() - as.HTTPDService = httpdService - if err := as.Open(); err != nil { + tm, err := createTaskMaster("testBatcher") + if err != nil { t.Fatal(err) } - tm.AlertService = as tm.Open() // Create task @@ -3764,7 +3788,6 @@ func testBatcherWithOutput( if err != nil { t.Error(err) } - // Get the result output, err := et.GetOutput(name) if err != nil { diff --git a/integrations/benchmark_test.go b/integrations/benchmark_test.go index e3aba343a..3e1e302ff 100644 --- a/integrations/benchmark_test.go +++ b/integrations/benchmark_test.go @@ -187,7 +187,7 @@ func Bench(b *testing.B, tasksCount, pointCount, expectedProcessedCount int, tic for i := 0; i < b.N; i++ { // Do not time setup b.StopTimer() - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { b.Fatal(err) } diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 9d3e9fcaf..ed8358fbd 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -11732,7 +11732,7 @@ stream ` // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -11788,7 +11788,7 @@ stream }, } // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -11920,7 +11920,7 @@ stream }, } // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -12345,7 +12345,7 @@ stream name := "TestStream_InfluxDBOut" // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -12405,7 +12405,7 @@ stream name := "TestStream_InfluxDBOut" // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -13456,7 +13456,7 @@ func testStreamer( } // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -13537,7 +13537,7 @@ func testStreamerWithInputChannel( } // Create a new execution env - tm, err := createTaskMaster() + tm, err := createTaskMaster("testStreamer") if err != nil { t.Fatal(err) } @@ -13776,9 +13776,9 @@ func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface return nil } -func createTaskMaster() (*kapacitor.TaskMaster, error) { +func createTaskMaster(name string) (*kapacitor.TaskMaster, error) { d := diagService.NewKapacitorHandler() - tm := kapacitor.NewTaskMaster("testStreamer", newServerInfo(), d) + tm := kapacitor.NewTaskMaster(name, newServerInfo(), d) httpdService := newHTTPDService() tm.HTTPDService = httpdService tm.TaskStore = taskStore{} diff --git a/integrations/testdata/TestBatch_Trickle.0.brpl b/integrations/testdata/TestBatch_Trickle.0.brpl new file mode 100644 index 000000000..57c306919 --- /dev/null +++ b/integrations/testdata/TestBatch_Trickle.0.brpl @@ -0,0 +1,34 @@ +{ + "name": "cpu_usage_idle", + "tags": { + "cpu": "cpu-total" + }, + "points": [ + { + "fields": { + "mean": 90.38281469458698 + }, + "time": "2015-10-30T17:14:12Z" + }, + { + "fields": { + "mean": 80.38281469458698 + }, + "time": "2015-10-30T17:14:13Z" + } + ] +} +{ + "name": "cpu_usage_idle", + "tags": { + "cpu": "cpu0" + }, + "points": [ + { + "fields": { + "mean": 83.56930693069836 + }, + "time": "2015-10-30T17:14:12Z" + } + ] +} \ No newline at end of file diff --git a/pipeline/node.go b/pipeline/node.go index 9e19d54b6..d4d138576 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -535,3 +535,14 @@ func (n *chainnode) Sideload() *SideloadNode { n.linkChild(s) return s } + +// Create a node that converts batches (such as windowed data) into non-batches. +func (n *chainnode) Trickle() *TrickleNode { + if n.Provides() != BatchEdge { + panic("cannot Trickle stream edge") + } + + s := newTrickleNode() + n.linkChild(s) + return s +} diff --git a/pipeline/tick/ast.go b/pipeline/tick/ast.go index 75b3db35a..44a4d4823 100644 --- a/pipeline/tick/ast.go +++ b/pipeline/tick/ast.go @@ -147,6 +147,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) { case *pipeline.StreamNode: s := StreamNode{} return s.Build() + case *pipeline.TrickleNode: + return NewTrickle(parents).Build(node) case *pipeline.BatchNode: b := BatchNode{} return b.Build() diff --git a/pipeline/tick/trickle.go b/pipeline/tick/trickle.go new file mode 100644 index 000000000..53da387e8 --- /dev/null +++ b/pipeline/tick/trickle.go @@ -0,0 +1,26 @@ +package tick + +import ( + "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/tick/ast" +) + +// TrickleNode converts the StatsNode pipeline node into the TICKScript AST +type TrickleNode struct { + Function +} + +// NewTrickle creates a TrickleNode function builder +func NewTrickle(parents []ast.Node) *TrickleNode { + return &TrickleNode{ + Function{ + Parents: parents, + }, + } +} + +// Build NewTrickle ast.Node +func (n *TrickleNode) Build(s *pipeline.TrickleNode) (ast.Node, error) { + n.Pipe("trickle") + return n.prev, n.err +} diff --git a/pipeline/tick/trickle_test.go b/pipeline/tick/trickle_test.go new file mode 100644 index 000000000..d8cd09a24 --- /dev/null +++ b/pipeline/tick/trickle_test.go @@ -0,0 +1,15 @@ +package tick_test + +import ( + "testing" +) + +func TestTrickle(t *testing.T) { + pipe, _, from := StreamFrom() + from.Trickle() + want := `stream + |from() + |trickle() +` + PipelineTickTestHelper(t, pipe, want) +} diff --git a/pipeline/trickle.go b/pipeline/trickle.go new file mode 100644 index 000000000..6ae51c862 --- /dev/null +++ b/pipeline/trickle.go @@ -0,0 +1,53 @@ +package pipeline + +import ( + "encoding/json" + "fmt" +) + +// A node that converts from batchedges to streamedges. +// Example: +// var errors = stream +// |from() +// |trickle() +// Children of trickle will be treated as if they are in a stream. +type TrickleNode struct { + chainnode `json:"-"` +} + +func newTrickleNode() *TrickleNode { + return &TrickleNode{ + chainnode: newBasicChainNode("trickle", BatchEdge, StreamEdge), + } +} + +// MarshalJSON converts TrickleNode to JSON +// tick:ignore +func (n *TrickleNode) MarshalJSON() ([]byte, error) { + var raw = &struct { + TypeOf + }{ + TypeOf: TypeOf{ + Type: "trickle", + ID: n.ID(), + }, + } + return json.Marshal(raw) +} + +// UnmarshalJSON converts JSON to an TrickleNode +// tick:ignore +func (n *TrickleNode) UnmarshalJSON(data []byte) error { + var raw = &struct { + TypeOf + }{} + err := json.Unmarshal(data, raw) + if err != nil { + return err + } + if raw.Type != "trickle" { + return fmt.Errorf("error unmarshaling node %d of type %s as TrickleNode", raw.ID, raw.Type) + } + n.setID(raw.ID) + return nil +} diff --git a/task.go b/task.go index cb72a9a17..70c2f1554 100644 --- a/task.go +++ b/task.go @@ -515,6 +515,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, n, err = newStateCountNode(et, t, d) case *pipeline.SideloadNode: n, err = newSideloadNode(et, t, d) + case *pipeline.TrickleNode: + n = newTrickleNode(et, t, d) case *pipeline.BarrierNode: n, err = newBarrierNode(et, t, d) default: diff --git a/trickle.go b/trickle.go new file mode 100644 index 000000000..7765798de --- /dev/null +++ b/trickle.go @@ -0,0 +1,66 @@ +package kapacitor + +import ( + "github.com/influxdata/kapacitor/edge" + "github.com/influxdata/kapacitor/models" + "github.com/influxdata/kapacitor/pipeline" +) + +func newTrickleNode(et *ExecutingTask, n *pipeline.TrickleNode, d NodeDiagnostic) *TrickleNode { + sn := &TrickleNode{ + node: node{Node: n, et: et, diag: d}, + } + sn.node.runF = sn.runTrickle + return sn +} + +type TrickleNode struct { + node + dims models.Dimensions + name string +} + +func (n *TrickleNode) runTrickle(_ []byte) error { + consumer := edge.NewConsumerWithReceiver( + n.ins[0], + n, + ) + return consumer.Consume() +} + +// BeginBatch sets some batch variables on the node, and isn't forwarded. +func (n *TrickleNode) BeginBatch(b edge.BeginBatchMessage) error { + n.dims = b.Dimensions() + n.name = b.Name() + return nil +} + +// BatchPoint forwards a PointMessage +func (n *TrickleNode) BatchPoint(bp edge.BatchPointMessage) error { + return n.outs[0].Collect(edge.NewPointMessage( + n.name, + "", + "", + n.dims, + bp.Fields(), + bp.Tags(), + bp.Time())) +} + +func (n *TrickleNode) EndBatch(end edge.EndBatchMessage) error { + return nil +} + +func (n *TrickleNode) Point(p edge.PointMessage) error { + return n.outs[0].Collect(p) +} + +func (n *TrickleNode) Barrier(barrier edge.BarrierMessage) error { + return n.outs[0].Collect(barrier) +} + +func (n *TrickleNode) DeleteGroup(d edge.DeleteGroupMessage) error { + return n.outs[0].Collect(d) +} + +func (n *TrickleNode) Done() {} From 463747f473713099142f4ce1202804a7c0902e56 Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Tue, 13 Apr 2021 09:57:03 -0500 Subject: [PATCH 2/4] chore: update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d932885a3..b4c0fa6dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,9 @@ ### Features - [#2484](https://github.com/influxdata/kapacitor/pull/2484): Add Zenoss alert event handler. -- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise. - [#2493](https://github.com/influxdata/kapacitor/pull/2493): Route kafka alerts to partitions by ID, and allow for configuring the hashing strategy. +- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise. +- [#2530](https://github.com/influxdata/kapacitor/pull/2530): Add a node tricklenode that converts batches to streams, the inverse of windownode. ## v1.5.9 [2021-04-01] From 95e24ef5bb7b6ca804389aae29a1c2a0c61adf81 Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Tue, 13 Apr 2021 11:37:41 -0500 Subject: [PATCH 3/4] test: fix TestTrickle ast test --- pipeline/tick/tick_test.go | 4 +++- pipeline/tick/trickle_test.go | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pipeline/tick/tick_test.go b/pipeline/tick/tick_test.go index 009a44fea..a25b91930 100644 --- a/pipeline/tick/tick_test.go +++ b/pipeline/tick/tick_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/pipeline/tick" "github.com/influxdata/kapacitor/tick/stateful" @@ -116,7 +118,7 @@ func PipelineTickTestHelper(t *testing.T, pipe *pipeline.Pipeline, want string, } if got != want { - t.Errorf("unexpected TICKscript:\ngot:\n%v\nwant:\n%v\n", got, want) + t.Errorf("unexpected TICKscript:\n %s", cmp.Diff(got, want)) t.Log(got) // print is helpful to get the correct format. } diff --git a/pipeline/tick/trickle_test.go b/pipeline/tick/trickle_test.go index d8cd09a24..0fcd1e74b 100644 --- a/pipeline/tick/trickle_test.go +++ b/pipeline/tick/trickle_test.go @@ -2,13 +2,18 @@ package tick_test import ( "testing" + "time" ) func TestTrickle(t *testing.T) { pipe, _, from := StreamFrom() - from.Trickle() + w := from.Window() + w.Every = time.Second + w.Trickle() want := `stream |from() + |window() + .every(1s) |trickle() ` PipelineTickTestHelper(t, pipe, want) From 7a99ba6ce3ead42fb9bc30279d195279edd8f389 Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Fri, 28 May 2021 13:51:03 -0500 Subject: [PATCH 4/4] chore: cleanup imports --- pipeline/tick/tick_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pipeline/tick/tick_test.go b/pipeline/tick/tick_test.go index a25b91930..e4b344a96 100644 --- a/pipeline/tick/tick_test.go +++ b/pipeline/tick/tick_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/pipeline/tick" "github.com/influxdata/kapacitor/tick/stateful"