Skip to content

Commit

Permalink
Optimize top() and bottom() using an incremental aggregator
Browse files Browse the repository at this point in the history
The previous version of `top()` and `bottom()` would gather all of the
points to use in a slice, filter them (if necessary), then use a
slightly modified heap sort to retrieve the top or bottom values.

This performed horrendously from the standpoint of memory. Since it
consumed so much memory and spent so much time in allocations (along
with sorting a potentially very large slice), this affected speed too.

These calls have now been modified so they keep the top or bottom points
in a min or max heap. For `top()`, a new point will read the minimum
value from the heap. If the new point is greater than the minimum point,
it will replace the minimum point and fix the heap with the new value.
If the new point is smaller, it discards that point. For `bottom()`, the
process is the opposite.

It will then sort the final result to ensure the correct ordering of the
selected points.

When `top()` or `bottom()` contain a tag to select, they have now been
modified so this query:

    SELECT top(value, host, 2) FROM cpu

Essentially becomes this query:

    SELECT top(value, 2), host FROM (
        SELECT max(value) FROM cpu GROUP BY host
    )

This should drastically increase the performance of all `top()` and
`bottom()` queries.
  • Loading branch information
jsternberg committed May 17, 2017
1 parent a5fed3d commit 4e68349
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 359 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits
- [#8366](https://github.com/influxdata/influxdb/pull/8366): Add TSI support tooling.
- [#8350](https://github.com/influxdata/influxdb/pull/8350): Track HTTP client requests for /write and /query with /debug/requests.
<<<<<<< 8f8ff0ec612e6e77ff618f572b302e069de8e5c3
- [#8384](https://github.com/influxdata/influxdb/pull/8384): Write and compaction stability
- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles and querues in single archive.
- [#8394](https://github.com/influxdata/influxdb/pull/8394): Optimize top() and bottom() using an incremental aggregator.

### Bugfixes

Expand Down
246 changes: 6 additions & 240 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package influxql

import (
"bytes"
"container/heap"
"fmt"
"math"
"sort"
Expand Down Expand Up @@ -783,19 +781,17 @@ func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
}

func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) {
func newTopIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
aggregateFn := NewFloatTopReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
fn := NewFloatTopReducer(n)
return fn, fn
}
return newFloatReduceFloatIterator(input, opt, createFn), nil
case IntegerIterator:
aggregateFn := NewIntegerTopReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
fn := NewIntegerTopReducer(n)
return fn, fn
}
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
Expand All @@ -804,103 +800,17 @@ func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags
}
}

// NewFloatTopReduceSliceFunc returns the top values within a window.
func NewFloatTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc {
return func(a []FloatPoint) []FloatPoint {
// Filter by tags if they exist.
if len(tags) > 0 {
a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool {
return p.Value > cur.Value || (p.Value == cur.Value && p.Time < cur.Time)
})
}

// If we ask for more elements than exist, restrict n to be the length of the array.
size := n
if size > len(a) {
size = len(a)
}

// Construct a heap preferring higher values and breaking ties
// based on the earliest time for a point.
h := floatPointsSortBy(a, func(a, b *FloatPoint) bool {
if a.Value != b.Value {
return a.Value > b.Value
}
return a.Time < b.Time
})
heap.Init(h)

// Pop the first n elements and then sort by time.
points := make([]FloatPoint, 0, size)
for i := 0; i < size; i++ {
p := heap.Pop(h).(FloatPoint)
points = append(points, p)
}

// Order the points by time if an ordered output was requested.
// Try to keep the original ordering if possible by using a stable sort.
if opt.Ordered {
sort.Stable(floatPointsByTime(points))
}
return points
}
}

// NewIntegerTopReduceSliceFunc returns the top values within a window.
func NewIntegerTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc {
return func(a []IntegerPoint) []IntegerPoint {
// Filter by tags if they exist.
if len(tags) > 0 {
a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool {
return p.Value > cur.Value || (p.Value == cur.Value && p.Time < cur.Time)
})
}

// If we ask for more elements than exist, restrict n to be the length of the array.
size := n
if size > len(a) {
size = len(a)
}

// Construct a heap preferring higher values and breaking ties
// based on the earliest time for a point.
h := integerPointsSortBy(a, func(a, b *IntegerPoint) bool {
if a.Value != b.Value {
return a.Value > b.Value
}
return a.Time < b.Time
})
heap.Init(h)

// Pop the first n elements and then sort by time.
points := make([]IntegerPoint, 0, size)
for i := 0; i < size; i++ {
p := heap.Pop(h).(IntegerPoint)
points = append(points, p)
}

// Order the points by time if an ordered output was requested.
// Try to keep the original ordering if possible by using a stable sort.
if opt.Ordered {
sort.Stable(integerPointsByTime(points))
}
return points
}
}

func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) {
func newBottomIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
aggregateFn := NewFloatBottomReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
fn := NewFloatBottomReducer(n)
return fn, fn
}
return newFloatReduceFloatIterator(input, opt, createFn), nil
case IntegerIterator:
aggregateFn := NewIntegerBottomReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
fn := NewIntegerBottomReducer(n)
return fn, fn
}
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
Expand All @@ -909,150 +819,6 @@ func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, t
}
}

// NewFloatBottomReduceSliceFunc returns the bottom values within a window.
func NewFloatBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc {
return func(a []FloatPoint) []FloatPoint {
// Filter by tags if they exist.
if len(tags) > 0 {
a = filterFloatByUniqueTags(a, tags, func(cur, p *FloatPoint) bool {
return p.Value < cur.Value || (p.Value == cur.Value && p.Time < cur.Time)
})
}

// If we ask for more elements than exist, restrict n to be the length of the array.
size := n
if size > len(a) {
size = len(a)
}

// Construct a heap preferring lower values and breaking ties
// based on the earliest time for a point.
h := floatPointsSortBy(a, func(a, b *FloatPoint) bool {
if a.Value != b.Value {
return a.Value < b.Value
}
return a.Time < b.Time
})
heap.Init(h)

// Pop the first n elements and then sort by time.
points := make([]FloatPoint, 0, size)
for i := 0; i < size; i++ {
p := heap.Pop(h).(FloatPoint)
points = append(points, p)
}

// Order the points by time if an ordered output was requested.
// Try to keep the original ordering if possible by using a stable sort.
if opt.Ordered {
sort.Stable(floatPointsByTime(points))
}
return points
}
}

// NewIntegerBottomReduceSliceFunc returns the bottom values within a window.
func NewIntegerBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc {
return func(a []IntegerPoint) []IntegerPoint {
// Filter by tags if they exist.
if len(tags) > 0 {
a = filterIntegerByUniqueTags(a, tags, func(cur, p *IntegerPoint) bool {
return p.Value < cur.Value || (p.Value == cur.Value && p.Time < cur.Time)
})
}

// If we ask for more elements than exist, restrict n to be the length of the array.
size := n
if size > len(a) {
size = len(a)
}

// Construct a heap preferring lower values and breaking ties
// based on the earliest time for a point.
h := integerPointsSortBy(a, func(a, b *IntegerPoint) bool {
if a.Value != b.Value {
return a.Value < b.Value
}
return a.Time < b.Time
})
heap.Init(h)

// Pop the first n elements and then sort by time.
points := make([]IntegerPoint, 0, size)
for i := 0; i < size; i++ {
p := heap.Pop(h).(IntegerPoint)
points = append(points, p)
}

// Order the points by time if an ordered output was requested.
// Try to keep the original ordering if possible by using a stable sort.
if opt.Ordered {
sort.Stable(integerPointsByTime(points))
}
return points
}
}

func filterFloatByUniqueTags(a []FloatPoint, tags []int, cmpFunc func(cur, p *FloatPoint) bool) []FloatPoint {
pointMap := make(map[string]FloatPoint)
for _, p := range a {
keyBuf := bytes.NewBuffer(nil)
for i, index := range tags {
if i > 0 {
keyBuf.WriteString(",")
}
fmt.Fprintf(keyBuf, "%s", p.Aux[index])
}
key := keyBuf.String()

cur, ok := pointMap[key]
if ok {
if cmpFunc(&cur, &p) {
pointMap[key] = p
}
} else {
pointMap[key] = p
}
}

// Recreate the original array with our new filtered list.
points := make([]FloatPoint, 0, len(pointMap))
for _, p := range pointMap {
points = append(points, p)
}
return points
}

func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p *IntegerPoint) bool) []IntegerPoint {
pointMap := make(map[string]IntegerPoint)
for _, p := range a {
keyBuf := bytes.NewBuffer(nil)
for i, index := range tags {
if i > 0 {
keyBuf.WriteString(",")
}
fmt.Fprintf(keyBuf, "%s", p.Aux[index])
}
key := keyBuf.String()

cur, ok := pointMap[key]
if ok {
if cmpFunc(&cur, &p) {
pointMap[key] = p
}
} else {
pointMap[key] = p
}
}

// Recreate the original array with our new filtered list.
points := make([]IntegerPoint, 0, len(pointMap))
for _, p := range pointMap {
points = append(points, p)
}
return points
}

// newPercentileIterator returns an iterator for operating on a percentile() call.
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
switch input := input.(type) {
Expand Down
8 changes: 8 additions & 0 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,3 +973,11 @@ func (g *FloatPointGenerator) Next() (*influxql.FloatPoint, error) {
g.i++
return p, nil
}

func MustCallIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator {
itr, err := influxql.NewCallIterator(input, opt)
if err != nil {
panic(err)
}
return itr
}
Loading

0 comments on commit 4e68349

Please sign in to comment.