Skip to content

Commit

Permalink
Merge pull request #7199 from influxdata/js-mode-aggregate-function
Browse files Browse the repository at this point in the history
add mode() function & tests
  • Loading branch information
jsternberg authored Aug 24, 2016
2 parents a2470c9 + 993ac1c commit 76f7333
Show file tree
Hide file tree
Showing 7 changed files with 583 additions and 9 deletions.
128 changes: 127 additions & 1 deletion cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,17 @@ cpu value=25 1278010023000000000
command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},

&Query{
name: "calculate derivative of sum with unit default (2s) group by time",
command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
Expand Down Expand Up @@ -1806,6 +1817,26 @@ cpu value=20 1278010021000000000
command: `SELECT derivative(median(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time with fill 0",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time with fill 0",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",-20]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit default (2s) group by time with fill previous",
command: `SELECT derivative(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of mode with unit 4s group by time with fill previous",
command: `SELECT derivative(mode(value), 4s) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","derivative"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate derivative of sum with unit default (2s) group by time with fill 0",
command: `SELECT derivative(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -1977,6 +2008,11 @@ cpu value=25 1278010023000000000
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of mode",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of sum",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
Expand Down Expand Up @@ -2027,7 +2063,7 @@ cpu value=25 1278010023000000000
}
}

// Ensure the server can handle various group by time difference queries.
// Ensure the server can handle various group by time difference queries with fill.
func TestServer_Query_SelectGroupByTimeDifferenceWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down Expand Up @@ -2071,6 +2107,16 @@ cpu value=20 1278010021000000000
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of mode with fill 0",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate difference of mode with fill previous",
command: `SELECT difference(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of sum with fill 0",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -2184,6 +2230,11 @@ cpu value=35 1278010025000000000
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",17.5],["2010-07-01T18:47:04Z",27.5]]}]}]}`,
},
&Query{
name: "calculate moving average of mode",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",15],["2010-07-01T18:47:04Z",25]]}]}]}`,
},
&Query{
name: "calculate moving average of sum",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s)`,
Expand Down Expand Up @@ -2280,6 +2331,16 @@ cpu value=35 1278010025000000000
command: `SELECT moving_average(median(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",12.5],["2010-07-01T18:47:04Z",22.5]]}]}]}`,
},
&Query{
name: "calculate moving average of mode with fill 0",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:00Z",5],["2010-07-01T18:47:02Z",5],["2010-07-01T18:47:04Z",15]]}]}]}`,
},
&Query{
name: "calculate moving average of mode with fill previous",
command: `SELECT moving_average(mode(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","moving_average"],"values":[["2010-07-01T18:47:02Z",10],["2010-07-01T18:47:04Z",20]]}]}]}`,
},
&Query{
name: "calculate moving average of sum with fill 0",
command: `SELECT moving_average(sum(value), 2) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:05' group by time(2s) fill(0)`,
Expand Down Expand Up @@ -2377,6 +2438,11 @@ func TestServer_Query_MathWithFill(t *testing.T) {
command: `SELECT 4*mean(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mean"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`,
},
&Query{
name: "multiplication of mode value with fill previous",
command: `SELECT 4*mode(value) FROM db0.rp0.cpu WHERE time >= '2010-07-01 18:47:00' AND time < '2010-07-01 18:48:30' GROUP BY time(30s) FILL(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","mode"],"values":[["2010-07-01T18:47:00Z",60],["2010-07-01T18:47:30Z",60],["2010-07-01T18:48:00Z",60]]}]}]}`,
},
}...)

for i, query := range test.queries {
Expand Down Expand Up @@ -2730,6 +2796,18 @@ func TestServer_Query_Aggregates_IntMany(t *testing.T) {
command: `SELECT MEDIAN(value) FROM intmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - single - int",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM intmany`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - multiple - int",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM intmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "distinct as call - int",
params: url.Values{"db": []string{"db0"}},
Expand Down Expand Up @@ -3104,6 +3182,18 @@ func TestServer_Query_Aggregates_FloatMany(t *testing.T) {
command: `SELECT MEDIAN(value) FROM floatmany where time < '2000-01-01T00:01:10Z'`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","median"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - single - float",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM floatmany`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "mode - multiple - float",
params: url.Values{"db": []string{"db0"}},
command: `SELECT MODE(value) FROM floatmany where time < '2000-01-01T00:00:10Z'`,
exp: `{"results":[{"series":[{"name":"floatmany","columns":["time","mode"],"values":[["1970-01-01T00:00:00Z",2]]}]}]}`,
},
&Query{
name: "distinct as call - float",
params: url.Values{"db": []string{"db0"}},
Expand Down Expand Up @@ -3692,6 +3782,42 @@ func TestServer_Query_AggregateSelectors(t *testing.T) {
command: `SELECT tx, median(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - baseline 30s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`,
},
&Query{
name: "mode - time",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - tx",
params: url.Values{"db": []string{"db0"}},
command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - baseline 30s",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"results":[{"series":[{"name":"network","columns":["time","mode"],"values":[["2000-01-01T00:00:00Z",40],["2000-01-01T00:00:30Z",50],["2000-01-01T00:01:00Z",5]]}]}]}`,
},
&Query{
name: "mode - time",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "mode - tx",
params: url.Values{"db": []string{"db0"}},
command: `SELECT tx, mode(rx) FROM network where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:29Z' group by time(30s)`,
exp: `{"error":"error parsing query: mixing aggregate and non-aggregate queries is not supported"}`,
},
&Query{
name: "spread - baseline 30s",
params: url.Values{"db": []string{"db0"}},
Expand Down
4 changes: 2 additions & 2 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ func (s *SelectStatement) RewriteFields(ic IteratorCreator) (*SelectStatement, e

// Add additional types for certain functions.
switch call.Name {
case "count", "first", "last", "distinct", "elapsed":
case "count", "first", "last", "distinct", "elapsed", "mode":
supportedTypes[String] = struct{}{}
supportedTypes[Boolean] = struct{}{}
case "stddev":
Expand Down Expand Up @@ -3215,7 +3215,7 @@ func (c *Call) Fields() []string {
}
}
return keys
case "min", "max", "first", "last", "sum", "mean":
case "min", "max", "first", "last", "sum", "mean", "mode":
// maintain the order the user specified in the query
keyMap := make(map[string]struct{})
keys := []string{}
Expand Down
162 changes: 162 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,168 @@ func IntegerMedianReduceSlice(a []IntegerPoint) []FloatPoint {
return []FloatPoint{{Time: ZeroTime, Value: float64(a[len(a)/2].Value)}}
}

// newModeIterator returns an iterator for operating on a mode() call.
func NewModeIterator(input Iterator, opt IteratorOptions) (Iterator, error) {

switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatModeReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerModeReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSliceFuncReducer(StringModeReduceSlice)
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanSliceFuncReducer(BooleanModeReduceSlice)
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil

default:
return nil, fmt.Errorf("unsupported median iterator type: %T", input)
}
}

// FloatModeReduceSlice returns the mode value within a window.
func FloatModeReduceSlice(a []FloatPoint) []FloatPoint {
if len(a) == 1 {
return a
}

// fmt.Println(a[0])
sort.Sort(floatPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []FloatPoint{{Time: ZeroTime, Value: mostMode}}
}

// IntegerModeReduceSlice returns the mode value within a window.
func IntegerModeReduceSlice(a []IntegerPoint) []IntegerPoint {
if len(a) == 1 {
return a
}
sort.Sort(integerPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []IntegerPoint{{Time: ZeroTime, Value: mostMode}}
}

// StringModeReduceSlice returns the mode value within a window.
func StringModeReduceSlice(a []StringPoint) []StringPoint {
if len(a) == 1 {
return a
}

sort.Sort(stringPointsByValue(a))

mostFreq := 0
currFreq := 0
currMode := a[0].Value
mostMode := a[0].Value
mostTime := a[0].Time
currTime := a[0].Time

for _, p := range a {
if p.Value != currMode {
currFreq = 1
currMode = p.Value
currTime = p.Time
continue
}
currFreq++
if mostFreq > currFreq || (mostFreq == currFreq && currTime > mostTime) {
continue
}
mostFreq = currFreq
mostMode = p.Value
mostTime = p.Time
}

return []StringPoint{{Time: ZeroTime, Value: mostMode}}
}

// BooleanModeReduceSlice returns the mode value within a window.
func BooleanModeReduceSlice(a []BooleanPoint) []BooleanPoint {
if len(a) == 1 {
return a
}

trueFreq := 0
falsFreq := 0
mostMode := false

for _, p := range a {
if p.Value {
trueFreq++
} else {
falsFreq++
}
}
// In case either of true or false are mode then retuned mode value wont be
// of metric with oldest timestamp
if trueFreq >= falsFreq {
mostMode = true
}

return []BooleanPoint{{Time: ZeroTime, Value: mostMode}}
}

// newStddevIterator returns an iterator for operating on a stddev() call.
func newStddevIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
Expand Down
Loading

0 comments on commit 76f7333

Please sign in to comment.