Skip to content

Commit

Permalink
Fix #780 so that fill works with all aggregates
Browse files Browse the repository at this point in the history
Fix issue #780 and add integration tests for each aggregate
to test filling with null and 0.
  • Loading branch information
dgnorton committed Sep 9, 2014
1 parent fddcdaa commit 92cee9a
Show file tree
Hide file tree
Showing 3 changed files with 510 additions and 14 deletions.
41 changes: 27 additions & 14 deletions engine/aggregator_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func (self *CumulativeArithmeticAggregator) ColumnNames() []string {

func (self *CumulativeArithmeticAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
if state == nil {
return [][]*protocol.FieldValue{
{
{DoubleValue: &self.initialValue},
},
}
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

return [][]*protocol.FieldValue{
Expand Down Expand Up @@ -319,7 +315,7 @@ func (self *StandardDeviationAggregator) ColumnNames() []string {
func (self *StandardDeviationAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
r, ok := state.(*StandardDeviationRunning)
if !ok {
return nil
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

eX := r.totalX / float64(r.count)
Expand Down Expand Up @@ -415,8 +411,11 @@ func (self *DerivativeAggregator) ColumnNames() []string {

func (self *DerivativeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
s, ok := state.(*DerivativeAggregatorState)
if !ok {
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

if !(ok && s.firstValue != nil && s.lastValue != nil) {
if s.firstValue == nil || s.lastValue == nil {
return nil
}

Expand Down Expand Up @@ -513,9 +512,8 @@ func (self *DifferenceAggregator) ColumnNames() []string {

func (self *DifferenceAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
s, ok := state.(*DifferenceAggregatorState)

if !(ok && s.firstValue != nil && s.lastValue != nil) {
return nil
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

difference := *s.lastValue.Values[0].DoubleValue - *s.firstValue.Values[0].DoubleValue
Expand Down Expand Up @@ -562,6 +560,7 @@ type HistogramAggregator struct {
explicitBucketStart bool
bucketStopIdx int
columnNames []string
defaultValue *protocol.FieldValue
}

func (self *HistogramAggregator) AggregatePoint(state interface{}, p *protocol.Point) (interface{}, error) {
Expand Down Expand Up @@ -600,6 +599,14 @@ func (self *HistogramAggregator) ColumnNames() []string {

func (self *HistogramAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
returnValues := [][]*protocol.FieldValue{}
if state == nil {
_size := int64(0)
returnValues = append(returnValues, []*protocol.FieldValue{
self.defaultValue,
{Int64Value: &_size},
})
return returnValues
}
buckets := state.(HistogramAggregatorState)
for bucket, size := range buckets {
_bucket := float64(bucket)*self.bucketSize + self.bucketStart
Expand Down Expand Up @@ -939,9 +946,7 @@ func (self *PercentileAggregator) ColumnNames() []string {
func (self *PercentileAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
s, ok := state.(*PercentileAggregatorState)
if !ok {
return [][]*protocol.FieldValue{
{self.defaultValue},
}
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}
return [][]*protocol.FieldValue{
{{DoubleValue: &s.percentileValue}},
Expand Down Expand Up @@ -1048,7 +1053,10 @@ func (self *ModeAggregator) ColumnNames() []string {
}

func (self *ModeAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
s := state.(*ModeAggregatorState)
s, ok := state.(*ModeAggregatorState)
if !ok {
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

counts := make([]int, len(s.counts))
countMap := make(map[int][]interface{}, len(s.counts))
Expand Down Expand Up @@ -1194,6 +1202,7 @@ func (self *DistinctAggregator) GetValues(state interface{}) [][]*protocol.Field
s, ok := state.(*DistinctAggregatorState)
if !ok || len(s.counts) == 0 {
returnValues = append(returnValues, []*protocol.FieldValue{self.defaultValue})
return returnValues
}

for value := range s.counts {
Expand Down Expand Up @@ -1258,7 +1267,11 @@ func (self *FirstOrLastAggregator) ColumnNames() []string {
}

func (self *FirstOrLastAggregator) GetValues(state interface{}) [][]*protocol.FieldValue {
s := state.(FirstOrLastAggregatorState)
s, ok := state.(FirstOrLastAggregatorState)
if !ok {
return [][]*protocol.FieldValue{[]*protocol.FieldValue{self.defaultValue},}
}

return [][]*protocol.FieldValue{
{
s,
Expand Down
Loading

0 comments on commit 92cee9a

Please sign in to comment.