diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e751885aae..ec007636b24 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -157,7 +157,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix "ID" event generator of Google Cloud module {issue}17160[17160] {pull}17608[17608] - Add privileged option for Auditbeat in Openshift {pull}17637[17637] - Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624] -- Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378] +- Add a switch to the driver definition on SQL module to use pretty names. {pull}17378[17378] +- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753] *Packetbeat* diff --git a/x-pack/metricbeat/module/prometheus/collector/counter.go b/x-pack/metricbeat/module/prometheus/collector/counter.go index f39e6de1763..6f0f72d80eb 100644 --- a/x-pack/metricbeat/module/prometheus/collector/counter.go +++ b/x-pack/metricbeat/module/prometheus/collector/counter.go @@ -22,12 +22,14 @@ type CounterCache interface { Stop() // RateUint64 returns, for a given counter name, the difference between the given value - // and the value that was given in a previous call. It will return 0 on the first call - RateUint64(counterName string, value uint64) uint64 + // and the value that was given in a previous call, and true if a previous value existed. + // It will return 0 and false on the first call. + RateUint64(counterName string, value uint64) (uint64, bool) // RateFloat64 returns, for a given counter name, the difference between the given value - // and the value that was given in a previous call. It will return 0.0 on the first call - RateFloat64(counterName string, value float64) float64 + // and the value that was given in a previous call, and true if a previous value existed. + // It will return 0 and false on the first call. + RateFloat64(counterName string, value float64) (float64, bool) } type counterCache struct { @@ -47,35 +49,37 @@ func NewCounterCache(timeout time.Duration) CounterCache { } // RateUint64 returns, for a given counter name, the difference between the given value -// and the value that was given in a previous call. It will return 0 on the first call -func (c *counterCache) RateUint64(counterName string, value uint64) uint64 { +// and the value that was given in a previous call, and true if a previous value existed. +// It will return 0 and false on the first call. +func (c *counterCache) RateUint64(counterName string, value uint64) (uint64, bool) { prev := c.ints.PutWithTimeout(counterName, value, c.timeout) if prev != nil { if prev.(uint64) > value { // counter reset - return 0 + return 0, true } - return value - prev.(uint64) + return value - prev.(uint64), true } // first put for this value, return rate of 0 - return 0 + return 0, false } // RateFloat64 returns, for a given counter name, the difference between the given value -// and the value that was given in a previous call. It will return 0.0 on the first call -func (c *counterCache) RateFloat64(counterName string, value float64) float64 { +// and the value that was given in a previous call, and true if a previous value existed. +// It will return 0 and false on the first call. +func (c *counterCache) RateFloat64(counterName string, value float64) (float64, bool) { prev := c.floats.PutWithTimeout(counterName, value, c.timeout) if prev != nil { if prev.(float64) > value { // counter reset - return 0 + return 0, true } - return value - prev.(float64) + return value - prev.(float64), true } // first put for this value, return rate of 0 - return 0 + return 0, false } // Start the cache cleanup worker. It mus be called once before start using diff --git a/x-pack/metricbeat/module/prometheus/collector/counter_test.go b/x-pack/metricbeat/module/prometheus/collector/counter_test.go index 3aed4a8d017..dc4e9cd6423 100644 --- a/x-pack/metricbeat/module/prometheus/collector/counter_test.go +++ b/x-pack/metricbeat/module/prometheus/collector/counter_test.go @@ -50,13 +50,13 @@ func Test_CounterCache(t *testing.T) { t.Run(tt.name, func(t *testing.T) { for i, val := range tt.valuesUint64 { want := tt.expectedUin64[i] - if got := tt.counterCache.RateUint64(tt.counterName, val); got != want { + if got, _ := tt.counterCache.RateUint64(tt.counterName, val); got != want { t.Errorf("counterCache.RateUint64() = %v, want %v", got, want) } } for i, val := range tt.valuesFloat64 { want := tt.expectedFloat64[i] - if got := tt.counterCache.RateFloat64(tt.counterName, val); got != want { + if got, _ := tt.counterCache.RateFloat64(tt.counterName, val); got != want { t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want) } } diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 8e205b66a2d..8f747ce16ff 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -171,7 +171,7 @@ func (g *typedGenerator) rateCounterUint64(name string, labels common.MapStr, va } if g.rateCounters { - d["rate"] = g.counterCache.RateUint64(name+labels.String(), value) + d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value) } return d @@ -184,7 +184,7 @@ func (g *typedGenerator) rateCounterFloat64(name string, labels common.MapStr, v } if g.rateCounters { - d["rate"] = g.counterCache.RateFloat64(name+labels.String(), value) + d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value) } return d diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram.go b/x-pack/metricbeat/module/prometheus/collector/histogram.go index 8a62cbf8b97..63ed3bf69ce 100644 --- a/x-pack/metricbeat/module/prometheus/collector/histogram.go +++ b/x-pack/metricbeat/module/prometheus/collector/histogram.go @@ -33,7 +33,7 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo // calculate centroids and rated counts var lastUpper, prevUpper float64 - var sumCount uint64 + var sumCount, prevCount uint64 for _, bucket := range histogram.GetBucket() { // Ignore non-numbers if bucket.GetCumulativeCount() == uint64(math.NaN()) || bucket.GetCumulativeCount() == uint64(math.Inf(0)) { @@ -50,10 +50,25 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo lastUpper = bucket.GetUpperBound() } - // take count for this period (rate) + deacumulate - count := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount()) - sumCount - counts = append(counts, count) - sumCount += count + // Take count for this period (rate) + countRate, found := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount()) + + switch { + case !found: + // This is a new bucket, consider it zero by now, but still increase the + // sum to don't deviate following buckets that are not new. + counts = append(counts, 0) + sumCount += bucket.GetCumulativeCount() - prevCount + case countRate < sumCount: + // This should never happen, this means something is wrong in the + // prometheus response. Handle it to avoid overflowing when deaccumulating. + counts = append(counts, 0) + default: + // Store the deaccumulated count. + counts = append(counts, countRate-sumCount) + sumCount = countRate + } + prevCount = bucket.GetCumulativeCount() } res := common.MapStr{ diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go new file mode 100644 index 00000000000..b0906068e76 --- /dev/null +++ b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go @@ -0,0 +1,406 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package collector + +import ( + "testing" + "time" + + "github.com/golang/protobuf/proto" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// TestPromHistogramToES tests that calling promHistogramToES multiple +// times with the same cache produces each time the expected results. +func TestPromHistogramToES(t *testing.T) { + type sample struct { + histogram dto.Histogram + expected common.MapStr + } + + cases := map[string]struct { + samples []sample + }{ + "one histogram": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0}, + "values": []float64{0.495}, + }, + }, + }, + }, + "two histogram": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0}, + "values": []float64{0.495}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(12), + SampleSum: proto.Float64(10.123), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(12), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{2}, + "values": []float64{0.495}, + }, + }, + }, + }, + "new bucket on the go": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0}, + "values": []float64{0.495}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(13), + SampleSum: proto.Float64(15.23), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(12), + }, + // New bucket on the go + { + UpperBound: proto.Float64(9.99), + CumulativeCount: proto.Uint64(13), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{2, 0}, + "values": []float64{0.495, 5.49}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(15), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(13), + }, + { + UpperBound: proto.Float64(9.99), + CumulativeCount: proto.Uint64(15), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 1}, + "values": []float64{0.495, 5.49}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(16), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(13), + }, + { + UpperBound: proto.Float64(9.99), + CumulativeCount: proto.Uint64(16), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0, 1}, + "values": []float64{0.495, 5.49}, + }, + }, + }, + }, + "new smaller bucket on the go": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0}, + "values": []float64{0.495}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(13), + SampleSum: proto.Float64(15.23), + Bucket: []*dto.Bucket{ + // New bucket on the go + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(1), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(13), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0, 2}, + "values": []float64{0.045, 0.54}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(15), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(2), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(15), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 1}, + "values": []float64{0.045, 0.54}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(16), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(3), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(16), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 0}, + "values": []float64{0.045, 0.54}, + }, + }, + }, + }, + "new bucket between two other buckets on the go": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(0), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0, 0}, + "values": []float64{0.045, 0.54}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(13), + SampleSum: proto.Float64(15.23), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(1), + }, + // New bucket + { + UpperBound: proto.Float64(0.49), + CumulativeCount: proto.Uint64(2), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(13), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 0, 1}, + "values": []float64{0.045, 0.29000000000000004, 0.74}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(16), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(2), + }, + { + UpperBound: proto.Float64(0.49), + CumulativeCount: proto.Uint64(4), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(16), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 1, 1}, + "values": []float64{0.045, 0.29000000000000004, 0.74}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(18), + SampleSum: proto.Float64(16.33), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(3), + }, + { + UpperBound: proto.Float64(0.49), + CumulativeCount: proto.Uint64(5), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(18), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{1, 0, 1}, + "values": []float64{0.045, 0.29000000000000004, 0.74}, + }, + }, + }, + }, + "wrong buckets": { + samples: []sample{ + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(10), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(8), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{0, 0}, + "values": []float64{0.045, 0.54}, + }, + }, + { + histogram: dto.Histogram{ + SampleCount: proto.Uint64(12), + SampleSum: proto.Float64(10.45), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.09), + CumulativeCount: proto.Uint64(12), + }, + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(8), + }, + }, + }, + expected: common.MapStr{ + "counts": []uint64{2, 0}, + "values": []float64{0.045, 0.54}, + }, + }, + }, + }, + } + + metricName := "somemetric" + labels := common.MapStr{} + + for title, c := range cases { + t.Run(title, func(t *testing.T) { + cache := NewCounterCache(120 * time.Minute) + + for i, s := range c.samples { + t.Logf("#%d: %+v", i, s.histogram) + result := promHistogramToES(cache, metricName, labels, &s.histogram) + assert.EqualValues(t, s.expected, result) + } + }) + } +}