Skip to content

Commit

Permalink
Implement fill(mean)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdf committed Jul 20, 2015
1 parent 5206c0e commit 3644da7
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,12 @@ func TestServer_Query_Fill(t *testing.T) {
exp: `{"results":[{"series":[{"name":"fills","columns":["time","count"],"values":[["2009-11-10T23:00:00Z",2],["2009-11-10T23:00:05Z",1],["2009-11-10T23:00:10Z",1234],["2009-11-10T23:00:15Z",1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "fill with mean",
command: `select mean(val) from fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) FILL(mean)`,
exp: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",7],["2009-11-10T23:00:15Z",10]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)

for i, query := range test.queries {
Expand Down
2 changes: 2 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ const (
NumberFill
// PreviousFill means that empty aggregate windows will be filled with whatever the previous aggregate window had
PreviousFill
// MeanFill means that empty aggregate windows will be filled with values derived from the previous and next windows
MeanFill
)

// SelectStatement represents a command for extracting data from the database.
Expand Down
4 changes: 3 additions & 1 deletion influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) {
return NullFill, nil, nil
}
if len(lit.Args) != 1 {
return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous")
return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous, mean")
}
switch lit.Args[0].String() {
case "null":
Expand All @@ -1679,6 +1679,8 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) {
return NoFill, nil, nil
case "previous":
return PreviousFill, nil, nil
case "mean":
return MeanFill, nil, nil
default:
num, ok := lit.Args[0].(*NumberLiteral)
if !ok {
Expand Down
19 changes: 19 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,25 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SELECT statement with mean fill
{
s: fmt.Sprintf(`SELECT mean(value) FROM cpu where time < '%s' GROUP BY time(5m) FILL(mean)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Condition: &influxql.BinaryExpr{
Op: influxql.LT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: now.UTC()},
},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 5 * time.Minute}}}}},
Fill: influxql.MeanFill,
},
},

// DELETE statement
{
s: `DELETE FROM myseries WHERE host = 'hosta.influxdb.org'`,
Expand Down
44 changes: 44 additions & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,50 @@ func (e *Executor) processFill(results [][]interface{}) [][]interface{} {
return newResults
}

// mean fill
if e.stmt.Fill == influxql.MeanFill {
var (
nullFirst bool
nullCount = make(map[int]int)
)

for i, vals := range results {
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
if vals[j] == nil {
// record the number of nulls since the last measurement for this value
nullCount[j]++
if i == 0 {
nullFirst = true
}
continue
}
if nullCount[j] > 0 {
// back-fill null values with the mean over those windows
if i-nullCount[j] <= 0 {
// can't fill without previous value
nullCount[j] = 0
continue
}
step := (int64toFloat64(vals[j]) - int64toFloat64(results[i-nullCount[j]-1][j])) / float64(nullCount[j]+1)
current := int64toFloat64(vals[j])
var count int
if nullFirst {
count = i
nullFirst = false
} else {
count = nullCount[j]
}
for k := 1; k <= count; k++ {
current -= step
results[i-k][j] = current
}
nullCount[j] = 0
}
}
}
}

// They're either filling with previous values or a specific number
for i, vals := range results {
// start at 1 because the first value is always time
Expand Down

0 comments on commit 3644da7

Please sign in to comment.