diff --git a/influxql/ast.go b/influxql/ast.go index 337ebbc444a..ff2f78750f3 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -642,7 +642,9 @@ type SelectStatement struct { FillValue interface{} } -func (s *SelectStatement) IsNonNestedDerivative() bool { +// HasDerivative returns true if one of the fields in the statement is named +// "derivative" (compared against f.Name()). +func (s *SelectStatement) HasDerivative() bool { for _, f := range s.Fields { if f.Name() == "derivative" { return true @@ -651,6 +653,25 @@ func (s *SelectStatement) IsNonNestedDerivative() bool { return false } +// IsSimpleDerivative returns true if the first field named "derivative" is a +// *Call whose first argument is a *VarRef; fields after it are not examined. +func (s *SelectStatement) IsSimpleDerivative() bool { + for _, f := range s.Fields { + if f.Name() == "derivative" { + // type-assert the field expression to a *Call + if d, ok := f.Expr.(*Call); ok { + + // a *VarRef first argument makes this a simple derivative; NOTE(review): Args[0] is indexed without a length check + if _, ok := d.Args[0].(*VarRef); ok { + return true + } + } + return false + } + } + return false +} + // Clone returns a deep copy of the statement. 
func (s *SelectStatement) Clone() *SelectStatement { clone := &SelectStatement{ @@ -893,7 +914,7 @@ func (s *SelectStatement) Validate(tr targetRequirement) error { } func (s *SelectStatement) validateDerivative() error { - if !s.IsNonNestedDerivative() { + if !s.HasDerivative() { return nil } diff --git a/influxql/engine.go b/influxql/engine.go index 0764cd6d47e..caf18ee282b 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -76,8 +76,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { } defer m.Close() - // if it's a raw query we handle processing differently - if m.stmt.IsRawQuery { + // raw queries and simple derivatives (see IsSimpleDerivative) are both handled by processRawQuery + if m.stmt.IsRawQuery || m.stmt.IsSimpleDerivative() { m.processRawQuery(out, filterEmptyResults) return } @@ -196,6 +196,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { // handle any fill options resultValues = m.processFill(resultValues) + // convert the filled results to derivatives; processDerivative returns them unchanged when the statement has no derivative field + resultValues = m.processDerivative(resultValues) + row := &Row{ Name: m.MeasurementName, Tags: m.TagSet.Tags, @@ -228,6 +231,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { valuesOffset := 0 valuesToReturn := make([]*rawQueryMapOutput, 0) + var lastValueFromPreviousChunk *rawQueryMapOutput // loop until we've emptied out all the mappers and sent everything out for { // collect up to the limit for each mapper @@ -324,6 +328,10 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { // hit the chunk size? Send out what has been accumulated, but keep // processing. if len(valuesToReturn) >= m.chunkSize { + lastValueFromPreviousChunk = valuesToReturn[len(valuesToReturn)-1] + // NOTE(review): the assignment above takes the last value of the chunk being sent before that same chunk is processed below, replacing whatever was carried over; confirm this ordering is intended + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. 
row.Values = m.processResults(row.Values) @@ -342,6 +350,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { out <- m.processRawResults(nil) } } else { + valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn) + row := m.processRawResults(valuesToReturn) // perform post-processing, such as math. row.Values = m.processResults(row.Values) @@ -349,6 +359,106 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { } } +// derivativeInterval returns the Val of the *DurationLiteral that is the second argument of the statement's first function call; neither the indexes nor the type assertion are checked here +func (m *MapReduceJob) derivativeInterval() time.Duration { + return m.stmt.FunctionCalls()[0].Args[1].(*DurationLiteral).Val +} +// processRawQueryDerivative converts raw values into derivatives between successive points, normalized to derivativeInterval; the input is returned untouched when the statement has no derivative field or the slice is empty +func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput { + // If we're called and do not have a derivative aggregate function, then return what was passed in + if !m.stmt.HasDerivative() { + return valuesToReturn + } + + if len(valuesToReturn) == 0 { + return valuesToReturn + } + + // With only 1 value no difference can be taken within this chunk, so return + // a single row w/ 0.0; NOTE(review): lastValueFromPreviousChunk is not consulted on this path + if len(valuesToReturn) == 1 { + return []*rawQueryMapOutput{ + &rawQueryMapOutput{ + Time: valuesToReturn[0].Time, + Values: 0.0, + }, + } + } + // No value carried over from an earlier chunk: start from the first value of this one + if lastValueFromPreviousChunk == nil { + lastValueFromPreviousChunk = valuesToReturn[0] + } + + // The duration to normalize the derivative by. This is so the derivative values + // can be expressed as "per second", etc.. 
within each time segment + interval := m.derivativeInterval() + + derivativeValues := make([]*rawQueryMapOutput, len(valuesToReturn)-1) + for i := 1; i < len(valuesToReturn); i++ { + v := valuesToReturn[i] + + // Calculate the derivative of successive points by dividing the difference + // of each value by the elapsed time normalized to the interval; NOTE(review): Values is asserted to float64 and a zero elapsed time is not guarded + diff := v.Values.(float64) - lastValueFromPreviousChunk.Values.(float64) + elapsed := v.Time - lastValueFromPreviousChunk.Time + + derivativeValues[i-1] = &rawQueryMapOutput{ + Time: v.Time, + Values: diff / (float64(elapsed) / float64(interval)), + } + lastValueFromPreviousChunk = v + } + + return derivativeValues +} + +// processDerivative returns the derivatives of consecutive result rows (each row is indexed as [time, value]), normalized to derivativeInterval +func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} { + + // Return early if we're not supposed to process the derivatives + if !m.stmt.HasDerivative() { + return results + } + + // Return early if we can't calculate derivatives + if len(results) == 0 { + return results + } + + // If we only have 1 value, then the value did not change, so return + // a single row w/ 0.0 + if len(results) == 1 { + return [][]interface{}{ + []interface{}{results[0][0], 0.0}, + } + } + + // Otherwise calculate the derivatives as the difference between consecutive + // points divided by the elapsed time, normalized to the requested interval. + // A row whose value, or whose predecessor's value, is nil is copied through unchanged. 
+ derivatives := make([][]interface{}, len(results)-1) + for i := 1; i < len(results); i++ { + prev := results[i-1] + cur := results[i] + + if cur[1] == nil || prev[1] == nil { + derivatives[i-1] = cur + continue + } + + elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time)) + diff := cur[1].(float64) - prev[1].(float64) + + val := []interface{}{ + cur[0], + float64(diff) / (float64(elapsed) / float64(m.derivativeInterval())), + } + derivatives[i-1] = val + } + + return derivatives +} + // processsResults will apply any math that was specified in the select statement against the passed in results func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} { hasMath := false diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 0bd93fd0609..62add5f9899 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -106,6 +106,18 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SELECT statement with a derivative call on a field and a duration + { + s: `SELECT derivative(field1, 1h) FROM myseries;`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "derivative", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.DurationLiteral{Val: time.Hour}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + }, + }, + // SELECT statement (lowercase) { s: `select my_field from myseries`, diff --git a/tx.go b/tx.go index c087558e1c6..65b08dbcfb0 100644 --- a/tx.go +++ b/tx.go @@ -93,13 +93,20 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri } } - // If a numerical aggregate is requested, ensure it is only performed on numeric data. + // If a numerical aggregate is requested, ensure it is only performed on numeric data or on a + // nested aggregate on numeric data. 
for _, a := range stmt.FunctionCalls() { - lit, ok := a.Args[0].(*influxql.VarRef) + // Unwrap one level of nesting, e.g. `derivative(mean(value), 1d)`, so the checks below apply to the inner call + var nested *influxql.Call = a + if fn, ok := nested.Args[0].(*influxql.Call); ok { + nested = fn + } + // NOTE(review): Args[0] is indexed on the outer and the inner call without a length check + lit, ok := nested.Args[0].(*influxql.VarRef) if !ok { return nil, fmt.Errorf("aggregate call didn't contain a field %s", a.String()) } - if influxql.IsNumeric(a) { + if influxql.IsNumeric(nested) { f := m.FieldByName(lit.Val) if f.Type != influxql.Float && f.Type != influxql.Integer { return nil, fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s", @@ -348,6 +355,7 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) l.limit = math.MaxUint64 } } else { + // Unwrap calls like `derivative(mean(value), 1d)` to the inner call var nested *influxql.Call = c if fn, ok := c.Args[0].(*influxql.Call); ok { nested = fn