Skip to content

Commit

Permalink
Merge pull request #154 from influxdata/nc-issue#153
Browse files Browse the repository at this point in the history
Fix panic with wrong field name for top/bottom MR functions.
  • Loading branch information
Nathaniel Cook committed Jan 20, 2016
2 parents 88cd92e + 0378df5 commit 33b10fe
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This way you can easily tell which versions of Telegraf, InfluxDB, Chronograf an
- [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454

### Bugfixes
- [#153](https://github.com/influxdata/kapacitor/issues/153): Fix panic if referencing non existant field in MapReduce function.

## v0.2.4 [2016-01-07]

Expand Down
2 changes: 1 addition & 1 deletion etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ data_dir = "/var/lib/kapacitor"
# Id -- the alert Id, NODE_NAME will be replaced with the name of the node being monitored.
id = "node 'NODE_NAME' in task '{{ .TaskName }}'"
# The message of the alert. INTERVAL will be replaced by the interval.
message = "{{ .ID }} is dead: {{ index .Fields \"collected\" }} points/INTERVAL"
message = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"collected\" | printf \"%0.3f\" }} points/INTERVAL."


[influxdb]
Expand Down
2 changes: 1 addition & 1 deletion integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ batch
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
.mapReduce(influxql.count('value'))
.mapReduce(influxql.count('mean'))
.window()
.period(20s)
.every(20s)
Expand Down
14 changes: 7 additions & 7 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > infoThreshold)
Expand Down Expand Up @@ -1370,7 +1370,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
Expand Down Expand Up @@ -1447,7 +1447,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
Expand Down Expand Up @@ -1537,7 +1537,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('{{ index .Tags "host" }}')
.message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }}')
Expand Down Expand Up @@ -1662,7 +1662,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
Expand Down Expand Up @@ -1738,7 +1738,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.message('{{ .Level }} alert for {{ .ID }}')
Expand Down Expand Up @@ -1821,7 +1821,7 @@ stream
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
Expand Down
10 changes: 9 additions & 1 deletion map_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,17 @@ func (m *MapNode) mapBatch(b models.Batch) error {
inputs := make([]tsdb.MapInput, m.parallel)
j := 0
for _, p := range b.Points {
value, ok := p.Fields[m.field]
if !ok {
fields := make([]string, 0, len(p.Fields))
for field := range p.Fields {
fields = append(fields, field)
}
return fmt.Errorf("unknown field %s, available fields %v", m.field, fields)
}
item := tsdb.MapItem{
Timestamp: p.Time.Unix(),
Value: p.Fields[m.field],
Value: value,
Fields: p.Fields,
Tags: p.Tags,
}
Expand Down

0 comments on commit 33b10fe

Please sign in to comment.