diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index dc13725eff3..8db15e7a24b 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -1652,6 +1652,17 @@ cpu value=25 1278010023000000000 command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`, }, + &Query{ + name: "calculate derivative of mode with unit default (2s) group by time", + command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`, + }, + &Query{ + name: "calculate derivative of mode with unit 4s group by time", + command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ name: "calculate derivative of sum with unit default (2s) group by time", command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, @@ -1806,6 +1817,26 @@ cpu value=20 1278010021000000000 command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, }, + &Query{ + name: "calculate derivative of mode with unit default (2s) group by time with fill 0", + command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, + }, + &Query{ + name: "calculate derivative of mode with unit 4s group by time with fill 0", + command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`, + }, + &Query{ + name: "calculate derivative of mode with unit default (2s) group by time with fill previous", + command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + }, + &Query{ + name: "calculate derivative of mode with unit 4s group by time with fill previous", + command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + }, &Query{ name: "calculate derivative of sum with unit default (2s) group by time with fill 0", command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, @@ -1977,6 +2008,11 @@ cpu value=25 1278010023000000000 command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`, }, + &Query{ + name: "calculate difference of mode", + command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`, + }, &Query{ name: "calculate difference of sum", command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, @@ -2027,7 +2063,7 @@ cpu value=25 1278010023000000000 } } -// Ensure the server can handle various group by time difference queries. +// Ensure the server can handle various group by time difference queries with fill. func TestServer_Query_SelectGroupByTimeDifferenceWithFill(t *testing.T) { t.Parallel() s := OpenServer(NewConfig()) @@ -2071,6 +2107,16 @@ cpu value=20 1278010021000000000 command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, }, + &Query{ + name: "calculate difference of mode with fill 0", + command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`, + }, + &Query{ + name: "calculate difference of mode with fill previous", + command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`, + }, &Query{ name: "calculate difference of sum with fill 0", command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, @@ -2184,6 +2230,11 @@ cpu value=35 1278010025000000000 command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",17.5],["2010-07-01T18:47:04Z",27.5]]}]}]}`, }, + &Query{ + name: "calculate moving average of mode", + command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`, + }, &Query{ name: "calculate moving average of sum", command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`, @@ -2280,6 +2331,16 @@ cpu value=35 1278010025000000000 command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",22.5]]}]}]}`, }, + &Query{ + name: "calculate moving average of mode with fill 0", + command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`, + }, + &Query{ + name: "calculate moving average of mode with fill previous", + command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`, + }, &Query{ name: "calculate moving average of sum with fill 0", command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`, @@ -2377,6 +2438,11 @@ func TestServer_Query_MathWithFill(t *testing.T) { command: `SELECT 4*mean(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`, exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mean"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`, }, + &Query{ + name: "multiplication of mode value with fill previous", + command: `SELECT 4*mode(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mode"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`, + }, }...) for i, query := range test.queries { @@ -2730,6 +2796,18 @@ func TestServer_Query_Aggregates_IntMany(t *testing.T) { command: `SELECT MEDIAN(value) FROM intmany where time < '2000-01-01T00:01:10Z'`, exp: `{"results":[{"series":[{"name":"intmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`, }, + &Query{ + name: "mode - single - int", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT MODE(value) FROM intmany`, + exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`, + }, + &Query{ + name: "mode - multiple - int", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT MODE(value) FROM intmany where time < '2000-01-01T00:01:10Z'`, + exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`, + }, &Query{ name: "distinct as call - int", params: url.Values{"db": []string{"db0"}}, @@ -3104,6 +3182,18 @@ func TestServer_Query_Aggregates_FloatMany(t *testing.T) { command: `SELECT MEDIAN(value) FROM floatmany where time < '2000-01-01T00:01:10Z'`, exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`, }, + &Query{ + name: "mode - single - float", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT MODE(value) FROM floatmany`, + exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`, + }, + &Query{ + name: "mode - multiple - float", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT MODE(value) FROM floatmany where time < '2000-01-01T00:00:10Z'`, + exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",2]]}]}]}`, + }, &Query{ name: "distinct as call - float", params: url.Values{"db": []string{"db0"}}, @@ -3692,6 +3782,42 @@ func TestServer_Query_AggregateSelectors(t *testing.T) { command: `SELECT tx, median(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`, }, + &Query{ + name: "mode - baseline 30s", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`, + }, + &Query{ + name: "mode - time", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`, + }, + &Query{ + name: "mode - tx", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`, + }, + &Query{ + name: "mode - baseline 30s", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`, + }, + &Query{ + name: "mode - time", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`, + }, + &Query{ + name: "mode - tx", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`, + exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`, + }, &Query{ name: "spread - baseline 30s", params: url.Values{"db": []string{"db0"}}, diff --git a/influxql/ast.go b/influxql/ast.go index 06e211b8d6e..a0b0fb6a0e3 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1157,7 +1157,7 @@ func (s *SelectStatement) RewriteFields(ic IteratorCreator) (*SelectStatement, e // Add additional types for certain functions. switch call.Name { - case "count", "first", "last", "distinct", "elapsed": + case "count", "first", "last", "distinct", "elapsed", "mode": supportedTypes[String] = struct{}{} supportedTypes[Boolean] = struct{}{} case "stddev": @@ -3215,7 +3215,7 @@ func (c *Call) Fields() []string { } } return keys - case "min", "max", "first", "last", "sum", "mean": + case "min", "max", "first", "last", "sum", "mean", "mode": // maintain the order the user specified in the query keyMap := make(map[string]struct{}) keys := []string{} diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 7480416a2ca..049e8d3e054 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -500,6 +500,168 @@ func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint { return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}} } +// newModeIterator returns an iterator for operating on a mode() call. +func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) { + + switch input := input.(type) { + case FloatIterator: + createFn := func() (FloatPointAggregator, FloatPointEmitter) { + fn := NewFloatSliceFuncReducer(FloatModeReduceSlice) + return fn, fn + } + return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil + case IntegerIterator: + createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { + fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice) + return fn, fn + } + return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil + case StringIterator: + createFn := func() (StringPointAggregator, StringPointEmitter) { + fn := NewStringSliceFuncReducer(StringModeReduceSlice) + return fn, fn + } + return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil + case BooleanIterator: + createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { + fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice) + return fn, fn + } + return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil + + default: + return nil, fmt.Errorf("unsupported median iterator type: %T", input) + } +} + +// FloatModeReduceSlice returns the mode value within a window. +func FloatModeReduceSlice(a []FloatPoint) []FloatPoint { + if len(a) == 1 { + return a + } + + // fmt.Println(a[0]) + sort.Sort(floatPointsByValue(a)) + + mostFreq := 0 + currFreq := 0 + currMode := a[0].Value + mostMode := a[0].Value + mostTime := a[0].Time + currTime := a[0].Time + + for _, p := range a { + if p.Value != currMode { + currFreq = 1 + currMode = p.Value + currTime = p.Time + continue + } + currFreq++ + if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { + continue + } + mostFreq = currFreq + mostMode = p.Value + mostTime = p.Time + } + + return []FloatPoint{{Time: ZeroTime, Value: mostMode}} +} + +// IntegerModeReduceSlice returns the mode value within a window. +func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint { + if len(a) == 1 { + return a + } + sort.Sort(integerPointsByValue(a)) + + mostFreq := 0 + currFreq := 0 + currMode := a[0].Value + mostMode := a[0].Value + mostTime := a[0].Time + currTime := a[0].Time + + for _, p := range a { + if p.Value != currMode { + currFreq = 1 + currMode = p.Value + currTime = p.Time + continue + } + currFreq++ + if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { + continue + } + mostFreq = currFreq + mostMode = p.Value + mostTime = p.Time + } + + return []IntegerPoint{{Time: ZeroTime, Value: mostMode}} +} + +// StringModeReduceSlice returns the mode value within a window. +func StringModeReduceSlice(a []StringPoint) []StringPoint { + if len(a) == 1 { + return a + } + + sort.Sort(stringPointsByValue(a)) + + mostFreq := 0 + currFreq := 0 + currMode := a[0].Value + mostMode := a[0].Value + mostTime := a[0].Time + currTime := a[0].Time + + for _, p := range a { + if p.Value != currMode { + currFreq = 1 + currMode = p.Value + currTime = p.Time + continue + } + currFreq++ + if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) { + continue + } + mostFreq = currFreq + mostMode = p.Value + mostTime = p.Time + } + + return []StringPoint{{Time: ZeroTime, Value: mostMode}} +} + +// BooleanModeReduceSlice returns the mode value within a window. +func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint { + if len(a) == 1 { + return a + } + + trueFreq := 0 + falsFreq := 0 + mostMode := false + + for _, p := range a { + if p.Value { + trueFreq++ + } else { + falsFreq++ + } + } + // In case either of true or false are mode then retuned mode value wont be + // of metric with oldest timestamp + if trueFreq >= falsFreq { + mostMode = true + } + + return []BooleanPoint{{Time: ZeroTime, Value: mostMode}} +} + // newStddevIterator returns an iterator for operating on a stddev() call. func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { diff --git a/influxql/call_iterator_test.go b/influxql/call_iterator_test.go index b32320198db..78b5a4ead74 100644 --- a/influxql/call_iterator_test.go +++ b/influxql/call_iterator_test.go @@ -659,6 +659,146 @@ func TestCallIterator_Last_Boolean(t *testing.T) { } } +// Ensure that a float iterator can be created for a mode() call. +func TestCallIterator_Mode_Float(t *testing.T) { + itr, _ := influxql.NewModeIterator(&FloatIterator{Points: []influxql.FloatPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 3, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 8, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 22, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 24, Value: 25, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + influxql.IteratorOptions{ + Expr: MustParseExpr(`mode("value")`), + Dimensions: []string{"host"}, + Interval: influxql.Interval{Duration: 5 * time.Nanosecond}, + }, + ) + + if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA"), Aggregated: 0}}, + {&influxql.FloatPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB"), Aggregated: 0}}, + {&influxql.FloatPoint{Time: 5, Value: 21, Tags: ParseTags("host=hostA"), Aggregated: 0}}, + {&influxql.FloatPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB"), Aggregated: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure that a integer iterator can be created for a mode() call. +func TestCallIterator_Mode_Integer(t *testing.T) { + itr, _ := influxql.NewModeIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ + {Time: 0, Value: 15, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: 11, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 1, Value: 10, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 2, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 3, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: 10, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 6, Value: 20, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 8, Value: 21, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 22, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: 8, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 24, Value: 25, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + influxql.IteratorOptions{ + Expr: MustParseExpr(`mode("value")`), + Dimensions: []string{"host"}, + Interval: influxql.Interval{Duration: 5 * time.Nanosecond}, + }, + ) + + if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Time: 0, Value: 10, Tags: ParseTags("host=hostA")}}, + {&influxql.IntegerPoint{Time: 1, Value: 11, Tags: ParseTags("host=hostB")}}, + {&influxql.IntegerPoint{Time: 5, Value: 21, Tags: ParseTags("host=hostA")}}, + {&influxql.IntegerPoint{Time: 20, Value: 8, Tags: ParseTags("host=hostB")}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure that a string iterator can be created for a mode() call. +func TestCallIterator_Mode_String(t *testing.T) { + itr, _ := influxql.NewModeIterator(&StringIterator{Points: []influxql.StringPoint{ + {Time: 0, Value: "15", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: "11", Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 1, Value: "10", Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 2, Value: "10", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 3, Value: "10", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: "10", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 6, Value: "20", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: "21", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: "21", Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 22, Value: "8", Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: "8", Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 24, Value: "25", Tags: ParseTags("region=us-west,host=hostB")}, + }}, + influxql.IteratorOptions{ + Expr: MustParseExpr(`mode("value")`), + Dimensions: []string{"host"}, + Interval: influxql.Interval{Duration: 5 * time.Nanosecond}, + }, + ) + + if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.StringPoint{Time: 0, Value: "10", Tags: ParseTags("host=hostA")}}, + {&influxql.StringPoint{Time: 1, Value: "11", Tags: ParseTags("host=hostB")}}, + {&influxql.StringPoint{Time: 5, Value: "21", Tags: ParseTags("host=hostA")}}, + {&influxql.StringPoint{Time: 20, Value: "8", Tags: ParseTags("host=hostB")}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure that a boolean iterator can be created for a modBooleanl. +func TestCallIterator_Mode_Boolean(t *testing.T) { + itr, _ := influxql.NewModeIterator(&BooleanIterator{Points: []influxql.BooleanPoint{ + {Time: 0, Value: true, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 1, Value: false, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 1, Value: true, Tags: ParseTags("region=us-west,host=hostA")}, + {Time: 2, Value: true, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 3, Value: true, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 4, Value: false, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 6, Value: false, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 7, Value: false, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 8, Value: false, Tags: ParseTags("region=us-east,host=hostA")}, + {Time: 22, Value: false, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 23, Value: true, Tags: ParseTags("region=us-west,host=hostB")}, + {Time: 24, Value: true, Tags: ParseTags("region=us-west,host=hostB")}, + }}, + influxql.IteratorOptions{ + Expr: MustParseExpr(`mode("value")`), + Dimensions: []string{"host"}, + Interval: influxql.Interval{Duration: 5 * time.Nanosecond}, + }, + ) + + if a, err := Iterators([]influxql.Iterator{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.BooleanPoint{Time: 0, Value: true, Tags: ParseTags("host=hostA")}}, + {&influxql.BooleanPoint{Time: 1, Value: false, Tags: ParseTags("host=hostB")}}, + {&influxql.BooleanPoint{Time: 5, Value: false, Tags: ParseTags("host=hostA")}}, + {&influxql.BooleanPoint{Time: 20, Value: true, Tags: ParseTags("host=hostB")}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestNewCallIterator_UnsupportedExprName(t *testing.T) { _, err := influxql.NewCallIterator( &FloatIterator{}, @@ -731,11 +871,11 @@ func benchmarkDistinctIterator(b *testing.B, pointN int) { } } -func BenchmarkMedianIterator_1K(b *testing.B) { benchmarkMedianIterator(b, 1000) } -func BenchmarkMedianIterator_100K(b *testing.B) { benchmarkMedianIterator(b, 100000) } -func BenchmarkMedianIterator_1M(b *testing.B) { benchmarkMedianIterator(b, 1000000) } +func BenchmarkModeIterator_1K(b *testing.B) { benchmarkModeIterator(b, 1000) } +func BenchmarkModeIterator_100K(b *testing.B) { benchmarkModeIterator(b, 100000) } +func BenchmarkModeIterator_1M(b *testing.B) { benchmarkModeIterator(b, 1000000) } -func benchmarkMedianIterator(b *testing.B, pointN int) { +func benchmarkModeIterator(b *testing.B, pointN int) { b.ReportAllocs() for i := 0; i < b.N; i++ { @@ -744,13 +884,13 @@ func benchmarkMedianIterator(b *testing.B, pointN int) { input := FloatPointGenerator{ N: pointN, Fn: func(i int) *influxql.FloatPoint { - p.Value = float64(i % 10) + p.Value = float64(10) return &p }, } // Execute call against input. - itr, err := influxql.NewMedianIterator(&input, influxql.IteratorOptions{}) + itr, err := influxql.NewModeIterator(&input, influxql.IteratorOptions{}) if err != nil { b.Fatal(err) } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index b514cb57f7f..d10ea2af830 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -2044,6 +2044,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT last(max(value)) FROM myseries`, err: `expected field argument in last()`}, {s: `SELECT mean(max(value)) FROM myseries`, err: `expected field argument in mean()`}, {s: `SELECT median(max(value)) FROM myseries`, err: `expected field argument in median()`}, + {s: `SELECT mode(max(value)) FROM myseries`, err: `expected field argument in mode()`}, {s: `SELECT stddev(max(value)) FROM myseries`, err: `expected field argument in stddev()`}, {s: `SELECT spread(max(value)) FROM myseries`, err: `expected field argument in spread()`}, {s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`}, diff --git a/influxql/select.go b/influxql/select.go index 261c90eba71..48825a44012 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -343,6 +343,12 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec return nil, err } return newMedianIterator(input, opt) + case "mode": + input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt, false) + if err != nil { + return nil, err + } + return NewModeIterator(input, opt) case "stddev": input, err := buildExprIterator(expr.Args[0].(*VarRef), ic, opt, false) if err != nil { diff --git a/influxql/select_test.go b/influxql/select_test.go index 0d0343f47d2..8c7ff82f3b3 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -393,6 +393,145 @@ func TestSelect_Median_Boolean(t *testing.T) { } } +// Ensure a SELECT mode() query can be executed. +func TestSelect_Mode_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 5}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 10}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT mode() query can be executed. +func TestSelect_Mode_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 50 * Second, Value: 1}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 51 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 52 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 53 * Second, Value: 4}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 54 * Second, Value: 5}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: 10}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: 10}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 100}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Value: 1}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT mode() query cannot be executed on strings. +func TestSelect_Mode_String(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &StringIterator{Points: []influxql.StringPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: "a"}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 1 * Second, Value: "a"}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: "cxxx"}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 6 * Second, Value: "zzzz"}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 7 * Second, Value: "zzzz"}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 8 * Second, Value: "zxxx"}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: "b"}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: "d"}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 11 * Second, Value: "d"}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 12 * Second, Value: "d"}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected point: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: "a"}}, + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: "zzzz"}}, + {&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: "d"}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +// Ensure a SELECT mode() query cannot be executed on booleans. +func TestSelect_Mode_Boolean(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &BooleanIterator{Points: []influxql.BooleanPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: true}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 1 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 2 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: true}, + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 6 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: true}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 11 * Second, Value: false}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 12 * Second, Value: true}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mode(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected point: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Value: false}}, + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Value: true}}, + {&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: true}}, + }) { + t.Errorf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a SELECT top() query can be executed. func TestSelect_Top_NoTags_Float(t *testing.T) { var ic IteratorCreator