Skip to content

Commit

Permalink
require all holt_winters use aggregate function and group by
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 16, 2016
1 parent ca5e475 commit c02e53a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 185 deletions.
21 changes: 7 additions & 14 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,13 +1605,8 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return err
}
case "holt_winters", "holt_winters_with_fit":
if min, max, got := 3, 4, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
for i, arg := range expr.Args[1:3] {
if _, ok := arg.(*IntegerLiteral); !ok {
return fmt.Errorf("expected integer argument as %dth arg in %s()", i+1, expr.Name)
}
if exp, got := 3, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}
// Validate that if they have grouping by time, they need a sub-call like min/max, etc.
groupByInterval, err := s.GroupByInterval()
Expand All @@ -1621,14 +1616,12 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {

if _, ok := expr.Args[0].(*Call); ok && groupByInterval == 0 {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", expr.Name)
} else if ok && len(expr.Args) == 4 && groupByInterval > 0 {
return fmt.Errorf("must not pass interval to %s as 4th arg when performing group by on aggregate", expr.Name)
} else if !ok && len(expr.Args) != 4 {
return fmt.Errorf("must pass interval to %s as 4th arg when not performing group by on aggregate", expr.Name)
} else if !ok {
return fmt.Errorf("must use aggregate function with %s", expr.Name)
}
if len(expr.Args) == 4 {
if _, ok := expr.Args[3].(*DurationLiteral); !ok {
return fmt.Errorf("expected duration argument as 4th arg in %s()", expr.Name)
for i, arg := range expr.Args[1:3] {
if _, ok := arg.(*IntegerLiteral); !ok {
return fmt.Errorf("expected integer argument as %dth arg in %s", i+1, expr.Name)
}
}
default:
Expand Down
18 changes: 12 additions & 6 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,20 +390,28 @@ func (r *FloatHoltWintersReducer) AggregateInteger(p *IntegerPoint) {
r.aggregate(p.Time, float64(p.Value))
}

// roundTime rounds t to the nearest multiple of r.interval. Exact halves
// round up (toward +infinity), matching the previous behavior for
// non-negative timestamps.
//
// The result is derived from the floored remainder instead of
// (t + r.interval/2) / r.interval. Go's integer division truncates toward
// zero, so the additive form pushed negative timestamps one interval too far
// toward zero (-8 with an interval of 4 became -4), and the addition could
// overflow for t close to the largest int64.
//
// NOTE(review): assumes r.interval > 0; that is not enforced here. A zero
// interval still panics with an integer divide by zero.
func (r *FloatHoltWintersReducer) roundTime(t int64) int64 {
	// Go's % takes the sign of the dividend; normalize to [0, interval).
	rem := t % r.interval
	if rem < 0 {
		rem += r.interval
	}
	floor := t - rem
	// Round up once the remainder reaches the upper half of the interval.
	// For odd intervals the threshold is interval - interval/2, which is
	// exactly what the truncating additive form produced for t >= 0.
	if rem >= r.interval-r.interval/2 {
		return floor + r.interval
	}
	return floor
}

func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
if l := len(r.points); l < 2 || r.seasonal && l < r.m {
return nil
}
// First fill in r.y with values and NaNs for missing values
start, stop := r.points[0].Time, r.points[len(r.points)-1].Time
start, stop := r.roundTime(r.points[0].Time), r.roundTime(r.points[len(r.points)-1].Time)
count := (stop - start) / r.interval
r.y = make([]float64, 1, count)
r.y[0] = r.points[0].Value
t := r.points[0].Time
t := r.roundTime(r.points[0].Time)
for _, p := range r.points[1:] {
rounded := r.roundTime(p.Time)
if rounded <= t {
continue
}
t += r.interval
// Add any missing values before the next point
for p.Time != t {
for rounded != t {
// Add in a NaN so we can skip it later.
r.y = append(r.y, math.NaN())
t += r.interval
Expand Down Expand Up @@ -482,7 +490,6 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
forecasted := r.forecast(r.h, params)
var points []FloatPoint
if r.includeAllData {
start := r.points[0].Time
points = make([]FloatPoint, len(forecasted))
for i, v := range forecasted {
t := start + r.interval*(int64(i))
Expand All @@ -492,11 +499,10 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
}
}
} else {
last := r.points[len(r.points)-1].Time
points = make([]FloatPoint, r.h)
forecasted := r.forecast(r.h, params)
for i, v := range forecasted[len(r.y):] {
t := last + r.interval*(int64(i)+1)
t := stop + r.interval*(int64(i)+1)
points[i] = FloatPoint{
Value: v,
Time: t,
Expand Down
67 changes: 36 additions & 31 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,73 +330,79 @@ func TestParser_ParseStatement(t *testing.T) {
},
// holt_winters
{
s: `SELECT holt_winters(field1, 3, 1, 1h) FROM myseries;`,
s: fmt.Sprintf(`SELECT holt_winters(first(field1), 3, 1) FROM myseries WHERE time > '%s' GROUP BY time(1h);`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{
Name: "holt_winters",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"}, &influxql.IntegerLiteral{Val: 3},
&influxql.Call{
Name: "first",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
},
},
&influxql.IntegerLiteral{Val: 3},
&influxql.IntegerLiteral{Val: 1},
&influxql.DurationLiteral{Val: 1 * time.Hour},
},
}},
},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 1 * time.Hour},
},
},
},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Condition: &influxql.BinaryExpr{
Op: influxql.GT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: now.UTC()},
},
},
},

{
s: `SELECT holt_winters_with_fit(field1, 3, 1, 1h) FROM myseries;`,
s: fmt.Sprintf(`SELECT holt_winters_with_fit(first(field1), 3, 1) FROM myseries WHERE time > '%s' GROUP BY time(1h);`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{
Name: "holt_winters_with_fit",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
&influxql.IntegerLiteral{Val: 3},
&influxql.IntegerLiteral{Val: 1},
&influxql.DurationLiteral{Val: 1 * time.Hour},
}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},
{
s: fmt.Sprintf(`SELECT holt_winters(field1, 3, 1, 1h) FROM myseries WHERE time < '%s' GROUP BY time(10d);`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{
Name: "holt_winters",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
&influxql.Call{
Name: "first",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
},
},
&influxql.IntegerLiteral{Val: 3},
&influxql.IntegerLiteral{Val: 1},
&influxql.DurationLiteral{Val: 1 * time.Hour},
}}},
},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 10 * 24 * time.Hour},
&influxql.DurationLiteral{Val: 1 * time.Hour},
},
},
},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Condition: &influxql.BinaryExpr{
Op: influxql.LT,
Op: influxql.GT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: now.UTC()},
},
},
},

{
s: fmt.Sprintf(`SELECT holt_winters(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
Expand Down Expand Up @@ -438,13 +444,13 @@ func TestParser_ParseStatement(t *testing.T) {
},

{
s: fmt.Sprintf(`SELECT holt_winters(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
s: fmt.Sprintf(`SELECT holt_winters_with_fit(max(field1), 4, 5) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{
Expr: &influxql.Call{
Name: "holt_winters",
Name: "holt_winters_with_fit",
Args: []influxql.Expr{
&influxql.Call{
Name: "max",
Expand Down Expand Up @@ -2166,10 +2172,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT moving_average(max(), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT moving_average(percentile(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT moving_average(mean(value), 2) FROM myseries where time < now() and time > now() - 1d`, err: `moving_average aggregate requires a GROUP BY interval`},
{s: `SELECT holt_winters(value) FROM myseries where time < now() and time > now() - 1d`, err: `invalid number of arguments for holt_winters, expected at least 3 but no more than 4, got 1`},
{s: `SELECT holt_winters(value, 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `must pass interval to holt_winters as 4th arg when not performing group by on aggregate`},
{s: `SELECT holt_winters(value) FROM myseries where time < now() and time > now() - 1d`, err: `invalid number of arguments for holt_winters, expected 3, got 1`},
{s: `SELECT holt_winters(value, 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `must use aggregate function with holt_winters`},
{s: `SELECT holt_winters(min(value), 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `holt_winters aggregate requires a GROUP BY interval`},
{s: `SELECT holt_winters(min(value), 10, 2, 1h) FROM myseries where time < now() and time > now() - 1d GROUP BY time(1h)`, err: `must not pass interval to holt_winters as 4th arg when performing group by on aggregate`},
{s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`},
{s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
{s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
Expand Down
21 changes: 7 additions & 14 deletions influxql/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,21 +250,14 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec
}
h := expr.Args[1].(*IntegerLiteral)
m := expr.Args[2].(*IntegerLiteral)

includeAllData := "holt_winters_with_fit" == expr.Name
var interval time.Duration
// Use the interval on the elapsed() call, if specified.
if len(expr.Args) == 4 {
interval = expr.Args[3].(*DurationLiteral).Val
} else if len(expr.Args) == 3 {
interval = opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt.StartTime = MinTime
opt.EndTime = MaxTime
opt.Interval = Interval{}
}
if interval == 0 {
return nil, fmt.Errorf("must specify interval as 4th arg to %s if not grouping by aggregate function.", expr.Name)
}

interval := opt.Interval.Duration
// Redefine interval to be unbounded to capture all aggregate results
opt.StartTime = MinTime
opt.EndTime = MaxTime
opt.Interval = Interval{}

return newHoltWintersIterator(input, opt, int(h.Val), int(m.Val), includeAllData, interval)
case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed":
Expand Down
120 changes: 0 additions & 120 deletions influxql/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2273,126 +2273,6 @@ func TestSelect_MovingAverage_Integer(t *testing.T) {
}
}

// TestSelect_HoltWinters_Float runs holt_winters(value, 2, 2, 4s) over four
// float points spaced 4 seconds apart and checks that exactly two points,
// at 16s and 20s, come back with the expected values.
func TestSelect_HoltWinters_Float(t *testing.T) {
	var ic IteratorCreator
	ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
		// A fresh point slice is built for every iterator request.
		itr := &FloatIterator{Points: []influxql.FloatPoint{
			{Name: "cpu", Time: 0 * Second, Value: 5},
			{Name: "cpu", Time: 4 * Second, Value: 10},
			{Name: "cpu", Time: 8 * Second, Value: 6},
			{Name: "cpu", Time: 12 * Second, Value: 11},
		}}
		return itr, nil
	}

	// Parse and execute the selection.
	stmt := MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 4s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`)
	itrs, err := influxql.Select(stmt, &ic, nil)
	if err != nil {
		t.Fatal(err)
	}

	got, err := Iterators(itrs).ReadAll()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	want := [][]influxql.Point{
		{&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 6.65114462700177}},
		{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 10.98709300119875}},
	}
	if !deep.Equal(got, want) {
		t.Fatalf("unexpected points: %s", spew.Sdump(got))
	}
}

// TestSelect_HoltWinters_Integer runs holt_winters_with_fit(value, 2, 2, 4s)
// over four integer points spaced 4 seconds apart. It expects six float
// points: one at each of the four input timestamps, followed by two more at
// 16s and 20s.
func TestSelect_HoltWinters_Integer(t *testing.T) {
	var ic IteratorCreator
	ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
		// A fresh point slice is built for every iterator request.
		itr := &IntegerIterator{Points: []influxql.IntegerPoint{
			{Name: "cpu", Time: 0 * Second, Value: 5},
			{Name: "cpu", Time: 4 * Second, Value: 10},
			{Name: "cpu", Time: 8 * Second, Value: 6},
			{Name: "cpu", Time: 12 * Second, Value: 11},
		}}
		return itr, nil
	}

	// Parse and execute the selection.
	stmt := MustParseSelectStatement(`SELECT holt_winters_with_fit(value, 2, 2, 4s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`)
	itrs, err := influxql.Select(stmt, &ic, nil)
	if err != nil {
		t.Fatal(err)
	}

	got, err := Iterators(itrs).ReadAll()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	want := [][]influxql.Point{
		{&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 5}},
		{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 10.0003982704228}},
		{&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 6.000402304344481}},
		{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 10.988537475385646}},
		{&influxql.FloatPoint{Name: "cpu", Time: 16 * Second, Value: 6.65114462700177}},
		{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 10.98709300119875}},
	}
	if !deep.Equal(got, want) {
		t.Fatalf("unexpected points: %s", spew.Sdump(got))
	}
}

// TestSelect_HoltWinters_Float_GroupBy runs holt_winters(value, 2, 2, 2s)
// with GROUP BY time(10s) over ten float points spaced 2 seconds apart and
// checks the four returned points (at 10s, 12s, 20s and 22s).
func TestSelect_HoltWinters_Float_GroupBy(t *testing.T) {
	var ic IteratorCreator
	ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
		// A fresh point slice is built for every iterator request.
		itr := &FloatIterator{Points: []influxql.FloatPoint{
			{Name: "cpu", Time: 0 * Second, Value: 5},
			{Name: "cpu", Time: 2 * Second, Value: 10},
			{Name: "cpu", Time: 4 * Second, Value: 6},
			{Name: "cpu", Time: 6 * Second, Value: 11},
			{Name: "cpu", Time: 8 * Second, Value: 7},
			{Name: "cpu", Time: 10 * Second, Value: 12},
			{Name: "cpu", Time: 12 * Second, Value: 8},
			{Name: "cpu", Time: 14 * Second, Value: 13},
			{Name: "cpu", Time: 16 * Second, Value: 9},
			{Name: "cpu", Time: 18 * Second, Value: 14},
		}}
		return itr, nil
	}

	// Parse and execute the selection.
	stmt := MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 2s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:20Z' GROUP BY time(10s)`)
	itrs, err := influxql.Select(stmt, &ic, nil)
	if err != nil {
		t.Fatal(err)
	}

	got, err := Iterators(itrs).ReadAll()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	want := [][]influxql.Point{
		{&influxql.FloatPoint{Name: "cpu", Time: 10 * Second, Value: 11.136685559138241}},
		{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 7.507280682335967}},
		{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 9.988834832405697}},
		{&influxql.FloatPoint{Name: "cpu", Time: 22 * Second, Value: 14.886383946380114}},
	}
	if !deep.Equal(got, want) {
		t.Fatalf("unexpected points: %s", spew.Sdump(got))
	}
}

// TestSelect_HoltWinters_Integer_GroupBy runs holt_winters(value, 2, 2, 2s)
// with GROUP BY time(10s) over ten integer points spaced 2 seconds apart and
// checks the four returned float points (at 10s, 12s, 20s and 22s).
func TestSelect_HoltWinters_Integer_GroupBy(t *testing.T) {
	var ic IteratorCreator
	ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
		// A fresh point slice is built for every iterator request.
		itr := &IntegerIterator{Points: []influxql.IntegerPoint{
			{Name: "cpu", Time: 0 * Second, Value: 5},
			{Name: "cpu", Time: 2 * Second, Value: 10},
			{Name: "cpu", Time: 4 * Second, Value: 6},
			{Name: "cpu", Time: 6 * Second, Value: 11},
			{Name: "cpu", Time: 8 * Second, Value: 7},
			{Name: "cpu", Time: 10 * Second, Value: 12},
			{Name: "cpu", Time: 12 * Second, Value: 8},
			{Name: "cpu", Time: 14 * Second, Value: 13},
			{Name: "cpu", Time: 16 * Second, Value: 9},
			{Name: "cpu", Time: 18 * Second, Value: 14},
		}}
		return itr, nil
	}

	// Parse and execute the selection.
	stmt := MustParseSelectStatement(`SELECT holt_winters(value, 2, 2, 2s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:20Z' GROUP BY time(10s)`)
	itrs, err := influxql.Select(stmt, &ic, nil)
	if err != nil {
		t.Fatal(err)
	}

	got, err := Iterators(itrs).ReadAll()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	want := [][]influxql.Point{
		{&influxql.FloatPoint{Name: "cpu", Time: 10 * Second, Value: 11.136685559138241}},
		{&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 7.507280682335967}},
		{&influxql.FloatPoint{Name: "cpu", Time: 20 * Second, Value: 9.988834832405697}},
		{&influxql.FloatPoint{Name: "cpu", Time: 22 * Second, Value: 14.886383946380114}},
	}
	if !deep.Equal(got, want) {
		t.Fatalf("unexpected points: %s", spew.Sdump(got))
	}
}

func TestSelect_HoltWinters_GroupBy_Agg(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down

0 comments on commit c02e53a

Please sign in to comment.