From 1803d589372e12022a4b6e977901ac8eeebb4e7f Mon Sep 17 00:00:00 2001
From: Nathaniel Cook
Date: Sat, 2 Jul 2016 08:59:10 -0600
Subject: [PATCH] add |combine method (#693)

---
 CHANGELOG.md                               |   2 +
 combine.go                                 | 313 +++++++++++++++++++++
 combine_test.go                            | 137 +++++++++
 integrations/batcher_test.go               | 186 ++++++++++++
 integrations/data/TestBatch_Combine.0.brpl |  18 ++
 integrations/data/TestStream_Combine.srpl  |  36 +++
 integrations/streamer_test.go              | 180 ++++++++++++
 models/point.go                            |   8 +
 pipeline/combine.go                        | 133 +++++++++
 pipeline/node.go                           |   7 +
 task.go                                    |   2 +
 union.go                                   |  12 +-
 12 files changed, 1028 insertions(+), 6 deletions(-)
 create mode 100644 combine.go
 create mode 100644 combine_test.go
 create mode 100644 integrations/data/TestBatch_Combine.0.brpl
 create mode 100644 integrations/data/TestStream_Combine.srpl
 create mode 100644 pipeline/combine.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6a6e62f39..24b1b01e8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,8 @@
 - [#662](https://github.com/influxdata/kapacitor/pull/662): Add `-skipVerify` flag to `kapacitor` CLI tool to skip SSL verification.
 - [#680](https://github.com/influxdata/kapacitor/pull/680): Add Telegram Alerting option
+- [#46](https://github.com/influxdata/kapacitor/issues/46): Can now create combinations of points within the same stream.
+    This is similar to a join, except that the stream is joined with itself.
 
 ### Bugfixes
 
diff --git a/combine.go b/combine.go
new file mode 100644
index 000000000..795a78ca0
--- /dev/null
+++ b/combine.go
@@ -0,0 +1,313 @@
+package kapacitor
+
+import (
+	"fmt"
+	"log"
+	"sort"
+	"time"
+
+	"github.com/influxdata/kapacitor/models"
+	"github.com/influxdata/kapacitor/pipeline"
+	"github.com/influxdata/kapacitor/tick/stateful"
+)
+
+type CombineNode struct {
+	node
+	c *pipeline.CombineNode
+
+	expressions        []stateful.Expression
+	expressionsByGroup map[models.GroupID][]stateful.Expression
+	scopePools         []stateful.ScopePool
+
+	combination combination
+}
+
+// Create a new CombineNode, which combines a stream with itself dynamically.
+func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (*CombineNode, error) {
+	cn := &CombineNode{
+		c:                  n,
+		node:               node{Node: n, et: et, logger: l},
+		expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
+		combination:        combination{max: n.Max},
+	}
+	// Create stateful expressions
+	cn.expressions = make([]stateful.Expression, len(n.Lambdas))
+	cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
+	for i, lambda := range n.Lambdas {
+		statefulExpr, err := stateful.NewExpression(lambda.Expression)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to compile %v expression: %v", i, err)
+		}
+		cn.expressions[i] = statefulExpr
+		cn.scopePools[i] = stateful.NewScopePool(stateful.FindReferenceVariables(lambda.Expression))
+	}
+	cn.node.runF = cn.runCombine
+	return cn, nil
+}
+
+type buffer struct {
+	Time       time.Time
+	Name       string
+	Group      models.GroupID
+	Dimensions models.Dimensions
+	Points     []rawPoint
+}
+
+type timeList []time.Time
+
+func (t timeList) Len() int           { return len(t) }
+func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
+func (t timeList) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
+
+func (n *CombineNode) runCombine([]byte) error {
+	switch n.Wants() {
+	case pipeline.StreamEdge:
+		buffers := make(map[models.GroupID]*buffer)
+		for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
+			n.timer.Start()
+			t := p.Time.Round(n.c.Tolerance)
+			currentBuf, ok := buffers[p.Group]
+			if !ok {
+				currentBuf = &buffer{
+					Time:       t,
+					Name:       p.Name,
+					Group:      p.Group,
+					Dimensions: p.Dimensions,
+				}
+				buffers[p.Group] = currentBuf
+			}
+			rp := rawPoint{
+				Time:   t,
+				Fields: p.Fields,
+				Tags:   p.Tags,
+			}
+			if t.Equal(currentBuf.Time) {
+				currentBuf.Points = append(currentBuf.Points, rp)
+			} else {
+				if err := n.combineBuffer(currentBuf); err != nil {
+					return err
+				}
+				currentBuf.Time = t
+				currentBuf.Name = p.Name
+				currentBuf.Group = p.Group
+				currentBuf.Dimensions = p.Dimensions
+				currentBuf.Points = currentBuf.Points[0:1]
+				currentBuf.Points[0] = rp
+			}
+			n.timer.Stop()
+		}
+	case pipeline.BatchEdge:
+		allBuffers := make(map[models.GroupID]map[time.Time]*buffer)
+		groupTimes := make(map[models.GroupID]time.Time)
+		for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
+			n.timer.Start()
+			t := b.TMax.Round(n.c.Tolerance)
+			buffers, ok := allBuffers[b.Group]
+			if !ok {
+				buffers = make(map[time.Time]*buffer)
+				allBuffers[b.Group] = buffers
+				groupTimes[b.Group] = t
+			}
+			groupTime := groupTimes[b.Group]
+			if !t.Equal(groupTime) {
+				// Set new groupTime
+				groupTimes[b.Group] = t
+				// Combine/Emit all old buffers
+				times := make(timeList, 0, len(buffers))
+				for t := range buffers {
+					times = append(times, t)
+				}
+				sort.Sort(times)
+				for _, t := range times {
+					if err := n.combineBuffer(buffers[t]); err != nil {
+						return err
+					}
+					delete(buffers, t)
+				}
+			}
+			for _, p := range b.Points {
+				t := p.Time.Round(n.c.Tolerance)
+				currentBuf, ok := buffers[t]
+				if !ok {
+					currentBuf = &buffer{
+						Time:       t,
+						Name:       b.Name,
+						Group:      b.Group,
+						Dimensions: b.PointDimensions(),
+					}
+					buffers[t] = currentBuf
+				}
+				currentBuf.Points = append(currentBuf.Points, rawPoint{
+					Time:   t,
+					Fields: p.Fields,
+					Tags:   p.Tags,
+				})
+			}
+			n.timer.Stop()
+		}
+	}
+	return nil
+}
+
+// Simple container for point data.
+type rawPoint struct {
+	Time   time.Time
+	Fields models.Fields
+	Tags   models.Tags
+}
+
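The stream and batch branches above share one bucketing rule: a point belongs to the buffer keyed by its timestamp rounded to the nearest multiple of the tolerance. A minimal standalone sketch of that rule, assuming nothing from this patch (bucketize is a hypothetical helper, not part of the code above):

    package main

    import (
    	"fmt"
    	"time"
    )

    // bucketize groups timestamps by rounding each one to the nearest
    // multiple of tolerance, mirroring the p.Time.Round(n.c.Tolerance)
    // calls in runCombine.
    func bucketize(times []time.Time, tolerance time.Duration) map[time.Time][]time.Time {
    	buckets := make(map[time.Time][]time.Time)
    	for _, t := range times {
    		rounded := t.Round(tolerance)
    		buckets[rounded] = append(buckets[rounded], t)
    	}
    	return buckets
    }

    func main() {
    	base := time.Date(2015, 10, 30, 0, 0, 0, 0, time.UTC)
    	times := []time.Time{base, base.Add(1 * time.Second), base.Add(2 * time.Second)}
    	// With a 5s tolerance all three points land in the same bucket.
    	for rounded, group := range bucketize(times, 5*time.Second) {
    		fmt.Println(rounded, len(group))
    	}
    }
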
+// Combine a set of points into all their combinations.
+func (n *CombineNode) combineBuffer(buf *buffer) error {
+	if len(buf.Points) == 0 {
+		return nil
+	}
+	l := len(n.expressions)
+	expressions, ok := n.expressionsByGroup[buf.Group]
+	if !ok {
+		expressions = make([]stateful.Expression, l)
+		for i, expr := range n.expressions {
+			expressions[i] = expr.CopyReset()
+		}
+		n.expressionsByGroup[buf.Group] = expressions
+	}
+
+	// Compute matching result for all points
+	matches := make([]map[int]bool, l)
+	for i := 0; i < l; i++ {
+		matches[i] = make(map[int]bool, len(buf.Points))
+	}
+	for idx, p := range buf.Points {
+		for i := range expressions {
+			matched, err := EvalPredicate(expressions[i], n.scopePools[i], p.Time, p.Fields, p.Tags)
+			if err != nil {
+				n.logger.Println("E! evaluating lambda expression:", err)
+			}
+			matches[i][idx] = matched
+		}
+	}
+
+	p := models.Point{
+		Name:       buf.Name,
+		Group:      buf.Group,
+		Dimensions: buf.Dimensions,
+	}
+	dimensions := p.Dimensions.ToSet()
+	set := make([]rawPoint, l)
+	return n.combination.Do(len(buf.Points), l, func(indices []int) error {
+		// Assign each expression a distinct matching point from the
+		// combination; the combination is only emitted if every
+		// expression can be satisfied.
+		valid := true
+		for s := 0; s < l; s++ {
+			found := false
+			for i := range indices {
+				if matches[s][indices[i]] {
+					set[s] = buf.Points[indices[i]]
+					indices = append(indices[0:i], indices[i+1:]...)
+					found = true
+					break
+				}
+			}
+			if !found {
+				valid = false
+				break
+			}
+		}
+		if valid {
+			rp := n.merge(set, dimensions)
+
+			p.Time = rp.Time.Round(n.c.Tolerance)
+			p.Fields = rp.Fields
+			p.Tags = rp.Tags
+
+			n.timer.Pause()
+			for _, out := range n.outs {
+				err := out.CollectPoint(p)
+				if err != nil {
+					return err
+				}
+			}
+			n.timer.Resume()
+		}
+		return nil
+	})
+}
+
+// Merge a set of points into a single point.
+func (n *CombineNode) merge(points []rawPoint, dimensions map[string]bool) rawPoint {
+	fields := make(models.Fields, len(points[0].Fields)*len(points))
+	tags := make(models.Tags, len(points[0].Tags)*len(points))
+
+	for i, p := range points {
+		for field, value := range p.Fields {
+			fields[n.c.Names[i]+n.c.Delimiter+field] = value
+		}
+		for tag, value := range p.Tags {
+			if !dimensions[tag] {
+				tags[n.c.Names[i]+n.c.Delimiter+tag] = value
+			} else {
+				tags[tag] = value
+			}
+		}
+	}
+
+	return rawPoint{
+		Time:   points[0].Time,
+		Fields: fields,
+		Tags:   tags,
+	}
+}
+
+// Type for performing actions on a set of combinations.
+type combination struct {
+	max int64
+}
+
+// Do calls f for each combination of k elements out of n.
+// If n choose k exceeds max, an error is returned.
+func (c combination) Do(n, k int, f func(indices []int) error) error {
+	if count := c.Count(int64(n), int64(k)); count > c.max {
+		return fmt.Errorf("refusing to perform combination as total combinations %d exceeds max combinations %d", count, c.max)
+	} else if count == -1 {
+		// Nothing to do
+		return nil
+	}
+
+	indices := make([]int, k)
+	indicesCopy := make([]int, k)
+	// Start with the first combination: 0, 1, ..., k-1.
+	for i := 0; i < k; i++ {
+		indices[i] = i
+	}
+	copy(indicesCopy, indices)
+	if err := f(indicesCopy); err != nil {
+		return err
+	}
+	for {
+		// Find the rightmost index that can still be advanced.
+		i := k - 1
+		for ; i >= 0; i-- {
+			if indices[i] != i+n-k {
+				break
+			}
+		}
+		if i == -1 {
+			// All combinations have been visited.
+			return nil
+		}
+		indices[i]++
+		// Reset every index to the right of i.
+		for j := i + 1; j < k; j++ {
+			indices[j] = indices[j-1] + 1
+		}
+		copy(indicesCopy, indices)
+		if err := f(indicesCopy); err != nil {
+			return err
+		}
+	}
+}
+
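The loop in Do above is the standard lexicographic successor step: advance the rightmost index that still has room, then reset every index to its right. A self-contained sketch of the same enumeration, independent of the combination type (eachCombination is a hypothetical name, not part of this patch):

    package main

    import "fmt"

    // eachCombination calls f with every k-combination of {0, ..., n-1}
    // in lexicographic order, using the same successor step as
    // combination.Do above.
    func eachCombination(n, k int, f func(indices []int)) {
    	if k <= 0 || k > n {
    		return
    	}
    	indices := make([]int, k)
    	for i := range indices {
    		indices[i] = i
    	}
    	for {
    		out := make([]int, k)
    		copy(out, indices)
    		f(out)
    		// Rightmost index that has room to advance.
    		i := k - 1
    		for ; i >= 0 && indices[i] == i+n-k; i-- {
    		}
    		if i < 0 {
    			return // Last combination reached.
    		}
    		indices[i]++
    		for j := i + 1; j < k; j++ {
    			indices[j] = indices[j-1] + 1
    		}
    	}
    }

    func main() {
    	total := 0
    	eachCombination(5, 2, func(c []int) {
    		fmt.Println(c)
    		total++
    	})
    	fmt.Println("total:", total) // 10, i.e. 5 choose 2, matching Count below.
    }
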
+// Count the number of possible combinations of n choose k.
+// Returns -1 if n < k.
+func (c combination) Count(n, k int64) int64 {
+	if n < k {
+		return -1
+	}
+	count := int64(1)
+	for i := int64(0); i < k; i++ {
+		count = (count * (n - i)) / (i + 1)
+	}
+	return count
+}
diff --git a/combine_test.go b/combine_test.go
new file mode 100644
index 000000000..77c489c3b
--- /dev/null
+++ b/combine_test.go
@@ -0,0 +1,137 @@
+package kapacitor
+
+import (
+	"reflect"
+	"testing"
+)
+
+func Test_Combination_Count(t *testing.T) {
+	c := combination{max: 1e9}
+	testCases := []struct {
+		n, k, exp int64
+	}{
+		{
+			n:   1,
+			k:   0,
+			exp: 1,
+		},
+		{
+			n:   1,
+			k:   1,
+			exp: 1,
+		},
+		{
+			n:   2,
+			k:   1,
+			exp: 2,
+		},
+		{
+			n:   5,
+			k:   2,
+			exp: 10,
+		},
+		{
+			n:   5,
+			k:   3,
+			exp: 10,
+		},
+		{
+			n:   52,
+			k:   5,
+			exp: 2598960,
+		},
+	}
+	for _, tc := range testCases {
+		if exp, got := tc.exp, c.Count(tc.n, tc.k); exp != got {
+			t.Errorf("unexpected combination count for %d choose %d: got %d exp %d", tc.n, tc.k, got, exp)
+		}
+	}
+}
+func Test_Combination_Do(t *testing.T) {
+	c := combination{max: 1e9}
+	testCases := []struct {
+		n, k int
+		exp  [][]int
+	}{
+		{
+			n:   1,
+			k:   1,
+			exp: [][]int{{0}},
+		},
+		{
+			n: 5,
+			k: 2,
+			exp: [][]int{
+				{0, 1},
+				{0, 2},
+				{0, 3},
+				{0, 4},
+				{1, 2},
+				{1, 3},
+				{1, 4},
+				{2, 3},
+				{2, 4},
+				{3, 4},
+			},
+		},
+		{
+			n: 5,
+			k: 3,
+			exp: [][]int{
+				{0, 1, 2},
+				{0, 1, 3},
+				{0, 1, 4},
+				{0, 2, 3},
+				{0, 2, 4},
+				{0, 3, 4},
+				{1, 2, 3},
+				{1, 2, 4},
+				{1, 3, 4},
+				{2, 3, 4},
+			},
+		},
+		{
+			n: 7,
+			k: 5,
+			exp: [][]int{
+				{0, 1, 2, 3, 4},
+				{0, 1, 2, 3, 5},
+				{0, 1, 2, 3, 6},
+				{0, 1, 2, 4, 5},
+				{0, 1, 2, 4, 6},
+				{0, 1, 2, 5, 6},
+				{0, 1, 3, 4, 5},
+				{0, 1, 3, 4, 6},
+				{0, 1, 3, 5, 6},
+				{0, 1, 4, 5, 6},
+				{0, 2, 3, 4, 5},
+				{0, 2, 3, 4, 6},
+				{0, 2, 3, 5, 6},
+				{0, 2, 4, 5, 6},
+				{0, 3, 4, 5, 6},
+				{1, 2, 3, 4, 5},
+				{1, 2, 3, 4, 6},
+				{1, 2, 3, 5, 6},
+				{1, 2, 4, 5, 6},
+				{1, 3, 4, 5, 6},
+				{2, 3, 4, 5, 6},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		i := 0
+		c.Do(tc.n, tc.k, func(indices []int) error {
+			if i == len(tc.exp) {
+				t.Fatalf("too many combinations returned for %d choose %d: got %v", tc.n, tc.k, indices)
+			}
+			if !reflect.DeepEqual(tc.exp[i], indices) {
+				t.Errorf("unexpected combination set for %d choose %d index %d: got %v exp %v", tc.n, tc.k, i, indices, tc.exp[i])
+			}
+			i++
+			return nil
+		})
+		if i != len(tc.exp) {
+			t.Errorf("not enough combinations returned for %d choose %d", tc.n, tc.k)
+		}
+	}
+}
diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go
index 676b18c87..4a0829612 100644
--- a/integrations/batcher_test.go
+++ b/integrations/batcher_test.go
@@ -943,6 +943,192 @@ batch
 	}
 }
 
+func TestBatch_Combine_All(t *testing.T) {
+	var script = `
+batch
+	|query('SELECT value FROM "telegraf"."default"."request_latency"')
+		.period(10s)
+		.every(10s)
+		.groupBy('dc','service')
+	|groupBy('dc')
+	|combine(lambda: TRUE, lambda: TRUE)
+		.as('first', 'second')
+		.tolerance(5s)
+		.delimiter('.')
+	|groupBy('first.service', 'second.service', 'dc')
+	|eval(lambda: "first.value" / "second.value")
+		.as('ratio')
+	|httpOut('TestBatch_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "first.service": "cart", "second.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					3.0 / 2.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "first.service": "cart", "second.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					3.0 / 1.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "first.service": "auth", "second.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					2.0 / 1.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "first.service": "cart", "second.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					7.0 / 6.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "first.service": "cart", "second.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					7.0 / 4.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "first.service": "auth", "second.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					6.0 / 4.0,
+				}},
+			},
+		},
+	}
+
+	testBatcherWithOutput(t, "TestBatch_Combine", script, 40*time.Second, er, true)
+}
+
+func TestBatch_Combine_Filtered(t *testing.T) {
+	var script = `
+batch
+	|query('SELECT value FROM "telegraf"."default"."request_latency"')
+		.period(10s)
+		.every(10s)
+		.groupBy('dc','service')
+	|groupBy('dc')
+	|combine(lambda: "service" == 'auth', lambda: TRUE)
+		.as('auth', 'other')
+		.tolerance(5s)
+		.delimiter('.')
+	|groupBy('auth.service', 'other.service', 'dc')
+	|eval(lambda: "auth.value" / "other.value")
+		.as('ratio')
+	|httpOut('TestBatch_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "other.service": "log", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					2.0 / 1.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "other.service": "cart", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					2.0 / 3.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "other.service": "log", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					6.0 / 4.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "other.service": "cart", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					6.0 / 7.0,
+				}},
+			},
+		},
+	}
+
+	testBatcherWithOutput(t, "TestBatch_Combine", script, 30*time.Second, er, true)
+}
+
+func TestBatch_Combine_All_Triples(t *testing.T) {
+	var script = `
+batch
+	|query('SELECT value FROM "telegraf"."default"."request_latency"')
+		.period(10s)
+		.every(10s)
+		.groupBy('dc','service')
+	|groupBy('dc')
+	|combine(lambda: TRUE, lambda: TRUE, lambda: TRUE)
+		.as('first', 'second','third')
+		.tolerance(5s)
+		.delimiter('.')
+	|groupBy('first.service', 'second.service', 'third.service', 'dc')
+	|eval(lambda: "first.value" + "second.value" + "third.value")
+		.as('sum')
+	|httpOut('TestBatch_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "first.service": "cart", "second.service": "auth", "third.service": "log"},
+				Columns: []string{"time", "sum"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					6.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "first.service": "cart", "second.service": "auth", "third.service": "log"},
+				Columns: []string{"time", "sum"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 15, 0, time.UTC),
+					17.0,
+				}},
+			},
+		},
+	}
+
+	testBatcherWithOutput(t, "TestBatch_Combine", script, 30*time.Second, er, true)
+}
+
 func TestBatch_Join(t *testing.T) {
 	var script = `
diff --git a/integrations/data/TestBatch_Combine.0.brpl b/integrations/data/TestBatch_Combine.0.brpl
new file mode 100644
index 000000000..3811f1d5b
--- /dev/null
+++ b/integrations/data/TestBatch_Combine.0.brpl
@@ -0,0 +1,18 @@
+{"name":"request_latency","tags":{"dc":"A","service":"cart"},"points":[{"fields":{"value":0},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:00Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:05Z"}]}
+{"name":"request_latency","tags":{"dc":"A","service":"auth"},"points":[{"fields":{"value":0},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:01Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:06Z"}]}
+{"name":"request_latency","tags":{"dc":"A","service":"log"}, "points":[{"fields":{"value":0},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:02Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:07Z"}]}
+{"name":"request_latency","tags":{"dc":"B","service":"cart"},"points":[{"fields":{"value":0},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:00Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:05Z"}]}
+{"name":"request_latency","tags":{"dc":"B","service":"auth"},"points":[{"fields":{"value":0},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:01Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:06Z"}]}
+{"name":"request_latency","tags":{"dc":"B","service":"log"}, "points":[{"fields":{"value":0},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:02Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:07Z"}]}
+{"name":"request_latency","tags":{"dc":"A","service":"cart"},"points":[{"fields":{"value":8},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:10Z"},{"fields":{"value":3},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:15Z"}]}
+{"name":"request_latency","tags":{"dc":"A","service":"auth"},"points":[{"fields":{"value":4},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:11Z"},{"fields":{"value":2},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:16Z"}]}
+{"name":"request_latency","tags":{"dc":"A","service":"log"}, "points":[{"fields":{"value":7},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:12Z"},{"fields":{"value":1},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:17Z"}]}
+{"name":"request_latency","tags":{"dc":"B","service":"cart"},"points":[{"fields":{"value":3},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:10Z"},{"fields":{"value":7},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:15Z"}]}
+{"name":"request_latency","tags":{"dc":"B","service":"auth"},"points":[{"fields":{"value":9},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:11Z"},{"fields":{"value":6},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:16Z"}]} +{"name":"request_latency","tags":{"dc":"B","service":"log"}, "points":[{"fields":{"value":5},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:12Z"},{"fields":{"value":4},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:17Z"}]} +{"name":"request_latency","tags":{"dc":"A","service":"cart"},"points":[{"fields":{"value":0},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:20Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"cart"},"time":"2015-10-30T00:00:25Z"}]} +{"name":"request_latency","tags":{"dc":"A","service":"auth"},"points":[{"fields":{"value":0},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:21Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"auth"},"time":"2015-10-30T00:00:26Z"}]} +{"name":"request_latency","tags":{"dc":"A","service":"log"}, "points":[{"fields":{"value":0},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:22Z"},{"fields":{"value":0},"tags":{"dc":"A","service":"log"}, "time":"2015-10-30T00:00:27Z"}]} +{"name":"request_latency","tags":{"dc":"B","service":"cart"},"points":[{"fields":{"value":0},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:20Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"cart"},"time":"2015-10-30T00:00:25Z"}]} +{"name":"request_latency","tags":{"dc":"B","service":"auth"},"points":[{"fields":{"value":0},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:21Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"auth"},"time":"2015-10-30T00:00:26Z"}]} +{"name":"request_latency","tags":{"dc":"B","service":"log"}, "points":[{"fields":{"value":0},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:22Z"},{"fields":{"value":0},"tags":{"dc":"B","service":"log"}, "time":"2015-10-30T00:00:27Z"}]} diff --git a/integrations/data/TestStream_Combine.srpl b/integrations/data/TestStream_Combine.srpl new file mode 100644 index 000000000..5f40a95d5 --- /dev/null +++ b/integrations/data/TestStream_Combine.srpl @@ -0,0 +1,36 @@ +dbname +rpname +request_latency,service=auth,dc=A value=700 0000000001 +dbname +rpname +request_latency,service=log,dc=A value=600 0000000001 +dbname +rpname +request_latency,service=cart,dc=A value=800 0000000001 +dbname +rpname +request_latency,service=auth,dc=B value=750 0000000001 +dbname +rpname +request_latency,service=log,dc=B value=650 0000000001 +dbname +rpname +request_latency,service=cart,dc=B value=850 0000000001 +dbname +rpname +request_latency,service=auth,dc=A value=500 0000000002 +dbname +rpname +request_latency,service=log,dc=A value=700 0000000002 +dbname +rpname +request_latency,service=cart,dc=A value=300 0000000002 +dbname +rpname +request_latency,service=auth,dc=B value=850 0000000002 +dbname +rpname +request_latency,service=log,dc=B value=450 0000000002 +dbname +rpname +request_latency,service=cart,dc=B value=950 0000000002 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 62464581a..958000c75 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -1313,6 +1313,186 @@ byCpu testStreamerWithOutput(t, "TestStream_GroupByWhere", script, 13*time.Second, er, nil, true) } +func TestStream_Combine_All(t *testing.T) { + var script = ` +stream + |from() + .measurement('request_latency') + .groupBy('dc') + |combine(lambda: TRUE, lambda: 
+	|combine(lambda: TRUE, lambda: TRUE)
+		.as('first', 'second')
+		.tolerance(1s)
+		.delimiter('.')
+	|groupBy('first.service', 'second.service', 'dc')
+	|eval(lambda: "first.value" / "second.value")
+		.as('ratio')
+	|httpOut('TestStream_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "second.service": "log", "first.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.0 / 6.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "second.service": "cart", "first.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.0 / 8.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "second.service": "cart", "first.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					6.0 / 8.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "second.service": "log", "first.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.5 / 6.5,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "second.service": "cart", "first.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.5 / 8.5,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "second.service": "cart", "first.service": "log"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					6.5 / 8.5,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_Combine", script, 13*time.Second, er, nil, true)
+}
+
+func TestStream_Combine_Filtered(t *testing.T) {
+	var script = `
+stream
+	|from()
+		.measurement('request_latency')
+		.groupBy('dc')
+	|combine(lambda: "service" == 'auth', lambda: TRUE)
+		.as('auth', 'other')
+		.tolerance(1s)
+		.delimiter('.')
+	|groupBy('other.service','dc')
+	|eval(lambda: "auth.value" / "other.value")
+		.as('ratio')
+	|httpOut('TestStream_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "other.service": "log", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.0 / 6.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "other.service": "cart", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.0 / 8.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "other.service": "log", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.5 / 6.5,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "other.service": "cart", "auth.service": "auth"},
+				Columns: []string{"time", "ratio"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					7.5 / 8.5,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_Combine", script, 13*time.Second, er, nil, true)
+}
+
+func TestStream_Combine_All_Triples(t *testing.T) {
+	var script = `
+stream
+	|from()
+		.measurement('request_latency')
+		.groupBy('dc')
+	|combine(lambda: TRUE, lambda: TRUE, lambda: TRUE)
+		.as('first', 'second', 'third')
+		.tolerance(1s)
+		.delimiter('.')
+	|groupBy('first.service', 'second.service', 'third.service', 'dc')
+	|eval(lambda: "first.value" + "second.value" + "third.value")
+		.as('sum')
+	|httpOut('TestStream_Combine')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "A", "first.service": "auth", "second.service": "log", "third.service": "cart"},
+				Columns: []string{"time", "sum"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					2100.0,
+				}},
+			},
+			{
+				Name:    "request_latency",
+				Tags:    map[string]string{"dc": "B", "first.service": "auth", "second.service": "log", "third.service": "cart"},
+				Columns: []string{"time", "sum"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
+					2250.0,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_Combine", script, 13*time.Second, er, nil, true)
+}
+
 func TestStream_Join(t *testing.T) {
 	var script = `
diff --git a/models/point.go b/models/point.go
index e1fa67ae7..b96dda24d 100644
--- a/models/point.go
+++ b/models/point.go
@@ -203,3 +203,11 @@ func (d Dimensions) Copy() Dimensions {
 	copy(cd, d)
 	return cd
 }
+
+func (d Dimensions) ToSet() map[string]bool {
+	set := make(map[string]bool, len(d))
+	for _, dim := range d {
+		set[dim] = true
+	}
+	return set
+}
diff --git a/pipeline/combine.go b/pipeline/combine.go
new file mode 100644
index 000000000..b58029c09
--- /dev/null
+++ b/pipeline/combine.go
@@ -0,0 +1,133 @@
+package pipeline
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/influxdata/kapacitor/tick/ast"
+)
+
+const (
+	defaultCombineDelimiter = "."
+	defaultMaxCombinations  = 1e6
+)
+
+// Combine the data from a single node with itself.
+// Points with the same time are grouped and then combinations are created.
+// The size of the combinations is defined by how many expressions are given.
+// Combinations are order independent and never include the same point twice.
+//
+// Example:
+//    stream
+//        |from()
+//            .measurement('request_latency')
+//        |combine(lambda: "service" == 'login', lambda: TRUE)
+//            .as('login', 'other')
+//            // points that are within 1 second are considered the same time.
+//            .tolerance(1s)
+//            // delimiter for new field and tag names
+//            .delimiter('.')
+//        // Change group by to be new other.service tag
+//        |groupBy('other.service')
+//        // Both the "value" fields from each data point have been prefixed
+//        // with the respective names 'login' and 'other'.
+//        |eval(lambda: "login.value" / "other.value")
+//            .as('ratio')
+//        ...
+//
+// In the above example the data points for the `login` service are combined with the data points from all other services.
+//
+// Example:
+//    |combine(lambda: TRUE, lambda: TRUE)
+//        .as('login', 'other')
+//
+// In the above example all combination pairs are created.
+//
+// Example:
+//    |combine(lambda: TRUE, lambda: TRUE, lambda: TRUE)
+//        .as('login', 'other', 'another')
+//
+// In the above example all combination triples are created.
+type CombineNode struct {
+	chainnode
+
+	// The list of expressions used to match points into combinations.
+	// tick:ignore
+	Lambdas []*ast.LambdaNode
+
+	// The alias names used as prefixes for the fields and tags of each matched point.
+	// Note: Names[i] corresponds to Lambdas[i].
+	// tick:ignore
+	Names []string `tick:"As"`
+
+	// The delimiter between the As names and existing field and tag keys.
+	// Can be the empty string, but you are responsible for ensuring conflicts are not possible if you use the empty string.
+	Delimiter string
+
+	// The maximum duration of time that two incoming points
+	// can be apart and still be considered to be equal in time.
+	// The joined data point's time will be rounded to the nearest
+	// multiple of the tolerance duration.
+	Tolerance time.Duration
+
+	// Maximum number of possible combinations.
+	// Since the number of possible combinations can grow very rapidly,
+	// you can set a maximum number of combinations allowed.
+	// If the max is exceeded, an error is returned and the combinations are not calculated.
+	// Default: 1,000,000
+	Max int64
+}
+
+func newCombineNode(e EdgeType, lambdas []*ast.LambdaNode) *CombineNode {
+	c := &CombineNode{
+		chainnode: newBasicChainNode("combine", e, StreamEdge),
+		Lambdas:   lambdas,
+		Delimiter: defaultCombineDelimiter,
+		Max:       defaultMaxCombinations,
+	}
+	return c
+}
+
+// Prefix names for all fields and tags of each matched point.
+// Each field and tag will be prefixed with the provided name and the delimiter.
+// See the example above.
+//
+// The names cannot contain the delimiter character.
+//
+// tick:property
+func (n *CombineNode) As(names ...string) *CombineNode {
+	n.Names = names
+	return n
+}
+
+// Validate that the as() specification is consistent with the number of combine expressions.
+func (n *CombineNode) validate() error {
+	if len(n.Names) == 0 {
+		return fmt.Errorf("a call to combine.as() is required to specify the output stream prefixes")
+	}
+
+	if len(n.Names) != len(n.Lambdas) {
+		return fmt.Errorf("number of prefixes specified by combine.as() must match the number of combine expressions")
+	}
+
+	for _, name := range n.Names {
+		if len(name) == 0 {
+			return fmt.Errorf("must provide a prefix name for the combine node, see .as() property method")
+		}
+		if strings.Contains(name, n.Delimiter) {
+			return fmt.Errorf("cannot use name %s as field prefix, it contains the delimiter character %s", name, n.Delimiter)
+		}
+	}
+	names := make(map[string]bool, len(n.Names))
+	for _, name := range n.Names {
+		if names[name] {
+			return fmt.Errorf("cannot use the same prefix name twice, see .as() property method")
+		}
+		names[name] = true
+	}
+
+	return nil
+}
diff --git a/pipeline/node.go b/pipeline/node.go
index 7a183f827..e37539b47 100644
--- a/pipeline/node.go
+++ b/pipeline/node.go
@@ -354,6 +354,13 @@ func (n *chainnode) Join(others ...Node) *JoinNode {
 	return j
 }
 
+// Combine this node with itself. The data is combined on timestamp.
+func (n *chainnode) Combine(expressions ...*ast.LambdaNode) *CombineNode {
+	c := newCombineNode(n.provides, expressions)
+	n.linkChild(c)
+	return c
+}
+
 // Create an eval node that will evaluate the given transformation function to each data point.
 //  A list of expressions may be provided and will be evaluated in the order they are given
 //  and results of previous expressions are made available to later expressions.
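As the pipeline docs above describe, the output field and tag keys are formed by joining each As prefix to the original key with the delimiter. A small sketch of that renaming under simplified types (prefixKeys is a hypothetical helper, not the kapacitor.CombineNode API):

    package main

    import "fmt"

    // prefixKeys mirrors CombineNode.merge in combine.go: each matched
    // point's field keys are prefixed with its alias name plus the
    // delimiter, so two "value" fields can coexist on the combined point.
    func prefixKeys(names []string, delimiter string, fields []map[string]float64) map[string]float64 {
    	merged := make(map[string]float64)
    	for i, fs := range fields {
    		for k, v := range fs {
    			merged[names[i]+delimiter+k] = v
    		}
    	}
    	return merged
    }

    func main() {
    	merged := prefixKeys(
    		[]string{"first", "second"},
    		".",
    		[]map[string]float64{{"value": 3}, {"value": 2}},
    	)
    	fmt.Println(merged) // e.g. map[first.value:3 second.value:2]
    }
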
diff --git a/task.go b/task.go
index 1c2d4d4da..066714e3e 100644
--- a/task.go
+++ b/task.go
@@ -485,6 +485,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (n Node, err
 		n, err = newLogNode(et, t, l)
 	case *pipeline.DefaultNode:
 		n, err = newDefaultNode(et, t, l)
+	case *pipeline.CombineNode:
+		n, err = newCombineNode(et, t, l)
 	default:
 		return nil, fmt.Errorf("unknown pipeline node type %T", p)
 	}
diff --git a/union.go b/union.go
index 1f4777232..cd0d5b278 100644
--- a/union.go
+++ b/union.go
@@ -33,10 +33,6 @@ func newUnionNode(et *ExecutingTask, n *pipeline.UnionNode, l *log.Logger) (*Uni
 func (u *UnionNode) runUnion([]byte) error {
 	union := make(chan srcPoint)
 	u.rename = u.u.Rename
-	if u.rename == "" {
-		//the calling node is always the last node
-		u.rename = u.parents[len(u.parents)-1].Name()
-	}
 	// Spawn goroutine for each parent
 	errors := make(chan error, len(u.ins))
 	for i, in := range u.ins {
@@ -155,7 +151,9 @@ func (u *UnionNode) emit(v models.PointInterface) error {
 	switch u.Provides() {
 	case pipeline.StreamEdge:
 		p := v.(models.Point)
-		p.Name = u.rename
+		if u.rename != "" {
+			p.Name = u.rename
+		}
 		for _, child := range u.outs {
 			err := child.CollectPoint(p)
 			if err != nil {
@@ -164,7 +162,9 @@ func (u *UnionNode) emit(v models.PointInterface) error {
 	case pipeline.BatchEdge:
 		b := v.(models.Batch)
-		b.Name = u.rename
+		if u.rename != "" {
+			b.Name = u.rename
+		}
 		for _, child := range u.outs {
 			err := child.CollectBatch(b)
 			if err != nil {