Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "integral" function to InfluxQL #8194

Merged
merged 2 commits into from
Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS
- [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL.
- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries.
- [#8194](https://github.com/influxdata/influxdb/pull/8194): Add "integral" function to InfluxQL.

### Bugfixes

Expand Down
16 changes: 15 additions & 1 deletion influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,20 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
if err := s.validSampleAggr(expr); err != nil {
return err
}
case "integral":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
if min, max, got := 1, 2, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
// If a duration arg is passed, make sure it's a duration
if len(expr.Args) == 2 {
// Second must be a duration .e.g (1h)
if _, ok := expr.Args[1].(*DurationLiteral); !ok {
return errors.New("second argument must be a duration")
}
}
case "holt_winters", "holt_winters_with_fit":
if exp, got := 3, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
Expand Down Expand Up @@ -4501,7 +4515,7 @@ func EvalType(expr Expr, sources Sources, typmap TypeMapper) DataType {
return typ
case *Call:
switch expr.Name {
case "mean", "median":
case "mean", "median", "integral":
return Float
case "count":
return Integer
Expand Down
20 changes: 20 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,23 @@ func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator,
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
}
}

// newIntegralIterator returns an iterator for operating on a integral() call.
func newIntegralIterator(input Iterator, opt IteratorOptions, interval Interval) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatIntegralReducer(interval, opt)
return fn, fn
}
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerIntegralReducer(interval, opt)
return fn, fn
}
return newIntegerStreamFloatIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported integral iterator type: %T", input)
}
}
213 changes: 213 additions & 0 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,3 +757,216 @@ func (r *FloatHoltWintersReducer) constrain(x []float64) {
x[3] = 0
}
}

// FloatIntegralReducer calculates the time-integral of the aggregated points.
type FloatIntegralReducer struct {
interval Interval
sum float64
prev FloatPoint
window struct {
start int64
end int64
}
ch chan FloatPoint
opt IteratorOptions
}

// NewFloatIntegralReducer creates a new FloatIntegralReducer.
func NewFloatIntegralReducer(interval Interval, opt IteratorOptions) *FloatIntegralReducer {
return &FloatIntegralReducer{
interval: interval,
prev: FloatPoint{Nil: true},
ch: make(chan FloatPoint, 1),
opt: opt,
}
}

// AggregateFloat aggregates a point into the reducer.
func (r *FloatIntegralReducer) AggregateFloat(p *FloatPoint) {
// If this is the first point, just save it
if r.prev.Nil {
r.prev = *p
if !r.opt.Interval.IsZero() {
// Record the end of the time interval.
// We do not care for whether the last number is inclusive or exclusive
// because we treat both the same for the involved math.
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
}
return
}

// If this point has the same timestamp as the previous one,
// skip the point. Points sent into this reducer are expected
// to be fed in order.
if r.prev.Time == p.Time {
r.prev = *p
return
} else if !r.opt.Interval.IsZero() && ((r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end)) {
// If our previous time is not equal to the window, we need to
// interpolate the area at the end of this interval.
if r.prev.Time != r.window.end {
value := linearFloat(r.window.end, r.prev.Time, p.Time, r.prev.Value, p.Value)
elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + r.prev.Value) * elapsed

r.prev.Value = value
r.prev.Time = r.window.end
}

// Emit the current point through the channel and then clear it.
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
r.sum = 0.0
}

// Normal operation: update the sum using the trapezium rule
elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (p.Value + r.prev.Value) * elapsed
r.prev = *p
}

// Emit emits the time-integral of the aggregated points as a single point.
// InfluxQL convention dictates that outside a group-by-time clause we return
// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime
// and a higher level will change it to the start of the time group.
func (r *FloatIntegralReducer) Emit() []FloatPoint {
select {
case pt, ok := <-r.ch:
if !ok {
return nil
}
return []FloatPoint{pt}
default:
return nil
}
}

// Close flushes any in progress points to ensure any remaining points are
// emitted.
func (r *FloatIntegralReducer) Close() error {
// If our last point is at the start time, then discard this point since
// there is no area within this bucket. Otherwise, send off what we
// currently have as the final point.
if !r.prev.Nil && r.prev.Time != r.window.start {
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
}
close(r.ch)
return nil
}

// IntegerIntegralReducer calculates the time-integral of the aggregated points.
type IntegerIntegralReducer struct {
interval Interval
sum float64
prev IntegerPoint
window struct {
start int64
end int64
}
ch chan FloatPoint
opt IteratorOptions
}

// NewIntegerIntegralReducer creates a new IntegerIntegralReducer.
func NewIntegerIntegralReducer(interval Interval, opt IteratorOptions) *IntegerIntegralReducer {
return &IntegerIntegralReducer{
interval: interval,
prev: IntegerPoint{Nil: true},
ch: make(chan FloatPoint, 1),
opt: opt,
}
}

// AggregateInteger aggregates a point into the reducer.
func (r *IntegerIntegralReducer) AggregateInteger(p *IntegerPoint) {
// If this is the first point, just save it
if r.prev.Nil {
r.prev = *p

// Record the end of the time interval.
// We do not care for whether the last number is inclusive or exclusive
// because we treat both the same for the involved math.
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}

// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if r.window.start == MinTime {
r.window.start = 0
}
return
}

// If this point has the same timestamp as the previous one,
// skip the point. Points sent into this reducer are expected
// to be fed in order.
value := float64(p.Value)
if r.prev.Time == p.Time {
r.prev = *p
return
} else if (r.opt.Ascending && p.Time >= r.window.end) || (!r.opt.Ascending && p.Time <= r.window.end) {
// If our previous time is not equal to the window, we need to
// interpolate the area at the end of this interval.
if r.prev.Time != r.window.end {
value = linearFloat(r.window.end, r.prev.Time, p.Time, float64(r.prev.Value), value)
elapsed := float64(r.window.end-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed

r.prev.Time = r.window.end
}

// Emit the current point through the channel and then clear it.
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
if r.opt.Ascending {
r.window.start, r.window.end = r.opt.Window(p.Time)
} else {
r.window.end, r.window.start = r.opt.Window(p.Time)
}
r.sum = 0.0
}

// Normal operation: update the sum using the trapezium rule
elapsed := float64(p.Time-r.prev.Time) / float64(r.interval.Duration)
r.sum += 0.5 * (value + float64(r.prev.Value)) * elapsed
r.prev = *p
}

// Emit emits the time-integral of the aggregated points as a single FLOAT point
// InfluxQL convention dictates that outside a group-by-time clause we return
// a timestamp of zero. Within a group-by-time, we can set the time to ZeroTime
// and a higher level will change it to the start of the time group.
func (r *IntegerIntegralReducer) Emit() []FloatPoint {
select {
case pt, ok := <-r.ch:
if !ok {
return nil
}
return []FloatPoint{pt}
default:
return nil
}
}

// Close flushes any in progress points to ensure any remaining points are
// emitted.
func (r *IntegerIntegralReducer) Close() error {
// If our last point is at the start time, then discard this point since
// there is no area within this bucket. Otherwise, send off what we
// currently have as the final point.
if !r.prev.Nil && r.prev.Time != r.window.start {
r.ch <- FloatPoint{Time: r.window.start, Value: r.sum}
}
close(r.ch)
return nil
}
2 changes: 1 addition & 1 deletion influxql/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading