From de216a05f1dc1769adaae9e365c895fd9e220c49 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 3 Jun 2016 10:15:22 -0600 Subject: [PATCH 1/2] update batch group stats to be by len(b.Points) --- edge.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/edge.go b/edge.go index 1cf0612ee..ba449b16d 100644 --- a/edge.go +++ b/edge.go @@ -147,7 +147,7 @@ func (e *Edge) NextPoint() (p models.Point, ok bool) { case p, ok = <-e.stream: if ok { e.emitted.Add(1) - e.incEmitted(p.Group, p.Tags, p.Dimensions) + e.incEmitted(p.Group, p.Tags, p.Dimensions, 1) } } return @@ -159,7 +159,7 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) { case b, ok = <-e.batch: if ok { e.emitted.Add(1) - e.incEmitted(b.Group, b.Tags, b.PointDimensions()) + e.incEmitted(b.Group, b.Tags, b.PointDimensions(), int64(len(b.Points))) } } return @@ -167,7 +167,7 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) { func (e *Edge) CollectPoint(p models.Point) error { e.collected.Add(1) - e.incCollected(p.Group, p.Tags, p.Dimensions) + e.incCollected(p.Group, p.Tags, p.Dimensions, 1) select { case <-e.aborted: return ErrAborted @@ -178,7 +178,7 @@ func (e *Edge) CollectPoint(p models.Point) error { func (e *Edge) CollectBatch(b models.Batch) error { e.collected.Add(1) - e.incCollected(b.Group, b.Tags, b.PointDimensions()) + e.incCollected(b.Group, b.Tags, b.PointDimensions(), int64(len(b.Points))) select { case <-e.aborted: return ErrAborted @@ -188,17 +188,17 @@ func (e *Edge) CollectBatch(b models.Batch) error { } // Increment the emitted count of the group for this edge. -func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string) { +func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string, count int64) { // we are "manually" calling Unlock() and not using defer, because this method is called // in hot locations (NextPoint/CollectPoint) and defer have some performance penalty e.groupMu.Lock() if stats, ok := e.groupStats[group]; ok { - stats.emitted++ + stats.emitted += count e.groupMu.Unlock() } else { stats = &edgeStat{ - emitted: 1, + emitted: count, tags: tags, dims: dims, } @@ -208,17 +208,17 @@ func (e *Edge) incEmitted(group models.GroupID, tags models.Tags, dims []string) } // Increment the collected count of the group for this edge. -func (e *Edge) incCollected(group models.GroupID, tags models.Tags, dims []string) { +func (e *Edge) incCollected(group models.GroupID, tags models.Tags, dims []string, count int64) { // we are "manually" calling Unlock() and not using defer, because this method is called // in hot locations (NextPoint/CollectPoint) and defer have some performance penalty e.groupMu.Lock() if stats, ok := e.groupStats[group]; ok { - stats.collected++ + stats.collected += count e.groupMu.Unlock() } else { stats = &edgeStat{ - collected: 1, + collected: count, tags: tags, dims: dims, } From 719a0c95025b691623a077ef06a95c734c1e54f7 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 3 Jun 2016 13:59:16 -0600 Subject: [PATCH 2/2] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 982188066..4ecb2deac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -166,6 +166,8 @@ batch - [#586](https://github.com/influxdata/kapacitor/pull/586): Add spread stateful function. thanks @upccup! - [#600](https://github.com/influxdata/kapacitor/pull/600): Add close http response after handler laert post, thanks @jsvisa! - [#606](https://github.com/influxdata/kapacitor/pull/606): Add Holt-Winters forecasting method. +- [#605](https://github.com/influxdata/kapacitor/pull/605): BREAKING: StatsNode for batch edge now count the number of points in a batch instead of count batches as a whole. + This is only breaking if you have a deadman switch configured on a batch edge. ### Bugfixes