Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #2845: fix panic on aggregate functions #2859

Merged
merged 2 commits into from
Jun 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 137 additions & 33 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ func (d distinctValues) Less(i, j int) bool {
switch v := val.(type) {
case uint64:
return intWeight, float64(v)
case int64:
return intWeight, float64(v)
case float64:
return floatWeight, v
case bool:
Expand Down Expand Up @@ -415,16 +417,35 @@ func ReduceCountDistinct(values []interface{}) interface{} {
return len(index)
}

type NumberType int8

const (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first glance this seems to overlap with the InspectDataType function in this same package. Not exactly the same, but it seems redundant.

Float64Type NumberType = iota
Int64Type
)

// MapSum computes the summation of values in an iterator.
func MapSum(itr Iterator) interface{} {
n := float64(0)
count := 0
var resultType NumberType
for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
count++
n += v.(float64)
switch n1 := v.(type) {
case float64:
n += n1
case int64:
n += float64(n1)
resultType = Int64Type
}
}
if count > 0 {
return n
switch resultType {
case Float64Type:
return n
case Int64Type:
return int64(n)
}
}
return nil
}
Expand All @@ -433,15 +454,27 @@ func MapSum(itr Iterator) interface{} {
func ReduceSum(values []interface{}) interface{} {
var n float64
count := 0
var resultType NumberType
for _, v := range values {
if v == nil {
continue
}
count++
n += v.(float64)
switch n1 := v.(type) {
case float64:
n += n1
case int64:
n += float64(n1)
resultType = Int64Type
}
}
if count > 0 {
return n
switch resultType {
case Float64Type:
return n
case Int64Type:
return int64(n)
}
}
return nil
}
Expand All @@ -452,7 +485,13 @@ func MapMean(itr Iterator) interface{} {

for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
out.Count++
out.Mean += (v.(float64) - out.Mean) / float64(out.Count)
switch n1 := v.(type) {
case float64:
out.Mean += (n1 - out.Mean) / float64(out.Count)
case int64:
out.Mean += (float64(n1) - out.Mean) / float64(out.Count)
out.ResultType = Int64Type
}
}

if out.Count > 0 {
Expand All @@ -463,8 +502,9 @@ func MapMean(itr Iterator) interface{} {
}

type meanMapOutput struct {
Count int
Mean float64
Count int
Mean float64
ResultType NumberType
}

// ReduceMean computes the mean of values for each key.
Expand Down Expand Up @@ -641,19 +681,33 @@ func partition(data []float64) (lows []float64, pivotValue float64, highs []floa
return data[1:low], pivotValue, data[high+1:]
}

type minMaxMapOut struct {
Val float64
Type NumberType
}

// MapMin collects the values to pass to the reducer
func MapMin(itr Iterator) interface{} {
var min float64
min := &minMaxMapOut{}

pointsYielded := false
var val float64

for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
val := v.(float64)
switch n := v.(type) {
case float64:
val = n
case int64:
val = float64(n)
min.Type = Int64Type
}

// Initialize min
if !pointsYielded {
min = val
min.Val = val
pointsYielded = true
}
min = math.Min(min, val)
min.Val = math.Min(min.Val, val)
}
if pointsYielded {
return min
Expand All @@ -663,41 +717,60 @@ func MapMin(itr Iterator) interface{} {

// ReduceMin computes the min of value.
func ReduceMin(values []interface{}) interface{} {
var min float64
min := &minMaxMapOut{}
pointsYielded := false

for _, v := range values {
if v == nil {
for _, value := range values {
if value == nil {
continue
}

v, ok := value.(*minMaxMapOut)
if !ok {
continue
}
val := v.(float64)

// Initialize min
if !pointsYielded {
min = val
min.Val = v.Val
min.Type = v.Type
pointsYielded = true
}
m := math.Min(min, val)
min = m
min.Val = math.Min(min.Val, v.Val)
}
if pointsYielded {
return min
switch min.Type {
case Float64Type:
return min.Val
case Int64Type:
return int64(min.Val)
}
}
return nil
}

// MapMax collects the values to pass to the reducer
func MapMax(itr Iterator) interface{} {
var max float64
max := &minMaxMapOut{}

pointsYielded := false
var val float64

for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
val := v.(float64)
switch n := v.(type) {
case float64:
val = n
case int64:
val = float64(n)
max.Type = Int64Type
}

// Initialize max
if !pointsYielded {
max = val
max.Val = val
pointsYielded = true
}
max = math.Max(max, val)
max.Val = math.Max(max.Val, val)
}
if pointsYielded {
return max
Expand All @@ -707,38 +780,58 @@ func MapMax(itr Iterator) interface{} {

// ReduceMax computes the max of value.
func ReduceMax(values []interface{}) interface{} {
var max float64
max := &minMaxMapOut{}
pointsYielded := false

for _, v := range values {
if v == nil {
for _, value := range values {
if value == nil {
continue
}
val := v.(float64)

v, ok := value.(*minMaxMapOut)
if !ok {
continue
}

// Initialize max
if !pointsYielded {
max = val
max.Val = v.Val
max.Type = v.Type
pointsYielded = true
}
max = math.Max(max, val)
max.Val = math.Max(max.Val, v.Val)
}
if pointsYielded {
return max
switch max.Type {
case Float64Type:
return max.Val
case Int64Type:
return int64(max.Val)
}
}
return nil
}

type spreadMapOutput struct {
Min, Max float64
Type NumberType
}

// MapSpread collects the values to pass to the reducer
func MapSpread(itr Iterator) interface{} {
out := &spreadMapOutput{}
pointsYielded := false
var val float64

for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
val := v.(float64)
switch n := v.(type) {
case float64:
val = n
case int64:
val = float64(n)
out.Type = Int64Type
}

// Initialize
if !pointsYielded {
out.Max = val
Expand Down Expand Up @@ -768,13 +861,19 @@ func ReduceSpread(values []interface{}) interface{} {
if !pointsYielded {
result.Max = val.Max
result.Min = val.Min
result.Type = val.Type
pointsYielded = true
}
result.Max = math.Max(result.Max, val.Max)
result.Min = math.Min(result.Min, val.Min)
}
if pointsYielded {
return result.Max - result.Min
switch result.Type {
case Float64Type:
return result.Max - result.Min
case Int64Type:
return int64(result.Max - result.Min)
}
}
return nil
}
Expand All @@ -784,7 +883,12 @@ func MapStddev(itr Iterator) interface{} {
var values []float64

for _, k, v := itr.Next(); k != 0; _, k, v = itr.Next() {
values = append(values, v.(float64))
switch n := v.(type) {
case float64:
values = append(values, n)
case int64:
values = append(values, float64(n))
}
}

return values
Expand Down
4 changes: 2 additions & 2 deletions influxql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func TestMapMean(t *testing.T) {
}{
{ // Single point
input: []point{point{"0", 1, 1.0}},
output: &meanMapOutput{1, 1},
output: &meanMapOutput{1, 1, Float64Type},
},
{ // Two points
input: []point{
point{"0", 1, 2.0},
point{"0", 2, 8.0},
},
output: &meanMapOutput{2, 5.0},
output: &meanMapOutput{2, 5.0, Float64Type},
},
}

Expand Down