diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index aa98cf0ee..fc949894e 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -1238,6 +1238,106 @@ cpu0 testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, er, false) } +func TestBatch_Join_Delimiter(t *testing.T) { + + var script = ` +var cpu0 = batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "cpu" = 'cpu0' +''') + .period(10s) + .every(10s) + .groupBy(time(2s)) + +var cpu1 = batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "cpu" = 'cpu1' +''') + .period(10s) + .every(10s) + .groupBy(time(2s)) + +cpu0 + |join(cpu1) + .as('cpu0', 'cpu1') + .delimiter('~') + |count('cpu0~mean') + |window() + .period(20s) + .every(20s) + |sum('count') + |httpOut('TestBatch_Join') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu_usage_idle", + Columns: []string{"time", "sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 10.0, + }}, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, er, false) +} +func TestBatch_Join_DelimiterEmpty(t *testing.T) { + + var script = ` +var cpu0 = batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "cpu" = 'cpu0' +''') + .period(10s) + .every(10s) + .groupBy(time(2s)) + +var cpu1 = batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "cpu" = 'cpu1' +''') + .period(10s) + .every(10s) + .groupBy(time(2s)) + +cpu0 + |join(cpu1) + .as('cpu0', 'cpu1') + .delimiter('') + |count('cpu0mean') + |window() + .period(20s) + .every(20s) + |sum('count') + |httpOut('TestBatch_Join') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu_usage_idle", + Columns: []string{"time", "sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 28, 0, 
time.UTC), + 10.0, + }}, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, er, false) +} func TestBatch_JoinTolerance(t *testing.T) { diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 66a4148ab..5f6155e94 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -1615,6 +1615,155 @@ errorCounts testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, er, nil, true) } +func TestStream_Join_Delimiter(t *testing.T) { + + var script = ` +var errorCounts = stream + |from() + .measurement('errors') + .groupBy('service') + |window() + .period(10s) + .every(10s) + .align() + |sum('value') + +var viewCounts = stream + |from() + .measurement('views') + .groupBy('service') + |window() + .period(10s) + .every(10s) + .align() + |sum('value') + +errorCounts + |join(viewCounts) + .as('errors', 'views') + .delimiter('#') + .streamName('error_view') + |eval(lambda: "errors#sum" / "views#sum") + .as('error_percent') + .keep() + |httpOut('TestStream_Join') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "error_view", + Tags: map[string]string{"service": "cartA"}, + Columns: []string{"time", "error_percent", "errors#sum", "views#sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 47.0, + 4700.0, + }}, + }, + { + Name: "error_view", + Tags: map[string]string{"service": "login"}, + Columns: []string{"time", "error_percent", "errors#sum", "views#sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 45.0, + 4500.0, + }}, + }, + { + Name: "error_view", + Tags: map[string]string{"service": "front"}, + Columns: []string{"time", "error_percent", "errors#sum", "views#sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 32.0, + 3200.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Join", script, 
13*time.Second, er, nil, true) +} +func TestStream_Join_DelimiterEmpty(t *testing.T) { + + var script = ` +var errorCounts = stream + |from() + .measurement('errors') + .groupBy('service') + |window() + .period(10s) + .every(10s) + .align() + |sum('value') + +var viewCounts = stream + |from() + .measurement('views') + .groupBy('service') + |window() + .period(10s) + .every(10s) + .align() + |sum('value') + +errorCounts + |join(viewCounts) + .as('errors', 'views') + .delimiter('') + .streamName('error_view') + |eval(lambda: "errorssum" / "viewssum") + .as('error_percent') + .keep() + |httpOut('TestStream_Join') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "error_view", + Tags: map[string]string{"service": "cartA"}, + Columns: []string{"time", "error_percent", "errorssum", "viewssum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 47.0, + 4700.0, + }}, + }, + { + Name: "error_view", + Tags: map[string]string{"service": "login"}, + Columns: []string{"time", "error_percent", "errorssum", "viewssum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 45.0, + 4500.0, + }}, + }, + { + Name: "error_view", + Tags: map[string]string{"service": "front"}, + Columns: []string{"time", "error_percent", "errorssum", "viewssum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.01, + 32.0, + 3200.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, er, nil, true) +} + func TestStream_JoinTolerance(t *testing.T) { var script = ` diff --git a/join.go b/join.go index 66e94a5f3..ffd57ef15 100644 --- a/join.go +++ b/join.go @@ -277,6 +277,7 @@ func (g *group) collect(i int, p models.PointInterface) error { g.j.fill, g.j.fillValue, g.j.j.Names, + g.j.j.Delimiter, g.j.j.Tolerance, t, g.j.logger, @@ -296,6 +297,7 @@ func (g *group) collect(i int, p models.PointInterface) error { 
g.j.fill, g.j.fillValue, g.j.j.Names, + g.j.j.Delimiter, g.j.j.Tolerance, t, g.j.logger, @@ -399,6 +401,7 @@ type joinset struct { fill influxql.FillOption fillValue interface{} prefixes []string + delimiter string time time.Time tolerance time.Duration @@ -418,6 +421,7 @@ func newJoinset( fill influxql.FillOption, fillValue interface{}, prefixes []string, + delimiter string, tolerance time.Duration, time time.Time, l *log.Logger, @@ -428,6 +432,7 @@ func newJoinset( fill: fill, fillValue: fillValue, prefixes: prefixes, + delimiter: delimiter, expected: expected, values: make([]models.PointInterface, expected), first: expected, @@ -467,11 +472,11 @@ func (js *joinset) JoinIntoPoint() (models.Point, bool) { switch js.fill { case influxql.NullFill: for k := range js.First().PointFields() { - fields[js.prefixes[i]+"."+k] = nil + fields[js.prefixes[i]+js.delimiter+k] = nil } case influxql.NumberFill: for k := range js.First().PointFields() { - fields[js.prefixes[i]+"."+k] = js.fillValue + fields[js.prefixes[i]+js.delimiter+k] = js.fillValue } default: // inner join no valid point possible @@ -479,7 +484,7 @@ func (js *joinset) JoinIntoPoint() (models.Point, bool) { } } else { for k, v := range p.PointFields() { - fields[js.prefixes[i]+"."+k] = v + fields[js.prefixes[i]+js.delimiter+k] = v } } } @@ -558,11 +563,11 @@ func (js *joinset) JoinIntoBatch() (models.Batch, bool) { switch js.fill { case influxql.NullFill: for _, k := range fieldNames { - fields[js.prefixes[i]+"."+k] = nil + fields[js.prefixes[i]+js.delimiter+k] = nil } case influxql.NumberFill: for _, k := range fieldNames { - fields[js.prefixes[i]+"."+k] = js.fillValue + fields[js.prefixes[i]+js.delimiter+k] = js.fillValue } default: // inner join no valid point possible @@ -570,7 +575,7 @@ func (js *joinset) JoinIntoBatch() (models.Batch, bool) { } } else { for k, v := range bp.Fields { - fields[js.prefixes[i]+"."+k] = v + fields[js.prefixes[i]+js.delimiter+k] = v } } } diff --git a/pipeline/join.go 
b/pipeline/join.go index 6d5906652..5d827aef0 100644 --- a/pipeline/join.go +++ b/pipeline/join.go @@ -6,6 +6,10 @@ import ( "time" ) +const ( + defaultJoinDelimiter = "." +) + // Joins the data from any number of nodes. // As each data point is received from a parent node it is paired // with the next data points from the other parent nodes with a @@ -59,6 +63,10 @@ type JoinNode struct { // tick:ignore Dimensions []string `tick:"On"` + // The delimiter for the field name prefixes. + // Can be the empty string. + Delimiter string + // The name of this new joined data stream. // If empty the name of the left parent is used. StreamName string @@ -82,6 +90,7 @@ type JoinNode struct { func newJoinNode(e EdgeType, parents []Node) *JoinNode { j := &JoinNode{ chainnode: newBasicChainNode("join", e, e), + Delimiter: defaultJoinDelimiter, } for _, n := range parents { n.linkChild(j) @@ -145,8 +154,8 @@ func (j *JoinNode) validate() error { if len(name) == 0 { return fmt.Errorf("must provide a prefix name for the join node, see .as() property method") } - if strings.ContainsRune(name, '.') { - return fmt.Errorf("cannot use name %s as field prefix, it contains a '.' character", name) + if j.Delimiter != "" && strings.Contains(name, j.Delimiter) { + return fmt.Errorf("cannot use name %s as field prefix, it contains the delimiter %q", name, j.Delimiter) } } names := make(map[string]bool, len(j.Names))