Skip to content

Commit

Permalink
Fix prometheus histogram rate overflows (elastic#17753)
Browse files Browse the repository at this point in the history
Fix some overflows on Prometheus histogram rate calculations.
They could be caused by:
* New buckets added to existing histograms on runtime, this happens at
  least with CockroachDB (see elastic#17736).
* Buckets with bigger upper limits have lower counters. This is wrong and
  has been only reproduced this on tests, but handling it just in case to avoid
  losing other data if this happens with some service.

Rate calculation methods return now also a boolean to be able to differenciate
if a zero value is caused because it was the first call, or because it the rate is
actually zero.
  • Loading branch information
jsoriano committed Apr 17, 2020
1 parent 15ad134 commit 0afffa8
Show file tree
Hide file tree
Showing 6 changed files with 450 additions and 24 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix "ID" event generator of Google Cloud module {issue}17160[17160] {pull}17608[17608]
- Add privileged option for Auditbeat in Openshift {pull}17637[17637]
- Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
- Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378]
- Add a switch to the driver definition on SQL module to use pretty names. {pull}17378[17378]
- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]

*Packetbeat*

Expand Down
32 changes: 18 additions & 14 deletions x-pack/metricbeat/module/prometheus/collector/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ type CounterCache interface {
Stop()

// RateUint64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0 on the first call
RateUint64(counterName string, value uint64) uint64
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
RateUint64(counterName string, value uint64) (uint64, bool)

// RateFloat64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0.0 on the first call
RateFloat64(counterName string, value float64) float64
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
RateFloat64(counterName string, value float64) (float64, bool)
}

type counterCache struct {
Expand All @@ -47,35 +49,37 @@ func NewCounterCache(timeout time.Duration) CounterCache {
}

// RateUint64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0 on the first call
func (c *counterCache) RateUint64(counterName string, value uint64) uint64 {
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
func (c *counterCache) RateUint64(counterName string, value uint64) (uint64, bool) {
prev := c.ints.PutWithTimeout(counterName, value, c.timeout)
if prev != nil {
if prev.(uint64) > value {
// counter reset
return 0
return 0, true
}
return value - prev.(uint64)
return value - prev.(uint64), true
}

// first put for this value, return rate of 0
return 0
return 0, false
}

// RateFloat64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0.0 on the first call
func (c *counterCache) RateFloat64(counterName string, value float64) float64 {
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
func (c *counterCache) RateFloat64(counterName string, value float64) (float64, bool) {
prev := c.floats.PutWithTimeout(counterName, value, c.timeout)
if prev != nil {
if prev.(float64) > value {
// counter reset
return 0
return 0, true
}
return value - prev.(float64)
return value - prev.(float64), true
}

// first put for this value, return rate of 0
return 0
return 0, false
}

// Start the cache cleanup worker. It mus be called once before start using
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func Test_CounterCache(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for i, val := range tt.valuesUint64 {
want := tt.expectedUin64[i]
if got := tt.counterCache.RateUint64(tt.counterName, val); got != want {
if got, _ := tt.counterCache.RateUint64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateUint64() = %v, want %v", got, want)
}
}
for i, val := range tt.valuesFloat64 {
want := tt.expectedFloat64[i]
if got := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
if got, _ := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want)
}
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (g *typedGenerator) rateCounterUint64(name string, labels common.MapStr, va
}

if g.rateCounters {
d["rate"] = g.counterCache.RateUint64(name+labels.String(), value)
d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value)
}

return d
Expand All @@ -184,7 +184,7 @@ func (g *typedGenerator) rateCounterFloat64(name string, labels common.MapStr, v
}

if g.rateCounters {
d["rate"] = g.counterCache.RateFloat64(name+labels.String(), value)
d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value)
}

return d
Expand Down
25 changes: 20 additions & 5 deletions x-pack/metricbeat/module/prometheus/collector/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo

// calculate centroids and rated counts
var lastUpper, prevUpper float64
var sumCount uint64
var sumCount, prevCount uint64
for _, bucket := range histogram.GetBucket() {
// Ignore non-numbers
if bucket.GetCumulativeCount() == uint64(math.NaN()) || bucket.GetCumulativeCount() == uint64(math.Inf(0)) {
Expand All @@ -50,10 +50,25 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo
lastUpper = bucket.GetUpperBound()
}

// take count for this period (rate) + deacumulate
count := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount()) - sumCount
counts = append(counts, count)
sumCount += count
// Take count for this period (rate)
countRate, found := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount())

switch {
case !found:
// This is a new bucket, consider it zero by now, but still increase the
// sum to don't deviate following buckets that are not new.
counts = append(counts, 0)
sumCount += bucket.GetCumulativeCount() - prevCount
case countRate < sumCount:
// This should never happen, this means something is wrong in the
// prometheus response. Handle it to avoid overflowing when deaccumulating.
counts = append(counts, 0)
default:
// Store the deaccumulated count.
counts = append(counts, countRate-sumCount)
sumCount = countRate
}
prevCount = bucket.GetCumulativeCount()
}

res := common.MapStr{
Expand Down
Loading

0 comments on commit 0afffa8

Please sign in to comment.