Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tricklenode #2530

Merged
merged 6 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

### Features
- [#2484](https://github.com/influxdata/kapacitor/pull/2484): Add Zenoss alert event handler.
- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise.
- [#2493](https://github.com/influxdata/kapacitor/pull/2493): Route kafka alerts to partitions by ID, and allow for configuring the hashing strategy.
- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise.
- [#2530](https://github.com/influxdata/kapacitor/pull/2530): Add a node tricklenode that converts batches to streams, the inverse of windownode.
- [#2544](https://github.com/influxdata/kapacitor/pull/2544): flux tasks skeleton in Kapacitor
- [#2555](https://github.com/influxdata/kapacitor/pull/2555): run flux tasks with built-in flux engine
- [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks
Expand Down
55 changes: 39 additions & 16 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/clock"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/wlog"
)

Expand Down Expand Up @@ -2605,6 +2602,42 @@ data
testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false)
}

func TestBatch_Trickle(t *testing.T) {
var script = `
var data = batch
|query('SELECT value FROM "telegraf"."default"."cpu"')
.period(4s)
.every(4s)
.groupBy('host')
data
|trickle()
|window().period(10s)
|httpOut('TestBatch_Trickle')`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking my understanding: this test is showing that trickle() followed by window is a way to re-batch data? i.e. in this case, re-batching from 4s windows into 10s windows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also that the results of trickle are unbached as window cannot accept batch data, but the inputs to trickle are batched.

er := models.Result{
Series: models.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 90.38281469458698},
{time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), 80.38281469458698},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 83.56930693069836},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_Trickle", script, 40*time.Second, er, false)
}

func TestBatch_StateCount(t *testing.T) {
var script = `
var data = batch
Expand Down Expand Up @@ -3689,27 +3722,18 @@ batch

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
t.Helper()
if testing.Verbose() {
wlog.SetLevel(wlog.DEBUG)
} else {
wlog.SetLevel(wlog.OFF)
}

// Create a new execution env
d := diagService.NewKapacitorHandler()
tm := kapacitor.NewTaskMaster("testBatcher", newServerInfo(), d)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler())
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
if err := as.Open(); err != nil {
tm, err := createTaskMaster("testBatcher")
if err != nil {
t.Fatal(err)
}
tm.AlertService = as
tm.Open()

// Create task
Expand Down Expand Up @@ -3764,7 +3788,6 @@ func testBatcherWithOutput(
if err != nil {
t.Error(err)
}

// Get the result
output, err := et.GetOutput(name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integrations/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func Bench(b *testing.B, tasksCount, pointCount, expectedProcessedCount int, tic
for i := 0; i < b.N; i++ {
// Do not time setup
b.StopTimer()
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
b.Fatal(err)
}
Expand Down
18 changes: 9 additions & 9 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11735,7 +11735,7 @@ stream
`

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -11791,7 +11791,7 @@ stream
},
}
// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -11923,7 +11923,7 @@ stream
},
}
// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -12348,7 +12348,7 @@ stream
name := "TestStream_InfluxDBOut"

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -12408,7 +12408,7 @@ stream
name := "TestStream_InfluxDBOut"

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13459,7 +13459,7 @@ func testStreamer(
}

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13540,7 +13540,7 @@ func testStreamerWithInputChannel(
}

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13779,9 +13779,9 @@ func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface
return nil
}

func createTaskMaster() (*kapacitor.TaskMaster, error) {
func createTaskMaster(name string) (*kapacitor.TaskMaster, error) {
d := diagService.NewKapacitorHandler()
tm := kapacitor.NewTaskMaster("testStreamer", newServerInfo(), d)
tm := kapacitor.NewTaskMaster(name, newServerInfo(), d)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
Expand Down
34 changes: 34 additions & 0 deletions integrations/testdata/TestBatch_Trickle.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"name": "cpu_usage_idle",
"tags": {
"cpu": "cpu-total"
},
"points": [
{
"fields": {
"mean": 90.38281469458698
},
"time": "2015-10-30T17:14:12Z"
},
{
"fields": {
"mean": 80.38281469458698
},
"time": "2015-10-30T17:14:13Z"
}
]
}
{
"name": "cpu_usage_idle",
"tags": {
"cpu": "cpu0"
},
"points": [
{
"fields": {
"mean": 83.56930693069836
},
"time": "2015-10-30T17:14:12Z"
}
]
}
11 changes: 11 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,14 @@ func (n *chainnode) Sideload() *SideloadNode {
n.linkChild(s)
return s
}

// Create a node that converts batches (such as windowed data) into non-batches.
func (n *chainnode) Trickle() *TrickleNode {
if n.Provides() != BatchEdge {
panic("cannot Trickle stream edge")
}

s := newTrickleNode()
n.linkChild(s)
return s
}
2 changes: 2 additions & 0 deletions pipeline/tick/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) {
case *pipeline.StreamNode:
s := StreamNode{}
return s.Build()
case *pipeline.TrickleNode:
return NewTrickle(parents).Build(node)
case *pipeline.BatchNode:
b := BatchNode{}
return b.Build()
Expand Down
3 changes: 2 additions & 1 deletion pipeline/tick/tick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/pipeline/tick"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -116,7 +117,7 @@ func PipelineTickTestHelper(t *testing.T, pipe *pipeline.Pipeline, want string,
}

if got != want {
t.Errorf("unexpected TICKscript:\ngot:\n%v\nwant:\n%v\n", got, want)
t.Errorf("unexpected TICKscript:\n %s", cmp.Diff(got, want))
t.Log(got) // print is helpful to get the correct format.
}

Expand Down
26 changes: 26 additions & 0 deletions pipeline/tick/trickle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tick

import (
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)

// TrickleNode converts the StatsNode pipeline node into the TICKScript AST
type TrickleNode struct {
Function
}

// NewTrickle creates a TrickleNode function builder
func NewTrickle(parents []ast.Node) *TrickleNode {
return &TrickleNode{
Function{
Parents: parents,
},
}
}

// Build NewTrickle ast.Node
func (n *TrickleNode) Build(s *pipeline.TrickleNode) (ast.Node, error) {
n.Pipe("trickle")
return n.prev, n.err
}
20 changes: 20 additions & 0 deletions pipeline/tick/trickle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package tick_test

import (
"testing"
"time"
)

func TestTrickle(t *testing.T) {
pipe, _, from := StreamFrom()
w := from.Window()
w.Every = time.Second
w.Trickle()
want := `stream
|from()
|window()
.every(1s)
|trickle()
`
PipelineTickTestHelper(t, pipe, want)
}
53 changes: 53 additions & 0 deletions pipeline/trickle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pipeline

import (
"encoding/json"
"fmt"
)

// A node that converts from batchedges to streamedges.
// Example:
// var errors = stream
// |from()
// |trickle()
// Children of trickle will be treated as if they are in a stream.
type TrickleNode struct {
chainnode `json:"-"`
}

func newTrickleNode() *TrickleNode {
return &TrickleNode{
chainnode: newBasicChainNode("trickle", BatchEdge, StreamEdge),
}
}

// MarshalJSON converts TrickleNode to JSON
// tick:ignore
func (n *TrickleNode) MarshalJSON() ([]byte, error) {
var raw = &struct {
TypeOf
}{
TypeOf: TypeOf{
Type: "trickle",
ID: n.ID(),
},
}
return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to an TrickleNode
// tick:ignore
func (n *TrickleNode) UnmarshalJSON(data []byte) error {
var raw = &struct {
TypeOf
}{}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "trickle" {
return fmt.Errorf("error unmarshaling node %d of type %s as TrickleNode", raw.ID, raw.Type)
}
n.setID(raw.ID)
return nil
}
2 changes: 2 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node,
n, err = newStateCountNode(et, t, d)
case *pipeline.SideloadNode:
n, err = newSideloadNode(et, t, d)
case *pipeline.TrickleNode:
n = newTrickleNode(et, t, d)
case *pipeline.BarrierNode:
n, err = newBarrierNode(et, t, d)
default:
Expand Down
Loading