Skip to content

Commit

Permalink
make fill previous for count() queries work
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Dec 7, 2015
1 parent 06bc25d commit 7ffbbc1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#4872](https://github.com/influxdb/influxdb/pull/4872): Add option to disable logging for meta service.

### Bugfixes
- [#4849](https://github.com/influxdb/influxdb/issues/4849): Derivative works with count, mean, median, sum, first, last, max, min, and percentile.
- [#4984](https://github.com/influxdb/influxdb/pull/4984): Allow math on fields, fixes regression. Thanks @mengjinglei
- [#4666](https://github.com/influxdb/influxdb/issues/4666): Fix panic in derivative with invalid values.
- [#4404](https://github.com/influxdb/influxdb/issues/4404): Return better error for currently unsupported DELETE queries.
Expand Down
5 changes: 5 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,11 @@ cpu value=20 1278010021000000000
command: `SELECT derivative(count(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of count of distinct with unit default (4s) group by time with fill previous",
command: `SELECT derivative(count(distinct(value))) from db0.rp0.position where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:07' group by time(4s) fill(previous)`,
exp: `{"results":[{"error":"aggregate call didn't contain a field derivative(count(distinct(value)))"}]}`,
},
&Query{
name: "calculate derivative of mean with unit default (2s) group by time with fill 0",
command: `SELECT derivative(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
Expand Down
28 changes: 28 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,34 @@ func (s *SelectStatement) IsSimpleDerivative() bool {
return false
}

// HasSimpleCount return true if one of the function calls is a count function with a
// variable ref as the first arg
func (s *SelectStatement) HasSimpleCount() bool {
// recursively check for a simple count(varref) function
var hasCount func(f *Call) bool
hasCount = func(f *Call) bool {
if strings.HasSuffix(f.Name, "count") {
// it's nested if the first argument is an aggregate function
if _, ok := f.Args[0].(*VarRef); ok {
return true
}
} else {
for _, arg := range f.Args {
if child, ok := arg.(*Call); ok {
return hasCount(child)
}
}
}
return false
}
for _, f := range s.FunctionCalls() {
if hasCount(f) {
return true
}
}
return false
}

// TimeAscending returns true if the time field is sorted in chronological order.
func (s *SelectStatement) TimeAscending() bool {
return len(s.SortFields) == 0 || s.SortFields[0].Ascending
Expand Down
17 changes: 15 additions & 2 deletions tsdb/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,12 @@ func (e *AggregateExecutor) processFill(results [][]interface{}) [][]interface{}
return newResults
}

isCount := e.stmt.HasSimpleCount()
// They're either filling with previous values or a specific number
for i, vals := range results {
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
if vals[j] == nil {
if vals[j] == nil || (isCount && isZero(vals[j])) {
switch e.stmt.Fill {
case influxql.PreviousFill:
if i != 0 {
Expand All @@ -361,6 +362,18 @@ func (e *AggregateExecutor) processFill(results [][]interface{}) [][]interface{}
return results
}

// Returns true if the given interface is a zero valued int64 or float64.
func isZero(i interface{}) bool {
switch v := i.(type) {
case int64:
return v == 0
case float64:
return v == 0
default:
return false
}
}

// processDerivative returns the derivatives of the results
func (e *AggregateExecutor) processDerivative(results [][]interface{}) [][]interface{} {
// Return early if we're not supposed to process the derivatives
Expand Down Expand Up @@ -694,7 +707,7 @@ func (m *AggregateMapper) initializeMapFunctions() error {
}
m.mapFuncs[i] = mfn

// Check for calls like `derivative(lmean(value), 1d)`
// Check for calls like `derivative(mean(value), 1d)`
var nested *influxql.Call = c
if fn, ok := c.Args[0].(*influxql.Call); ok {
nested = fn
Expand Down

0 comments on commit 7ffbbc1

Please sign in to comment.