From 7ffbbc1072470f6f7c53cd2b07afa6390449d038 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 7 Dec 2015 10:21:15 -0700 Subject: [PATCH] make fill previous for count() queries work --- CHANGELOG.md | 1 + cmd/influxd/run/server_test.go | 5 +++++ influxql/ast.go | 28 ++++++++++++++++++++++++++++ tsdb/aggregate.go | 17 +++++++++++++++-- 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0acf44b11d1..46822742fd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - [#4872](https://github.com/influxdb/influxdb/pull/4872): Add option to disable logging for meta service. ### Bugfixes +- [#4849](https://github.com/influxdb/influxdb/issues/4849): Derivative works with count, mean, median, sum, first, last, max, min, and percentile. - [#4984](https://github.com/influxdb/influxdb/pull/4984): Allow math on fields, fixes regression. Thanks @mengjinglei - [#4666](https://github.com/influxdb/influxdb/issues/4666): Fix panic in derivative with invalid values. - [#4404](https://github.com/influxdb/influxdb/issues/4404): Return better error for currently unsupported DELETE queries. diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index e0cffc352af..e95cf6c343d 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -1738,6 +1738,11 @@ cpu value=20 1278010021000000000 command: `SELECT derivative(count(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, }, + &Query{ + name: "calculate derivative of count of distinct with unit default (4s) group by time with fill previous", + command: `SELECT derivative(count(distinct(value))) from db0.rp0.position where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:07' group by time(4s) fill(previous)`, + exp: `{"results":[{"error":"aggregate call didn't contain a field derivative(count(distinct(value)))"}]}`, + }, &Query{ name: "calculate derivative of mean with unit default (2s) group by time with fill 0", command: `SELECT derivative(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, diff --git a/influxql/ast.go b/influxql/ast.go index 0f0a1020330..a7afc59d76a 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -793,6 +793,34 @@ func (s *SelectStatement) IsSimpleDerivative() bool { return false } +// HasSimpleCount return true if one of the function calls is a count function with a +// variable ref as the first arg +func (s *SelectStatement) HasSimpleCount() bool { + // recursively check for a simple count(varref) function + var hasCount func(f *Call) bool + hasCount = func(f *Call) bool { + if strings.HasSuffix(f.Name, "count") { + // it's nested if the first argument is an aggregate function + if _, ok := f.Args[0].(*VarRef); ok { + return true + } + } else { + for _, arg := range f.Args { + if child, ok := arg.(*Call); ok { + return hasCount(child) + } + } + } + return false + } + for _, f := range s.FunctionCalls() { + if hasCount(f) { + return true + } + } + return false +} + // TimeAscending returns true if the time field is sorted in chronological order. func (s *SelectStatement) TimeAscending() bool { return len(s.SortFields) == 0 || s.SortFields[0].Ascending diff --git a/tsdb/aggregate.go b/tsdb/aggregate.go index f6b67a1e697..1dd2a40f6c4 100644 --- a/tsdb/aggregate.go +++ b/tsdb/aggregate.go @@ -342,11 +342,12 @@ func (e *AggregateExecutor) processFill(results [][]interface{}) [][]interface{} return newResults } + isCount := e.stmt.HasSimpleCount() // They're either filling with previous values or a specific number for i, vals := range results { // start at 1 because the first value is always time for j := 1; j < len(vals); j++ { - if vals[j] == nil { + if vals[j] == nil || (isCount && isZero(vals[j])) { switch e.stmt.Fill { case influxql.PreviousFill: if i != 0 { @@ -361,6 +362,18 @@ func (e *AggregateExecutor) processFill(results [][]interface{}) [][]interface{} return results } +// Returns true if the given interface is a zero valued int64 or float64. +func isZero(i interface{}) bool { + switch v := i.(type) { + case int64: + return v == 0 + case float64: + return v == 0 + default: + return false + } +} + // processDerivative returns the derivatives of the results func (e *AggregateExecutor) processDerivative(results [][]interface{}) [][]interface{} { // Return early if we're not supposed to process the derivatives @@ -694,7 +707,7 @@ func (m *AggregateMapper) initializeMapFunctions() error { } m.mapFuncs[i] = mfn - // Check for calls like `derivative(lmean(value), 1d)` + // Check for calls like `derivative(mean(value), 1d)` var nested *influxql.Call = c if fn, ok := c.Args[0].(*influxql.Call); ok { nested = fn