From c02e53a59049d9e444a33330cc8e1b650bb961a8 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 16 May 2016 15:17:22 -0600 Subject: [PATCH] require all holt_winters use aggregate function and group by --- influxql/ast.go | 21 +++---- influxql/functions.go | 18 ++++-- influxql/parser_test.go | 67 +++++++++++----------- influxql/select.go | 21 +++---- influxql/select_test.go | 120 ---------------------------------------- 5 files changed, 62 insertions(+), 185 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 5f9638596a4..910682208ed 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1605,13 +1605,8 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { return err } case "holt_winters", "holt_winters_with_fit": - if min, max, got := 3, 4, len(expr.Args); got > max || got < min { - return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got) - } - for i, arg := range expr.Args[1:3] { - if _, ok := arg.(*IntegerLiteral); !ok { - return fmt.Errorf("expected integer argument as %dth arg in %s()", i+1, expr.Name) - } + if exp, got := 3, len(expr.Args); got != exp { + return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) } // Validate that if they have grouping by time, they need a sub-call like min/max, etc. groupByInterval, err := s.GroupByInterval() @@ -1621,14 +1616,12 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 { return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name) - } else if ok && len(expr.Args) == 4 && groupByInterval > 0 { - return fmt.Errorf("must not pass interval to %s as 4th arg when performing group by on aggregate", expr.Name) - } else if !ok && len(expr.Args) != 4 { - return fmt.Errorf("must pass interval to %s as 4th arg when not performing group by on aggregate", expr.Name) + } else if !ok { + return fmt.Errorf("must use aggregate function with %s", expr.Name) } - if len(expr.Args) == 4 { - if _, ok := expr.Args[3].(*DurationLiteral); !ok { - return fmt.Errorf("expected duration argument as 4th arg in %s()", expr.Name) + for i, arg := range expr.Args[1:3] { + if _, ok := arg.(*IntegerLiteral); !ok { + return fmt.Errorf("expected integer argument as %dth arg in %s", i+1, expr.Name) } } default: diff --git a/influxql/functions.go b/influxql/functions.go index 052af1ee6e5..9c3571863a4 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -390,20 +390,28 @@ func (r *FloatHoltWintersReducer) AggregateInteger(p *IntegerPoint) { r.aggregate(p.Time, float64(p.Value)) } +func (r *FloatHoltWintersReducer) roundTime(t int64) int64 { + return r.interval * ((t + r.interval/2) / r.interval) +} + func (r *FloatHoltWintersReducer) Emit() []FloatPoint { if l := len(r.points); l < 2 || r.seasonal && l < r.m { return nil } // First fill in r.y with values and NaNs for missing values - start, stop := r.points[0].Time, r.points[len(r.points)-1].Time + start, stop := r.roundTime(r.points[0].Time), r.roundTime(r.points[len(r.points)-1].Time) count := (stop - start) / r.interval r.y = make([]float64, 1, count) r.y[0] = r.points[0].Value - t := r.points[0].Time + t := r.roundTime(r.points[0].Time) for _, p := range r.points[1:] { + rounded := r.roundTime(p.Time) + if rounded <= t { + continue + } t += r.interval // Add any missing values before the next point - for p.Time != t { + for rounded != t { // Add in a NaN so we can skip it later. r.y = append(r.y, math.NaN()) t += r.interval @@ -482,7 +490,6 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { forecasted := r.forecast(r.h, params) var points []FloatPoint if r.includeAllData { - start := r.points[0].Time points = make([]FloatPoint, len(forecasted)) for i, v := range forecasted { t := start + r.interval*(int64(i)) @@ -492,11 +499,10 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { } } } else { - last := r.points[len(r.points)-1].Time points = make([]FloatPoint, r.h) forecasted := r.forecast(r.h, params) for i, v := range forecasted[len(r.y):] { - t := last + r.interval*(int64(i)+1) + t := stop + r.interval*(int64(i)+1) points[i] = FloatPoint{ Value: v, Time: t, diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 4360b4c677a..87835cc4cb5 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -330,52 +330,59 @@ func TestParser_ParseStatement(t *testing.T) { }, // holt_winters { - s: `SELECT holt_winters(field1, 3, 1, 1h) FROM myseries;`, + s: fmt.Sprintf(`SELECT holt_winters(first(field1), 3, 1) FROM myseries WHERE time > '%s' GROUP BY time(1h);`, now.UTC().Format(time.RFC3339Nano)), stmt: &influxql.SelectStatement{ IsRawQuery: false, Fields: []*influxql.Field{ {Expr: &influxql.Call{ Name: "holt_winters", Args: []influxql.Expr{ - &influxql.VarRef{Val: "field1"}, &influxql.IntegerLiteral{Val: 3}, + &influxql.Call{ + Name: "first", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + }, + }, + &influxql.IntegerLiteral{Val: 3}, &influxql.IntegerLiteral{Val: 1}, - &influxql.DurationLiteral{Val: 1 * time.Hour}, }, }}, }, + Dimensions: []*influxql.Dimension{ + { + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: 1 * time.Hour}, + }, + }, + }, + }, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + Condition: &influxql.BinaryExpr{ + Op: influxql.GT, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.TimeLiteral{Val: now.UTC()}, + }, }, }, { - s: `SELECT holt_winters_with_fit(field1, 3, 1, 1h) FROM myseries;`, + s: fmt.Sprintf(`SELECT holt_winters_with_fit(first(field1), 3, 1) FROM myseries WHERE time > '%s' GROUP BY time(1h);`, now.UTC().Format(time.RFC3339Nano)), stmt: &influxql.SelectStatement{ IsRawQuery: false, Fields: []*influxql.Field{ {Expr: &influxql.Call{ Name: "holt_winters_with_fit", Args: []influxql.Expr{ - &influxql.VarRef{Val: "field1"}, - &influxql.IntegerLiteral{Val: 3}, - &influxql.IntegerLiteral{Val: 1}, - &influxql.DurationLiteral{Val: 1 * time.Hour}, - }}}, - }, - Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, - }, - }, - { - s: fmt.Sprintf(`SELECT holt_winters(field1, 3, 1, 1h) FROM myseries WHERE time < '%s' GROUP BY time(10d);`, now.UTC().Format(time.RFC3339Nano)), - stmt: &influxql.SelectStatement{ - IsRawQuery: false, - Fields: []*influxql.Field{ - {Expr: &influxql.Call{ - Name: "holt_winters", - Args: []influxql.Expr{ - &influxql.VarRef{Val: "field1"}, + &influxql.Call{ + Name: "first", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + }, + }, &influxql.IntegerLiteral{Val: 3}, &influxql.IntegerLiteral{Val: 1}, - &influxql.DurationLiteral{Val: 1 * time.Hour}, }}}, }, Dimensions: []*influxql.Dimension{ @@ -383,20 +390,19 @@ func TestParser_ParseStatement(t *testing.T) { Expr: &influxql.Call{ Name: "time", Args: []influxql.Expr{ - &influxql.DurationLiteral{Val: 10 * 24 * time.Hour}, + &influxql.DurationLiteral{Val: 1 * time.Hour}, }, }, }, }, Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, Condition: &influxql.BinaryExpr{ - Op: influxql.LT, + Op: influxql.GT, LHS: &influxql.VarRef{Val: "time"}, RHS: &influxql.TimeLiteral{Val: now.UTC()}, }, }, }, - { s: fmt.Sprintf(`SELECT holt_winters(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)), stmt: &influxql.SelectStatement{ @@ -438,13 +444,13 @@ func TestParser_ParseStatement(t *testing.T) { }, { - s: fmt.Sprintf(`SELECT holt_winters(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)), + s: fmt.Sprintf(`SELECT holt_winters_with_fit(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)), stmt: &influxql.SelectStatement{ IsRawQuery: false, Fields: []*influxql.Field{ { Expr: &influxql.Call{ - Name: "holt_winters", + Name: "holt_winters_with_fit", Args: []influxql.Expr{ &influxql.Call{ Name: "max", @@ -2166,10 +2172,9 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT moving_average(max(), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, {s: `SELECT moving_average(percentile(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, {s: `SELECT moving_average(mean(value), 2) FROM myseries where time < now() and time > now() - 1d`, err: `moving_average aggregate requires a GROUP BY interval`}, - {s: `SELECT holt_winters(value) FROM myseries where time < now() and time > now() - 1d`, err: `invalid number of arguments for holt_winters, expected at least 3 but no more than 4, got 1`}, - {s: `SELECT holt_winters(value, 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `must pass interval to holt_winters as 4th arg when not performing group by on aggregate`}, + {s: `SELECT holt_winters(value) FROM myseries where time < now() and time > now() - 1d`, err: `invalid number of arguments for holt_winters, expected 3, got 1`}, + {s: `SELECT holt_winters(value, 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `must use aggregate function with holt_winters`}, {s: `SELECT holt_winters(min(value), 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `holt_winters aggregate requires a GROUP BY interval`}, - {s: `SELECT holt_winters(min(value), 10, 2, 1h) FROM myseries where time < now() and time > now() - 1d GROUP BY time(1h)`, err: `must not pass interval to holt_winters as 4th arg when performing group by on aggregate`}, {s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`}, {s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, {s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, diff --git a/influxql/select.go b/influxql/select.go index 4f7ae519285..400b9b62339 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -250,21 +250,14 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec } h := expr.Args[1].(*IntegerLiteral) m := expr.Args[2].(*IntegerLiteral) + includeAllData := "holt_winters_with_fit" == expr.Name - var interval time.Duration - // Use the interval on the elapsed() call, if specified. - if len(expr.Args) == 4 { - interval = expr.Args[3].(*DurationLiteral).Val - } else if len(expr.Args) == 3 { - interval = opt.Interval.Duration - // Redifine interval to be unbounded to capture all aggregate results - opt.StartTime = MinTime - opt.EndTime = MaxTime - opt.Interval = Interval{} - } - if interval == 0 { - return nil, fmt.Errorf("must specify interval as 4th arg to %s if not grouping by aggregate function.", expr.Name) - } + + interval := opt.Interval.Duration + // Redifine interval to be unbounded to capture all aggregate results + opt.StartTime = MinTime + opt.EndTime = MaxTime + opt.Interval = Interval{} return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeAllData, interval) case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed": diff --git a/influxql/select_test.go b/influxql/select_test.go index 627244e9fc0..f2ec8633462 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2273,126 +2273,6 @@ func TestSelect_MovingAverage_Integer(t *testing.T) { } } -func TestSelect_HoltWinters_Float(t *testing.T) { - var ic IteratorCreator - ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { - return &FloatIterator{Points: []influxql.FloatPoint{ - {Name: "cpu", Time: 0 * Second, Value: 5}, - {Name: "cpu", Time: 4 * Second, Value: 10}, - {Name: "cpu", Time: 8 * Second, Value: 6}, - {Name: "cpu", Time: 12 * Second, Value: 11}, - }}, nil - } - - // Execute selection. - itrs, err := influxql.Select(MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 4s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) - if err != nil { - t.Fatal(err) - } else if a, err := Iterators(itrs).ReadAll(); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 6.65114462700177}}, - {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 10.98709300119875}}, - }) { - t.Fatalf("unexpected points: %s", spew.Sdump(a)) - } -} - -func TestSelect_HoltWinters_Integer(t *testing.T) { - var ic IteratorCreator - ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { - return &IntegerIterator{Points: []influxql.IntegerPoint{ - {Name: "cpu", Time: 0 * Second, Value: 5}, - {Name: "cpu", Time: 4 * Second, Value: 10}, - {Name: "cpu", Time: 8 * Second, Value: 6}, - {Name: "cpu", Time: 12 * Second, Value: 11}, - }}, nil - } - - // Execute selection. - itrs, err := influxql.Select(MustParseSelectStatement(`SELECT holt_winters_with_fit(value, 2, 2, 4s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) - if err != nil { - t.Fatal(err) - } else if a, err := Iterators(itrs).ReadAll(); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 5}}, - {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 10.0003982704228}}, - {&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 6.000402304344481}}, - {&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 10.988537475385646}}, - {&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 6.65114462700177}}, - {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 10.98709300119875}}, - }) { - t.Fatalf("unexpected points: %s", spew.Sdump(a)) - } -} - -func TestSelect_HoltWinters_Float_GroupBy(t *testing.T) { - var ic IteratorCreator - ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { - return &FloatIterator{Points: []influxql.FloatPoint{ - {Name: "cpu", Time: 0 * Second, Value: 5}, - {Name: "cpu", Time: 2 * Second, Value: 10}, - {Name: "cpu", Time: 4 * Second, Value: 6}, - {Name: "cpu", Time: 6 * Second, Value: 11}, - {Name: "cpu", Time: 8 * Second, Value: 7}, - {Name: "cpu", Time: 10 * Second, Value: 12}, - {Name: "cpu", Time: 12 * Second, Value: 8}, - {Name: "cpu", Time: 14 * Second, Value: 13}, - {Name: "cpu", Time: 16 * Second, Value: 9}, - {Name: "cpu", Time: 18 * Second, Value: 14}, - }}, nil - } - - // Execute selection. - itrs, err := influxql.Select(MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 2s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:20Z' GROUP BY time(10s)`), &ic, nil) - if err != nil { - t.Fatal(err) - } else if a, err := Iterators(itrs).ReadAll(); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 10 * Second, Value: 11.136685559138241}}, - {&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 7.507280682335967}}, - {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 9.988834832405697}}, - {&influxql.FloatPoint{Name: "cpu", Time: 22 * Second, Value: 14.886383946380114}}, - }) { - t.Fatalf("unexpected points: %s", spew.Sdump(a)) - } -} - -func TestSelect_HoltWinters_Integer_GroupBy(t *testing.T) { - var ic IteratorCreator - ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { - return &IntegerIterator{Points: []influxql.IntegerPoint{ - {Name: "cpu", Time: 0 * Second, Value: 5}, - {Name: "cpu", Time: 2 * Second, Value: 10}, - {Name: "cpu", Time: 4 * Second, Value: 6}, - {Name: "cpu", Time: 6 * Second, Value: 11}, - {Name: "cpu", Time: 8 * Second, Value: 7}, - {Name: "cpu", Time: 10 * Second, Value: 12}, - {Name: "cpu", Time: 12 * Second, Value: 8}, - {Name: "cpu", Time: 14 * Second, Value: 13}, - {Name: "cpu", Time: 16 * Second, Value: 9}, - {Name: "cpu", Time: 18 * Second, Value: 14}, - }}, nil - } - - // Execute selection. - itrs, err := influxql.Select(MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 2s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:20Z' GROUP BY time(10s)`), &ic, nil) - if err != nil { - t.Fatal(err) - } else if a, err := Iterators(itrs).ReadAll(); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if !deep.Equal(a, [][]influxql.Point{ - {&influxql.FloatPoint{Name: "cpu", Time: 10 * Second, Value: 11.136685559138241}}, - {&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 7.507280682335967}}, - {&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 9.988834832405697}}, - {&influxql.FloatPoint{Name: "cpu", Time: 22 * Second, Value: 14.886383946380114}}, - }) { - t.Fatalf("unexpected points: %s", spew.Sdump(a)) - } -} - func TestSelect_HoltWinters_GroupBy_Agg(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {