Skip to content

Commit

Permalink
Add sample function to query language
Browse files Browse the repository at this point in the history
First Pass at implementing sample

Add sample iterators for all types

Remove size from sample struct

Fix off by one error when generating random number

Add benchmarks for sample iterator

Add test and associated fixes for off by one error

Add test for sample function

Remove NumericLiteral from sample function call

Make clear that the counter is incr w/ each call

Rename IsRandom to AllSamplesSeen

Add a rng for each reducer that is created

The default rng that comes with math/rand has a global lock. To avoid
having to worry about any contention on the lock, each reducer now has
its own time seeded rng.

Add sample function to changelog
  • Loading branch information
desa committed Oct 6, 2016
1 parent fc57c0f commit f9b8129
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

### Features

- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language
- [#7415](https://github.com/influxdata/influxdb/pull/7415): Add sample function to query language.
- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language.
- [#7120](https://github.com/influxdata/influxdb/issues/7120): Add additional statistics to query executor.
- [#7135](https://github.com/influxdata/influxdb/pull/7135): Support enable HTTP service over unix domain socket. Thanks @oiooj
- [#3634](https://github.com/influxdata/influxdb/issues/3634): Support mixed duration units.
Expand Down
30 changes: 29 additions & 1 deletion influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ func (s *SelectStatement) validSelectWithAggregate() error {
onlySelectors := true
for k := range calls {
switch k {
case "top", "bottom", "max", "min", "first", "last", "percentile":
case "top", "bottom", "max", "min", "first", "last", "percentile", "sample":
default:
onlySelectors = false
break
Expand Down Expand Up @@ -1615,6 +1615,30 @@ func (s *SelectStatement) validPercentileAggr(expr *Call) error {
}
}

// validPercentileAggr determines if PERCENTILE have valid arguments.
func (s *SelectStatement) validSampleAggr(expr *Call) error {
if err := s.validSelectWithAggregate(); err != nil {
return err
}
if exp, got := 2, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}

switch expr.Args[0].(type) {
case *VarRef:
// do nothing
default:
return fmt.Errorf("expected field argument in sample()")
}

switch expr.Args[1].(type) {
case *IntegerLiteral:
return nil
default:
return fmt.Errorf("expected integer argument in sample()")
}
}

func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
Expand Down Expand Up @@ -1709,6 +1733,10 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
if err := s.validPercentileAggr(expr); err != nil {
return err
}
case "sample":
if err := s.validSampleAggr(expr); err != nil {
return err
}
case "holt_winters", "holt_winters_with_fit":
if exp, got := 3, len(expr.Args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
Expand Down
37 changes: 37 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,3 +1246,40 @@ func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, inclu
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
}
}

// NewSampleIterator returns an iterator
func NewSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
return newSampleIterator(input, opt, size)
}

// newSampleIterator returns an iterator
func newSampleIterator(input Iterator, opt IteratorOptions, size int) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSampleReducer(size)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSampleReducer(size)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanSampleReducer(size)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSampleReducer(size)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported elapsed iterator type: %T", input)
}
}
27 changes: 27 additions & 0 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,33 @@ func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN in
}
}

func BenchmarkSampleIterator_1k(b *testing.B) { benchmarkSampleIterator(b, 1000) }
func BenchmarkSampleIterator_100k(b *testing.B) { benchmarkSampleIterator(b, 100000) }
func BenchmarkSampleIterator_1M(b *testing.B) { benchmarkSampleIterator(b, 1000000) }

func benchmarkSampleIterator(b *testing.B, pointN int) {
b.ReportAllocs()

// Create a lightweight point generator.
p := influxql.FloatPoint{Name: "cpu"}
input := FloatPointGenerator{
N: pointN,
Fn: func(i int) *influxql.FloatPoint {
p.Value = float64(i)
return &p
},
}

for i := 0; i < b.N; i++ {
// Execute call against input.
itr, err := influxql.NewSampleIterator(&input, influxql.IteratorOptions{}, 100)
if err != nil {
b.Fatal(err)
}
influxql.DrainIterator(itr)
}
}

func BenchmarkDistinctIterator_1K(b *testing.B) { benchmarkDistinctIterator(b, 1000) }
func BenchmarkDistinctIterator_100K(b *testing.B) { benchmarkDistinctIterator(b, 100000) }
func BenchmarkDistinctIterator_1M(b *testing.B) { benchmarkDistinctIterator(b, 1000000) }
Expand Down
186 changes: 185 additions & 1 deletion influxql/functions.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package influxql

import "sort"
import (
"math/rand"
"sort"
"time"
)

// FloatPointAggregator aggregates points to produce a single point.
type FloatPointAggregator interface {
Expand Down Expand Up @@ -377,6 +381,51 @@ func (r *FloatElapsedReducer) Emit() []IntegerPoint {
return nil
}

// FloatSampleReduces implements a reservoir sampling to calculate a random subset of points
type FloatSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer

points floatPoints // the reservoir
}

// NewFloatSampleReducer creates a new FloatSampleReducer
func NewFloatSampleReducer(size int) *FloatSampleReducer {
return &FloatSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(floatPoints, size),
}
}

// AggregateFloat aggregates a point into the reducer.
func (r *FloatSampleReducer) AggregateFloat(p *FloatPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
r.points[r.count-1] = *p
return
}

// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
}

// Emit emits the reservoir sample as many points.
func (r *FloatSampleReducer) Emit() []FloatPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}

// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
AggregateInteger(p *IntegerPoint)
Expand Down Expand Up @@ -746,6 +795,51 @@ func (r *IntegerElapsedReducer) Emit() []IntegerPoint {
return nil
}

// IntegerSampleReduces implements a reservoir sampling to calculate a random subset of points
type IntegerSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer

points integerPoints // the reservoir
}

// NewIntegerSampleReducer creates a new IntegerSampleReducer
func NewIntegerSampleReducer(size int) *IntegerSampleReducer {
return &IntegerSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(integerPoints, size),
}
}

// AggregateInteger aggregates a point into the reducer.
func (r *IntegerSampleReducer) AggregateInteger(p *IntegerPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
r.points[r.count-1] = *p
return
}

// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
}

// Emit emits the reservoir sample as many points.
func (r *IntegerSampleReducer) Emit() []IntegerPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}

// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
AggregateString(p *StringPoint)
Expand Down Expand Up @@ -1115,6 +1209,51 @@ func (r *StringElapsedReducer) Emit() []IntegerPoint {
return nil
}

// StringSampleReduces implements a reservoir sampling to calculate a random subset of points
type StringSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer

points stringPoints // the reservoir
}

// NewStringSampleReducer creates a new StringSampleReducer
func NewStringSampleReducer(size int) *StringSampleReducer {
return &StringSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(stringPoints, size),
}
}

// AggregateString aggregates a point into the reducer.
func (r *StringSampleReducer) AggregateString(p *StringPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
r.points[r.count-1] = *p
return
}

// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
}

// Emit emits the reservoir sample as many points.
func (r *StringSampleReducer) Emit() []StringPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}

// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
AggregateBoolean(p *BooleanPoint)
Expand Down Expand Up @@ -1483,3 +1622,48 @@ func (r *BooleanElapsedReducer) Emit() []IntegerPoint {
}
return nil
}

// BooleanSampleReduces implements a reservoir sampling to calculate a random subset of points
type BooleanSampleReducer struct {
count int // how many points we've iterated over
rng *rand.Rand // random number generator for each reducer

points booleanPoints // the reservoir
}

// NewBooleanSampleReducer creates a new BooleanSampleReducer
func NewBooleanSampleReducer(size int) *BooleanSampleReducer {
return &BooleanSampleReducer{
rng: rand.New(rand.NewSource(time.Now().UnixNano())), // seed with current time as suggested by https://golang.org/pkg/math/rand/
points: make(booleanPoints, size),
}
}

// AggregateBoolean aggregates a point into the reducer.
func (r *BooleanSampleReducer) AggregateBoolean(p *BooleanPoint) {
r.count++
// Fill the reservoir with the first n points
if r.count-1 < len(r.points) {
r.points[r.count-1] = *p
return
}

// Generate a random integer between 1 and the count and
// if that number is less than the length of the slice
// replace the point at that index rnd with p.
rnd := rand.Intn(r.count)
if rnd < len(r.points) {
r.points[rnd] = *p
}
}

// Emit emits the reservoir sample as many points.
func (r *BooleanSampleReducer) Emit() []BooleanPoint {
min := len(r.points)
if r.count < min {
min = r.count
}
pts := r.points[:min]
sort.Sort(pts)
return pts
}
Loading

0 comments on commit f9b8129

Please sign in to comment.