Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize top() and bottom() using an incremental aggregator #8394

Merged
merged 1 commit into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8384](https://github.com/influxdata/influxdb/pull/8384): Write and compaction stability
- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles and queries in a single archive.
- [#8390](https://github.com/influxdata/influxdb/issues/8390): Add nanosecond duration literal support.
- [#8394](https://github.com/influxdata/influxdb/pull/8394): Optimize top() and bottom() using an incremental aggregator.

### Bugfixes

Expand Down
246 changes: 6 additions & 240 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package influxql

import (
"bytes"
"container/heap"
"fmt"
"math"
"sort"
Expand Down Expand Up @@ -783,19 +781,17 @@ func IntegerSpreadReduceSlice(a []IntegerPoint) []IntegerPoint {
return []IntegerPoint{{Time: ZeroTime, Value: max - min}}
}

func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) {
func newTopIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
aggregateFn := NewFloatTopReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
fn := NewFloatTopReducer(n)
return fn, fn
}
return newFloatReduceFloatIterator(input, opt, createFn), nil
case IntegerIterator:
aggregateFn := NewIntegerTopReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
fn := NewIntegerTopReducer(n)
return fn, fn
}
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
Expand All @@ -804,103 +800,17 @@ func newTopIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags
}
}

// NewFloatTopReduceSliceFunc builds a reducer that selects the n highest-valued
// float points from a window.
//
// When tags is non-empty the window is first passed through
// filterFloatByUniqueTags with a "higher value, then earlier time" preference.
// The remaining points are heap-ordered by descending value (earlier time
// first on ties) and at most n of them are popped. If opt.Ordered is set the
// popped points are stably re-sorted by time before being returned.
func NewFloatTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc {
	// better reports whether candidate should displace cur for a tag set.
	better := func(cur, candidate *FloatPoint) bool {
		if candidate.Value > cur.Value {
			return true
		}
		return candidate.Value == cur.Value && candidate.Time < cur.Time
	}

	// ranksFirst is the heap ordering: larger values first, and for equal
	// values the point with the earlier time first.
	ranksFirst := func(x, y *FloatPoint) bool {
		if x.Value == y.Value {
			return x.Time < y.Time
		}
		return x.Value > y.Value
	}

	return func(a []FloatPoint) []FloatPoint {
		if len(tags) != 0 {
			a = filterFloatByUniqueTags(a, tags, better)
		}

		// Never pop more points than the window actually holds.
		limit := len(a)
		if n < limit {
			limit = n
		}

		h := floatPointsSortBy(a, ranksFirst)
		heap.Init(h)

		selected := make([]FloatPoint, 0, limit)
		for len(selected) < limit {
			selected = append(selected, heap.Pop(h).(FloatPoint))
		}

		// A stable sort keeps the pop order among points with equal times.
		if opt.Ordered {
			sort.Stable(floatPointsByTime(selected))
		}
		return selected
	}
}

// NewIntegerTopReduceSliceFunc builds a reducer that selects the n
// highest-valued integer points from a window.
//
// When tags is non-empty the window is first passed through
// filterIntegerByUniqueTags with a "higher value, then earlier time"
// preference. The remaining points are heap-ordered by descending value
// (earlier time first on ties) and at most n of them are popped. If
// opt.Ordered is set the popped points are stably re-sorted by time before
// being returned.
func NewIntegerTopReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc {
	// better reports whether candidate should displace cur for a tag set.
	better := func(cur, candidate *IntegerPoint) bool {
		if candidate.Value > cur.Value {
			return true
		}
		return candidate.Value == cur.Value && candidate.Time < cur.Time
	}

	// ranksFirst is the heap ordering: larger values first, and for equal
	// values the point with the earlier time first.
	ranksFirst := func(x, y *IntegerPoint) bool {
		if x.Value == y.Value {
			return x.Time < y.Time
		}
		return x.Value > y.Value
	}

	return func(a []IntegerPoint) []IntegerPoint {
		if len(tags) != 0 {
			a = filterIntegerByUniqueTags(a, tags, better)
		}

		// Never pop more points than the window actually holds.
		limit := len(a)
		if n < limit {
			limit = n
		}

		h := integerPointsSortBy(a, ranksFirst)
		heap.Init(h)

		selected := make([]IntegerPoint, 0, limit)
		for len(selected) < limit {
			selected = append(selected, heap.Pop(h).(IntegerPoint))
		}

		// A stable sort keeps the pop order among points with equal times.
		if opt.Ordered {
			sort.Stable(integerPointsByTime(selected))
		}
		return selected
	}
}

func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, tags []int) (Iterator, error) {
func newBottomIterator(input Iterator, opt IteratorOptions, n int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
aggregateFn := NewFloatBottomReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(aggregateFn)
fn := NewFloatBottomReducer(n)
return fn, fn
}
return newFloatReduceFloatIterator(input, opt, createFn), nil
case IntegerIterator:
aggregateFn := NewIntegerBottomReduceSliceFunc(int(n.Val), tags, opt)
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(aggregateFn)
fn := NewIntegerBottomReducer(n)
return fn, fn
}
return newIntegerReduceIntegerIterator(input, opt, createFn), nil
Expand All @@ -909,150 +819,6 @@ func newBottomIterator(input Iterator, opt IteratorOptions, n *IntegerLiteral, t
}
}

// NewFloatBottomReduceSliceFunc builds a reducer that selects the n
// lowest-valued float points from a window.
//
// When tags is non-empty the window is first passed through
// filterFloatByUniqueTags with a "lower value, then earlier time" preference.
// The remaining points are heap-ordered by ascending value (earlier time first
// on ties) and at most n of them are popped. If opt.Ordered is set the popped
// points are stably re-sorted by time before being returned.
func NewFloatBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) FloatReduceSliceFunc {
	// better reports whether candidate should displace cur for a tag set.
	better := func(cur, candidate *FloatPoint) bool {
		if candidate.Value < cur.Value {
			return true
		}
		return candidate.Value == cur.Value && candidate.Time < cur.Time
	}

	// ranksFirst is the heap ordering: smaller values first, and for equal
	// values the point with the earlier time first.
	ranksFirst := func(x, y *FloatPoint) bool {
		if x.Value == y.Value {
			return x.Time < y.Time
		}
		return x.Value < y.Value
	}

	return func(a []FloatPoint) []FloatPoint {
		if len(tags) != 0 {
			a = filterFloatByUniqueTags(a, tags, better)
		}

		// Never pop more points than the window actually holds.
		limit := len(a)
		if n < limit {
			limit = n
		}

		h := floatPointsSortBy(a, ranksFirst)
		heap.Init(h)

		selected := make([]FloatPoint, 0, limit)
		for len(selected) < limit {
			selected = append(selected, heap.Pop(h).(FloatPoint))
		}

		// A stable sort keeps the pop order among points with equal times.
		if opt.Ordered {
			sort.Stable(floatPointsByTime(selected))
		}
		return selected
	}
}

// NewIntegerBottomReduceSliceFunc builds a reducer that selects the n
// lowest-valued integer points from a window.
//
// When tags is non-empty the window is first passed through
// filterIntegerByUniqueTags with a "lower value, then earlier time"
// preference. The remaining points are heap-ordered by ascending value
// (earlier time first on ties) and at most n of them are popped. If
// opt.Ordered is set the popped points are stably re-sorted by time before
// being returned.
func NewIntegerBottomReduceSliceFunc(n int, tags []int, opt IteratorOptions) IntegerReduceSliceFunc {
	// better reports whether candidate should displace cur for a tag set.
	better := func(cur, candidate *IntegerPoint) bool {
		if candidate.Value < cur.Value {
			return true
		}
		return candidate.Value == cur.Value && candidate.Time < cur.Time
	}

	// ranksFirst is the heap ordering: smaller values first, and for equal
	// values the point with the earlier time first.
	ranksFirst := func(x, y *IntegerPoint) bool {
		if x.Value == y.Value {
			return x.Time < y.Time
		}
		return x.Value < y.Value
	}

	return func(a []IntegerPoint) []IntegerPoint {
		if len(tags) != 0 {
			a = filterIntegerByUniqueTags(a, tags, better)
		}

		// Never pop more points than the window actually holds.
		limit := len(a)
		if n < limit {
			limit = n
		}

		h := integerPointsSortBy(a, ranksFirst)
		heap.Init(h)

		selected := make([]IntegerPoint, 0, limit)
		for len(selected) < limit {
			selected = append(selected, heap.Pop(h).(IntegerPoint))
		}

		// A stable sort keeps the pop order among points with equal times.
		if opt.Ordered {
			sort.Stable(integerPointsByTime(selected))
		}
		return selected
	}
}

// filterFloatByUniqueTags reduces a to at most one point per distinct tag set.
//
// The tag set of a point is formed from the entries of p.Aux at the positions
// listed in tags. When several points share a tag set, cmpFunc(cur, p) decides
// whether the newly seen point p replaces the currently kept point cur.
//
// The returned slice is freshly allocated and a is not modified. Points appear
// in the order in which their tag set was first encountered in a, so the
// result is deterministic for a given input rather than depending on map
// iteration order.
func filterFloatByUniqueTags(a []FloatPoint, tags []int, cmpFunc func(cur, p *FloatPoint) bool) []FloatPoint {
	// slots maps a tag-set key to the position of its kept point in points.
	slots := make(map[string]int)
	// Non-nil even when empty, matching the previous return value.
	points := []FloatPoint{}

	// A single buffer is reused for every key instead of allocating per point.
	var keyBuf bytes.Buffer
	for _, p := range a {
		keyBuf.Reset()
		for i, index := range tags {
			if i > 0 {
				keyBuf.WriteString(",")
			}
			// %q quotes and escapes each value, so a value that itself
			// contains a comma cannot collide with a different tag set
			// once the parts are joined.
			fmt.Fprintf(&keyBuf, "%q", p.Aux[index])
		}
		key := keyBuf.String()

		slot, seen := slots[key]
		if !seen {
			slots[key] = len(points)
			points = append(points, p)
			continue
		}

		// Compare against a copy so cmpFunc cannot alter the kept point.
		cur := points[slot]
		if cmpFunc(&cur, &p) {
			points[slot] = p
		}
	}
	return points
}

// filterIntegerByUniqueTags reduces a to at most one point per distinct tag
// set.
//
// The tag set of a point is formed from the entries of p.Aux at the positions
// listed in tags. When several points share a tag set, cmpFunc(cur, p) decides
// whether the newly seen point p replaces the currently kept point cur.
//
// The returned slice is freshly allocated and a is not modified. Points appear
// in the order in which their tag set was first encountered in a, so the
// result is deterministic for a given input rather than depending on map
// iteration order.
func filterIntegerByUniqueTags(a []IntegerPoint, tags []int, cmpFunc func(cur, p *IntegerPoint) bool) []IntegerPoint {
	// slots maps a tag-set key to the position of its kept point in points.
	slots := make(map[string]int)
	// Non-nil even when empty, matching the previous return value.
	points := []IntegerPoint{}

	// A single buffer is reused for every key instead of allocating per point.
	var keyBuf bytes.Buffer
	for _, p := range a {
		keyBuf.Reset()
		for i, index := range tags {
			if i > 0 {
				keyBuf.WriteString(",")
			}
			// %q quotes and escapes each value, so a value that itself
			// contains a comma cannot collide with a different tag set
			// once the parts are joined.
			fmt.Fprintf(&keyBuf, "%q", p.Aux[index])
		}
		key := keyBuf.String()

		slot, seen := slots[key]
		if !seen {
			slots[key] = len(points)
			points = append(points, p)
			continue
		}

		// Compare against a copy so cmpFunc cannot alter the kept point.
		cur := points[slot]
		if cmpFunc(&cur, &p) {
			points[slot] = p
		}
	}
	return points
}

// newPercentileIterator returns an iterator for operating on a percentile() call.
func newPercentileIterator(input Iterator, opt IteratorOptions, percentile float64) (Iterator, error) {
switch input := input.(type) {
Expand Down
8 changes: 8 additions & 0 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,3 +973,11 @@ func (g *FloatPointGenerator) Next() (*influxql.FloatPoint, error) {
g.i++
return p, nil
}

// MustCallIterator wraps influxql.NewCallIterator for use in tests: it returns
// the constructed iterator and panics with the returned error if construction
// fails.
func MustCallIterator(input influxql.Iterator, opt influxql.IteratorOptions) influxql.Iterator {
	iter, callErr := influxql.NewCallIterator(input, opt)
	if callErr == nil {
		return iter
	}
	panic(callErr)
}
Loading