Skip to content

Commit

Permalink
Fix side-effecting WhereNode, fixes #842 (#845)
Browse files Browse the repository at this point in the history
* fix side-effecting WhereNode, fixes #842

* CHANGELOG.md
  • Loading branch information
Nathaniel Cook authored Aug 30, 2016
1 parent 696be4b commit 5784788
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 16 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## v1.0.0-rc3 [unreleased]

### Release Notes

### Features

### Bugfixes

- [#842](https://github.com/influxdata/kapacitor/issues/842): Fix side-effecting modification in batch WhereNode.

## v1.0.0-rc2 [2016-08-29]

### Release Notes
Expand Down
96 changes: 96 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,102 @@ batch

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er, false)
}

func TestBatch_Where_NoSideEffect(t *testing.T) {

var script = `
var data = batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|where(lambda: "mean" > 85)
// Unused where clause should not side-effect
data
|where(lambda: FALSE)
data
|httpOut('TestBatch_SimpleMR')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
91.06416290101595,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
85.9694442394385,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
90.62985736134186,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
86.45443196005628,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
88.97243107764031,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
85.08910891088406,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
90.99999999998545,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er, false)
}

func TestBatch_CountEmptyBatch(t *testing.T) {
var script = `
batch
Expand Down
38 changes: 38 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,44 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_Where_NoSideEffect(t *testing.T) {

var script = `
var data = stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.where(lambda: "host" != 'serverB')
|window()
.period(10s)
.every(10s)
|count('value')
|where(lambda: "count" > 0)
// Unused where clause should not side-effect
data
|where(lambda: FALSE)
data
|httpOut('TestStream_SimpleMR')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
10.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_VarWhereString(t *testing.T) {

var script = `
Expand Down
10 changes: 0 additions & 10 deletions pipeline/where.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,3 @@ func newWhereNode(wants EdgeType, predicate *ast.LambdaNode) *WhereNode {
Lambda: predicate,
}
}

// And another expression onto the existing expression.
func (w *WhereNode) Where(lambda *ast.LambdaNode) *WhereNode {
w.Lambda.Expression = &ast.BinaryNode{
Operator: ast.TokenAnd,
Left: w.Lambda.Expression,
Right: lambda.Expression,
}
return w
}
11 changes: 5 additions & 6 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,14 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Lambda.Expression))
w.scopePools[b.Group] = scopePool
}
for i := 0; i < len(b.Points); {
p := b.Points[i]
if pass, err := EvalPredicate(expr, scopePool, p.Time, p.Fields, p.Tags); !pass {
points := b.Points
b.Points = make([]models.BatchPoint, 0, len(b.Points))
for _, p := range points {
if pass, err := EvalPredicate(expr, scopePool, p.Time, p.Fields, p.Tags); pass {
if err != nil {
w.logger.Println("E! error while evaluating WHERE expression:", err)
}
b.Points = append(b.Points[:i], b.Points[i+1:]...)
} else {
i++
b.Points = append(b.Points, p)
}
}
w.timer.Stop()
Expand Down

0 comments on commit 5784788

Please sign in to comment.