diff --git a/CHANGELOG.md b/CHANGELOG.md index 64e8484ca8c..b05a65638cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8384](https://github.com/influxdata/influxdb/pull/8384): Write and compaction stability - [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles and querues in single archive. - [#8390](https://github.com/influxdata/influxdb/issues/8390): Add nanosecond duration literal support. +- [#8394](https://github.com/influxdata/influxdb/pull/8394): Optimize top() and bottom() using an incremental aggregator. ### Bugfixes diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 8062f7732b0..ded4f7f3e69 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -1,8 +1,6 @@ package influxql import ( - "bytes" - "container/heap" "fmt" "math" "sort" @@ -783,19 +781,17 @@ func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint { return []IntegerPoint{{Time: ZeroTime, Value: max - min}} } -func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) { +func newTopIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) { switch input := input.(type) { case FloatIterator: - aggregateFn := NewFloatTopReduceSliceFunc(int(n.Val), tags, opt) createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatSliceFuncReducer(aggregateFn) + fn := NewFloatTopReducer(n) return fn, fn } return newFloatReduceFloatIterator(input, opt, createFn), nil case IntegerIterator: - aggregateFn := NewIntegerTopReduceSliceFunc(int(n.Val), tags, opt) createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { - fn := NewIntegerSliceFuncReducer(aggregateFn) + fn := NewIntegerTopReducer(n) return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil @@ -804,103 +800,17 @@ func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags } } -// NewFloatTopReduceSliceFunc returns the top values within a window. -func NewFloatTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc { - return func(a []FloatPoint) []FloatPoint { - // Filter by tags if they exist. - if len(tags) > 0 { - a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool { - return p.Value > cur.Value || (p.Value == cur.Value && p.Time < cur.Time) - }) - } - - // If we ask for more elements than exist, restrict n to be the length of the array. - size := n - if size > len(a) { - size = len(a) - } - - // Construct a heap preferring higher values and breaking ties - // based on the earliest time for a point. - h := floatPointsSortBy(a, func(a, b *FloatPoint) bool { - if a.Value != b.Value { - return a.Value > b.Value - } - return a.Time < b.Time - }) - heap.Init(h) - - // Pop the first n elements and then sort by time. - points := make([]FloatPoint, 0, size) - for i := 0; i < size; i++ { - p := heap.Pop(h).(FloatPoint) - points = append(points, p) - } - - // Order the points by time if an ordered output was requested. - // Try to keep the original ordering if possible by using a stable sort. - if opt.Ordered { - sort.Stable(floatPointsByTime(points)) - } - return points - } -} - -// NewIntegerTopReduceSliceFunc returns the top values within a window. -func NewIntegerTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc { - return func(a []IntegerPoint) []IntegerPoint { - // Filter by tags if they exist. - if len(tags) > 0 { - a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool { - return p.Value > cur.Value || (p.Value == cur.Value && p.Time < cur.Time) - }) - } - - // If we ask for more elements than exist, restrict n to be the length of the array. - size := n - if size > len(a) { - size = len(a) - } - - // Construct a heap preferring higher values and breaking ties - // based on the earliest time for a point. - h := integerPointsSortBy(a, func(a, b *IntegerPoint) bool { - if a.Value != b.Value { - return a.Value > b.Value - } - return a.Time < b.Time - }) - heap.Init(h) - - // Pop the first n elements and then sort by time. - points := make([]IntegerPoint, 0, size) - for i := 0; i < size; i++ { - p := heap.Pop(h).(IntegerPoint) - points = append(points, p) - } - - // Order the points by time if an ordered output was requested. - // Try to keep the original ordering if possible by using a stable sort. - if opt.Ordered { - sort.Stable(integerPointsByTime(points)) - } - return points - } -} - -func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) { +func newBottomIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) { switch input := input.(type) { case FloatIterator: - aggregateFn := NewFloatBottomReduceSliceFunc(int(n.Val), tags, opt) createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatSliceFuncReducer(aggregateFn) + fn := NewFloatBottomReducer(n) return fn, fn } return newFloatReduceFloatIterator(input, opt, createFn), nil case IntegerIterator: - aggregateFn := NewIntegerBottomReduceSliceFunc(int(n.Val), tags, opt) createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { - fn := NewIntegerSliceFuncReducer(aggregateFn) + fn := NewIntegerBottomReducer(n) return fn, fn } return newIntegerReduceIntegerIterator(input, opt, createFn), nil @@ -909,150 +819,6 @@ func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, t } } -// NewFloatBottomReduceSliceFunc returns the bottom values within a window. -func NewFloatBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc { - return func(a []FloatPoint) []FloatPoint { - // Filter by tags if they exist. - if len(tags) > 0 { - a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool { - return p.Value < cur.Value || (p.Value == cur.Value && p.Time < cur.Time) - }) - } - - // If we ask for more elements than exist, restrict n to be the length of the array. - size := n - if size > len(a) { - size = len(a) - } - - // Construct a heap preferring lower values and breaking ties - // based on the earliest time for a point. - h := floatPointsSortBy(a, func(a, b *FloatPoint) bool { - if a.Value != b.Value { - return a.Value < b.Value - } - return a.Time < b.Time - }) - heap.Init(h) - - // Pop the first n elements and then sort by time. - points := make([]FloatPoint, 0, size) - for i := 0; i < size; i++ { - p := heap.Pop(h).(FloatPoint) - points = append(points, p) - } - - // Order the points by time if an ordered output was requested. - // Try to keep the original ordering if possible by using a stable sort. - if opt.Ordered { - sort.Stable(floatPointsByTime(points)) - } - return points - } -} - -// NewIntegerBottomReduceSliceFunc returns the bottom values within a window. -func NewIntegerBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc { - return func(a []IntegerPoint) []IntegerPoint { - // Filter by tags if they exist. - if len(tags) > 0 { - a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool { - return p.Value < cur.Value || (p.Value == cur.Value && p.Time < cur.Time) - }) - } - - // If we ask for more elements than exist, restrict n to be the length of the array. - size := n - if size > len(a) { - size = len(a) - } - - // Construct a heap preferring lower values and breaking ties - // based on the earliest time for a point. - h := integerPointsSortBy(a, func(a, b *IntegerPoint) bool { - if a.Value != b.Value { - return a.Value < b.Value - } - return a.Time < b.Time - }) - heap.Init(h) - - // Pop the first n elements and then sort by time. - points := make([]IntegerPoint, 0, size) - for i := 0; i < size; i++ { - p := heap.Pop(h).(IntegerPoint) - points = append(points, p) - } - - // Order the points by time if an ordered output was requested. - // Try to keep the original ordering if possible by using a stable sort. - if opt.Ordered { - sort.Stable(integerPointsByTime(points)) - } - return points - } -} - -func filterFloatByUniqueTags(a []FloatPoint, tags []int, cmpFunc func(cur, p *FloatPoint) bool) []FloatPoint { - pointMap := make(map[string]FloatPoint) - for _, p := range a { - keyBuf := bytes.NewBuffer(nil) - for i, index := range tags { - if i > 0 { - keyBuf.WriteString(",") - } - fmt.Fprintf(keyBuf, "%s", p.Aux[index]) - } - key := keyBuf.String() - - cur, ok := pointMap[key] - if ok { - if cmpFunc(&cur, &p) { - pointMap[key] = p - } - } else { - pointMap[key] = p - } - } - - // Recreate the original array with our new filtered list. - points := make([]FloatPoint, 0, len(pointMap)) - for _, p := range pointMap { - points = append(points, p) - } - return points -} - -func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p *IntegerPoint) bool) []IntegerPoint { - pointMap := make(map[string]IntegerPoint) - for _, p := range a { - keyBuf := bytes.NewBuffer(nil) - for i, index := range tags { - if i > 0 { - keyBuf.WriteString(",") - } - fmt.Fprintf(keyBuf, "%s", p.Aux[index]) - } - key := keyBuf.String() - - cur, ok := pointMap[key] - if ok { - if cmpFunc(&cur, &p) { - pointMap[key] = p - } - } else { - pointMap[key] = p - } - } - - // Recreate the original array with our new filtered list. - points := make([]IntegerPoint, 0, len(pointMap)) - for _, p := range pointMap { - points = append(points, p) - } - return points -} - // newPercentileIterator returns an iterator for operating on a percentile() call. func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) { switch input := input.(type) { diff --git a/influxql/call_iterator_test.go b/influxql/call_iterator_test.go index 51b49b54539..583c92cdaaf 100644 --- a/influxql/call_iterator_test.go +++ b/influxql/call_iterator_test.go @@ -973,3 +973,11 @@ func (g *FloatPointGenerator) Next() (*influxql.FloatPoint, error) { g.i++ return p, nil } + +func MustCallIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator { + itr, err := influxql.NewCallIterator(input, opt) + if err != nil { + panic(err) + } + return itr +} diff --git a/influxql/functions.go b/influxql/functions.go index 5541c3b238b..4fad6e75b88 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -1,7 +1,9 @@ package influxql import ( + "container/heap" "math" + "sort" "time" "github.com/influxdata/influxdb/influxql/neldermead" @@ -987,3 +989,175 @@ func (r *IntegerIntegralReducer) Close() error { close(r.ch) return nil } + +type FloatTopReducer struct { + h *floatPointsByFunc +} + +func NewFloatTopReducer(n int) *FloatTopReducer { + return &FloatTopReducer{ + h: floatPointsSortBy(make([]FloatPoint, 0, n), func(a, b *FloatPoint) bool { + if a.Value != b.Value { + return a.Value < b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *FloatTopReducer) AggregateFloat(p *FloatPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *FloatTopReducer) Emit() []FloatPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]FloatPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := floatPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} + +type IntegerTopReducer struct { + h *integerPointsByFunc +} + +func NewIntegerTopReducer(n int) *IntegerTopReducer { + return &IntegerTopReducer{ + h: integerPointsSortBy(make([]IntegerPoint, 0, n), func(a, b *IntegerPoint) bool { + if a.Value != b.Value { + return a.Value < b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *IntegerTopReducer) AggregateInteger(p *IntegerPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *IntegerTopReducer) Emit() []IntegerPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]IntegerPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := integerPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} + +type FloatBottomReducer struct { + h *floatPointsByFunc +} + +func NewFloatBottomReducer(n int) *FloatBottomReducer { + return &FloatBottomReducer{ + h: floatPointsSortBy(make([]FloatPoint, 0, n), func(a, b *FloatPoint) bool { + if a.Value != b.Value { + return a.Value > b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *FloatBottomReducer) AggregateFloat(p *FloatPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *FloatBottomReducer) Emit() []FloatPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]FloatPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := floatPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} + +type IntegerBottomReducer struct { + h *integerPointsByFunc +} + +func NewIntegerBottomReducer(n int) *IntegerBottomReducer { + return &IntegerBottomReducer{ + h: integerPointsSortBy(make([]IntegerPoint, 0, n), func(a, b *IntegerPoint) bool { + if a.Value != b.Value { + return a.Value > b.Value + } + return a.Time > b.Time + }), + } +} + +func (r *IntegerBottomReducer) AggregateInteger(p *IntegerPoint) { + if r.h.Len() == cap(r.h.points) { + // Compare the minimum point and the aggregated point. If our value is + // larger, replace the current min value. + if !r.h.cmp(&r.h.points[0], p) { + return + } + r.h.points[0] = *p + heap.Fix(r.h, 0) + return + } + heap.Push(r.h, *p) +} + +func (r *IntegerBottomReducer) Emit() []IntegerPoint { + // Ensure the points are sorted with the maximum value last. While the + // first point may be the minimum value, the rest is not guaranteed to be + // in any particular order while it is a heap. + points := make([]IntegerPoint, len(r.h.points)) + for i, p := range r.h.points { + p.Aggregated = 0 + points[i] = p + } + h := integerPointsByFunc{points: points, cmp: r.h.cmp} + sort.Sort(sort.Reverse(&h)) + return points +} diff --git a/influxql/select.go b/influxql/select.go index c743449c077..d9c40b7490c 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -474,57 +474,107 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { interval := opt.IntegralInterval() return newIntegralIterator(input, opt, interval) case "top": - var tags []int if len(expr.Args) < 2 { return nil, fmt.Errorf("top() requires 2 or more arguments, got %d", len(expr.Args)) - } else if len(expr.Args) > 2 { - // We need to find the indices of where the tag values are stored in Aux - // This section is O(n^2), but for what should be a low value. + } + + var input Iterator + if len(expr.Args) > 2 { + // Create a max iterator using the groupings in the arguments. + dims := make(map[string]struct{}, len(expr.Args)-2) for i := 1; i < len(expr.Args)-1; i++ { ref := expr.Args[i].(*VarRef) - for index, aux := range b.opt.Aux { - if aux.Val == ref.Val { - tags = append(tags, index) - break - } - } + dims[ref.Val] = struct{}{} + } + for dim := range opt.GroupBy { + dims[dim] = struct{}{} } - } - opt := b.opt - opt.Ordered = true - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) - if err != nil { - return nil, err + call := &Call{ + Name: "max", + Args: expr.Args[:1], + } + callOpt := opt + callOpt.Expr = call + callOpt.GroupBy = dims + callOpt.Fill = NoFill + + builder := *b + builder.opt = callOpt + builder.selector = true + + i, err := builder.callIterator(call, callOpt) + if err != nil { + return nil, err + } + input = i + } else { + // There are no arguments so do not organize the points by tags. + builder := *b + builder.opt.Expr = expr.Args[0] + builder.selector = true + + ref := expr.Args[0].(*VarRef) + i, err := builder.buildVarRefIterator(ref) + if err != nil { + return nil, err + } + input = i } + n := expr.Args[len(expr.Args)-1].(*IntegerLiteral) - return newTopIterator(input, opt, n, tags) + return newTopIterator(input, opt, int(n.Val)) case "bottom": - var tags []int if len(expr.Args) < 2 { return nil, fmt.Errorf("bottom() requires 2 or more arguments, got %d", len(expr.Args)) - } else if len(expr.Args) > 2 { - // We need to find the indices of where the tag values are stored in Aux - // This section is O(n^2), but for what should be a low value. + } + + var input Iterator + if len(expr.Args) > 2 { + // Create a max iterator using the groupings in the arguments. + dims := make(map[string]struct{}, len(expr.Args)-2) for i := 1; i < len(expr.Args)-1; i++ { ref := expr.Args[i].(*VarRef) - for index, aux := range b.opt.Aux { - if aux.Val == ref.Val { - tags = append(tags, index) - break - } - } + dims[ref.Val] = struct{}{} + } + for dim := range opt.GroupBy { + dims[dim] = struct{}{} } - } - opt := b.opt - opt.Ordered = true - input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) - if err != nil { - return nil, err + call := &Call{ + Name: "min", + Args: expr.Args[:1], + } + callOpt := opt + callOpt.Expr = call + callOpt.GroupBy = dims + callOpt.Fill = NoFill + + builder := *b + builder.opt = callOpt + builder.selector = true + + i, err := builder.callIterator(call, callOpt) + if err != nil { + return nil, err + } + input = i + } else { + // There are no arguments so do not organize the points by tags. + builder := *b + builder.opt.Expr = nil + builder.selector = true + + ref := expr.Args[0].(*VarRef) + i, err := builder.buildVarRefIterator(ref) + if err != nil { + return nil, err + } + input = i } + n := expr.Args[len(expr.Args)-1].(*IntegerLiteral) - return newBottomIterator(input, b.opt, n, tags) + return newBottomIterator(input, b.opt, int(n.Val)) } itr, err := func() (Iterator, error) { @@ -542,48 +592,7 @@ func (b *exprIteratorBuilder) buildCallIterator(expr *Call) (Iterator, error) { } fallthrough case "min", "max", "sum", "first", "last", "mean": - inputs := make([]Iterator, 0, len(b.sources)) - if err := func() error { - for _, source := range b.sources { - switch source := source.(type) { - case *Measurement: - input, err := b.ic.CreateIterator(source, opt) - if err != nil { - return err - } - inputs = append(inputs, input) - case *SubQuery: - // Identify the name of the field we are using. - arg0 := expr.Args[0].(*VarRef) - - input, err := buildExprIterator(arg0, b.ic, []Source{source}, opt, b.selector) - if err != nil { - return err - } - - // Wrap the result in a call iterator. - i, err := NewCallIterator(input, opt) - if err != nil { - input.Close() - return err - } - inputs = append(inputs, i) - } - } - return nil - }(); err != nil { - Iterators(inputs).Close() - return nil, err - } - - itr, err := Iterators(inputs).Merge(opt) - if err != nil { - Iterators(inputs).Close() - return nil, err - } else if itr == nil { - itr = &nilFloatIterator{} - } - return itr, nil + return b.callIterator(expr, opt) case "median": opt.Ordered = true input, err := buildExprIterator(expr.Args[0].(*VarRef), b.ic, b.sources, opt, false) @@ -679,6 +688,51 @@ func (b *exprIteratorBuilder) buildBinaryExprIterator(expr *BinaryExpr) (Iterato } } +func (b *exprIteratorBuilder) callIterator(expr *Call, opt IteratorOptions) (Iterator, error) { + inputs := make([]Iterator, 0, len(b.sources)) + if err := func() error { + for _, source := range b.sources { + switch source := source.(type) { + case *Measurement: + input, err := b.ic.CreateIterator(source, opt) + if err != nil { + return err + } + inputs = append(inputs, input) + case *SubQuery: + // Identify the name of the field we are using. + arg0 := expr.Args[0].(*VarRef) + + input, err := buildExprIterator(arg0, b.ic, []Source{source}, opt, b.selector) + if err != nil { + return err + } + + // Wrap the result in a call iterator. + i, err := NewCallIterator(input, opt) + if err != nil { + input.Close() + return err + } + inputs = append(inputs, i) + } + } + return nil + }(); err != nil { + Iterators(inputs).Close() + return nil, err + } + + itr, err := Iterators(inputs).Merge(opt) + if err != nil { + Iterators(inputs).Close() + return nil, err + } else if itr == nil { + itr = &nilFloatIterator{} + } + return itr, nil +} + func buildRHSTransformIterator(lhs Iterator, rhs Literal, op Token, opt IteratorOptions) (Iterator, error) { fn := binaryExprFunc(iteratorDataType(lhs), literalDataType(rhs), op) switch fn := fn.(type) { diff --git a/influxql/select_test.go b/influxql/select_test.go index b56ebda6252..cfe90f2f3c2 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2,6 +2,7 @@ package influxql_test import ( "fmt" + "math/rand" "reflect" "testing" "time" @@ -766,24 +767,27 @@ func TestSelect_Top_Tags_Float(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`max(value::float)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &FloatIterator{Points: []influxql.FloatPoint{ + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -878,24 +882,27 @@ func TestSelect_Top_GroupByTags_Float(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`max(value::float)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &FloatIterator{Points: []influxql.FloatPoint{ + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -930,24 +937,27 @@ func TestSelect_Top_GroupByTags_Integer(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`max(value::integer)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &IntegerIterator{Points: []influxql.IntegerPoint{ + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -1074,24 +1084,27 @@ func TestSelect_Bottom_Tags_Float(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`min(value::float)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &FloatIterator{Points: []influxql.FloatPoint{ + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -1130,24 +1143,27 @@ func TestSelect_Bottom_Tags_Integer(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`min(value::integer)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &IntegerIterator{Points: []influxql.IntegerPoint{ + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -1186,24 +1202,27 @@ func TestSelect_Bottom_GroupByTags_Float(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`min(value::float)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &FloatIterator{Points: []influxql.FloatPoint{ + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &FloatIterator{Points: []influxql.FloatPoint{ + }}, opt), + MustCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -1238,24 +1257,27 @@ func TestSelect_Bottom_GroupByTags_Integer(t *testing.T) { if m.Name != "cpu" { t.Fatalf("unexpected source: %s", m.Name) } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`min(value::float)`)) { + t.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } return influxql.Iterators{ - &IntegerIterator{Points: []influxql.IntegerPoint{ + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100, Aux: []interface{}{"A"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4, Aux: []interface{}{"B"}}, {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5, Aux: []interface{}{"B"}}, - }}, - &IntegerIterator{Points: []influxql.IntegerPoint{ + }}, opt), + MustCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19, Aux: []interface{}{"A"}}, {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2, Aux: []interface{}{"A"}}, - }}, + }}, opt), }.Merge(opt) } @@ -3866,3 +3888,32 @@ func benchmarkSelectDedupe(b *testing.B, seriesN, pointsPerSeries int) { } func BenchmarkSelect_Dedupe_1K(b *testing.B) { benchmarkSelectDedupe(b, 1000, 100) } + +func benchmarkSelectTop(b *testing.B, seriesN, pointsPerSeries int) { + stmt := MustParseSelectStatement(`SELECT top(sval, 10) FROM cpu`) + + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + b.Fatalf("unexpected source: %s", m.Name) + } + if !reflect.DeepEqual(opt.Expr, MustParseExpr(`sval`)) { + b.Fatalf("unexpected expr: %s", spew.Sdump(opt.Expr)) + } + + p := influxql.FloatPoint{ + Name: "cpu", + } + + return &FloatPointGenerator{N: seriesN * pointsPerSeries, Fn: func(i int) *influxql.FloatPoint { + p.Value = float64(rand.Int63()) + p.Time = int64(time.Duration(i) * (10 * time.Second)) + return &p + }}, nil + } + + b.ResetTimer() + benchmarkSelect(b, stmt, &ic) +} + +func BenchmarkSelect_Top_1K(b *testing.B) { benchmarkSelectTop(b, 1000, 1000) }