Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mode() function & tests #7199

Merged
merged 2 commits into from
Aug 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,17 @@ cpu value=25 1278010023000000000
command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},

&Query{
name: "calculate derivative of sum with unit default (2s) group by time",
command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
Expand Down Expand Up @@ -1806,6 +1817,26 @@ cpu value=20 1278010021000000000
command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time with fill 0",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time with fill 0",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time with fill previous",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time with fill previous",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of sum with unit default (2s) group by time with fill 0",
command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -1977,6 +2008,11 @@ cpu value=25 1278010023000000000
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of mode",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of sum",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
Expand Down Expand Up @@ -2027,7 +2063,7 @@ cpu value=25 1278010023000000000
}
}

// Ensure the server can handle various group by time difference queries.
// Ensure the server can handle various group by time difference queries with fill.
func TestServer_Query_SelectGroupByTimeDifferenceWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down Expand Up @@ -2071,6 +2107,16 @@ cpu value=20 1278010021000000000
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of mode with fill 0",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate difference of mode with fill previous",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of sum with fill 0",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -2184,6 +2230,11 @@ cpu value=35 1278010025000000000
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",17.5],["2010-07-01T18:47:04Z",27.5]]}]}]}`,
},
&Query{
name: "calculate moving average of mode",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of sum",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
Expand Down Expand Up @@ -2280,6 +2331,16 @@ cpu value=35 1278010025000000000
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",22.5]]}]}]}`,
},
&Query{
name: "calculate moving average of mode with fill 0",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`,
},
&Query{
name: "calculate moving average of mode with fill previous",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`,
},
&Query{
name: "calculate moving average of sum with fill 0",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -2377,6 +2438,11 @@ func TestServer_Query_MathWithFill(t *testing.T) {
command: `SELECT 4*mean(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mean"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`,
},
&Query{
name: "multiplication of mode value with fill previous",
command: `SELECT 4*mode(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mode"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`,
},
}...)

for i, query := range test.queries {
Expand Down Expand Up @@ -2730,6 +2796,18 @@ func TestServer_Query_Aggregates_IntMany(t *testing.T) {
command: `SELECT MEDIAN(value) FROM intmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - single - int",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM intmany`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - multiple - int",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM intmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "distinct as call - int",
params: url.Values{"db": []string{"db0"}},
Expand Down Expand Up @@ -3104,6 +3182,18 @@ func TestServer_Query_Aggregates_FloatMany(t *testing.T) {
command: `SELECT MEDIAN(value) FROM floatmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - single - float",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM floatmany`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - multiple - float",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM floatmany where time < '2000-01-01T00:00:10Z'`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",2]]}]}]}`,
},
&Query{
name: "distinct as call - float",
params: url.Values{"db": []string{"db0"}},
Expand Down Expand Up @@ -3692,6 +3782,42 @@ func TestServer_Query_AggregateSelectors(t *testing.T) {
command: `SELECT tx, median(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - baseline 30s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`,
},
&Query{
name: "mode - time",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - tx",
params: url.Values{"db": []string{"db0"}},
command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - baseline 30s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`,
},
&Query{
name: "mode - time",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - tx",
params: url.Values{"db": []string{"db0"}},
command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "spread - baseline 30s",
params: url.Values{"db": []string{"db0"}},
Expand Down
4 changes: 2 additions & 2 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ func (s *SelectStatement) RewriteFields(ic IteratorCreator) (*SelectStatement, e

// Add additional types for certain functions.
switch call.Name {
case "count", "first", "last", "distinct", "elapsed":
case "count", "first", "last", "distinct", "elapsed", "mode":
supportedTypes[String] = struct{}{}
supportedTypes[Boolean] = struct{}{}
case "stddev":
Expand Down Expand Up @@ -3215,7 +3215,7 @@ func (c *Call) Fields() []string {
}
}
return keys
case "min", "max", "first", "last", "sum", "mean":
case "min", "max", "first", "last", "sum", "mean", "mode":
// maintain the order the user specified in the query
keyMap := make(map[string]struct{})
keys := []string{}
Expand Down
162 changes: 162 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,168 @@ func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
}

// newModeIterator returns an iterator for operating on a mode() call.
func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {

switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatModeReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSliceFuncReducer(StringModeReduceSlice)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil

default:
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
}
}

// FloatModeReduceSlice returns the mode value within a window.
func FloatModeReduceSlice(a []FloatPoint) []FloatPoint {
if len(a) == 1 {
return a
}

// fmt.Println(a[0])
sort.Sort(floatPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []FloatPoint{{Time: ZeroTime, Value: mostMode}}
}

// IntegerModeReduceSlice returns the mode value within a window.
func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint {
if len(a) == 1 {
return a
}
sort.Sort(integerPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []IntegerPoint{{Time: ZeroTime, Value: mostMode}}
}

// StringModeReduceSlice returns the mode value within a window.
func StringModeReduceSlice(a []StringPoint) []StringPoint {
if len(a) == 1 {
return a
}

sort.Sort(stringPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []StringPoint{{Time: ZeroTime, Value: mostMode}}
}

// BooleanModeReduceSlice returns the mode value within a window.
func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint {
if len(a) == 1 {
return a
}

trueFreq := 0
falsFreq := 0
mostMode := false

for _, p := range a {
if p.Value {
trueFreq++
} else {
falsFreq++
}
}
// In case either of true or false are mode then retuned mode value wont be
// of metric with oldest timestamp
if trueFreq >= falsFreq {
mostMode = true
}

return []BooleanPoint{{Time: ZeroTime, Value: mostMode}}
}

// newStddevIterator returns an iterator for operating on a stddev() call.
func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
Expand Down
Loading