From 9fb92ebd87b1ebbd331325959793390d663674f2 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 11 Feb 2022 11:05:02 +0100 Subject: [PATCH 1/4] [exporter/datadog] Modify sketches tests so that they only use MapMetrics --- .../model/translator/sketches_test.go | 56 ++++++++++++++----- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/exporter/datadogexporter/internal/model/translator/sketches_test.go b/exporter/datadogexporter/internal/model/translator/sketches_test.go index f0482c71160d..efab28f87d53 100644 --- a/exporter/datadogexporter/internal/model/translator/sketches_test.go +++ b/exporter/datadogexporter/internal/model/translator/sketches_test.go @@ -29,6 +29,7 @@ import ( var _ SketchConsumer = (*sketchConsumer)(nil) type sketchConsumer struct { + mockTimeSeriesConsumer sk *quantile.Sketch } @@ -44,6 +45,30 @@ func (c *sketchConsumer) ConsumeSketch( c.sk = sketch } +func newHistogramMetric(p pdata.HistogramDataPoint) pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metricsArray := ilm.Metrics() + m := metricsArray.AppendEmpty() + m.SetDataType(pdata.MetricDataTypeHistogram) + m.SetName("test") + + // Copy Histogram point + m.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) + dps := m.Histogram().DataPoints() + np := dps.AppendEmpty() + np.SetCount(p.Count()) + np.SetSum(p.Sum()) + np.SetBucketCounts(p.BucketCounts()) + np.SetExplicitBounds(p.ExplicitBounds()) + np.SetTimestamp(p.Timestamp()) + + return md +} + func TestHistogramSketches(t *testing.T) { N := 1_000 M := 50_000.0 @@ -52,7 +77,7 @@ func TestHistogramSketches(t *testing.T) { // with support [0, N], generate an OTLP Histogram data point with N buckets, // (-inf, 0], (0, 1], ..., (N-1, N], (N, inf) // which contains N*M uniform samples of the distribution. 
- fromCDF := func(cdf func(x float64) float64) pdata.HistogramDataPoint { + fromCDF := func(cdf func(x float64) float64) pdata.Metrics { p := pdata.NewHistogramDataPoint() bounds := make([]float64, N+1) buckets := make([]uint64, N+2) @@ -70,7 +95,7 @@ func TestHistogramSketches(t *testing.T) { p.SetExplicitBounds(bounds) p.SetBucketCounts(buckets) p.SetCount(count) - return p + return newHistogramMetric(p) } tests := []struct { @@ -106,12 +131,11 @@ func TestHistogramSketches(t *testing.T) { cfg := quantile.Default() ctx := context.Background() tr := newTranslator(t, zap.NewNop()) - dims := metricsDimensions{name: "test"} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := fromCDF(test.cdf) + md := fromCDF(test.cdf) consumer := &sketchConsumer{} - tr.getSketchBuckets(ctx, consumer, dims, p, true) + tr.MapMetrics(ctx, md, consumer) sk := consumer.sk // Check the minimum is 0.0 @@ -129,6 +153,7 @@ func TestHistogramSketches(t *testing.T) { } cumulSum := uint64(0) + p := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Histogram().DataPoints().At(0) for i := 0; i < len(p.BucketCounts())-3; i++ { { q := float64(cumulSum) / float64(p.Count()) * (1 - tol) @@ -160,52 +185,53 @@ func TestInfiniteBounds(t *testing.T) { tests := []struct { name string - getHist func() pdata.HistogramDataPoint + getHist func() pdata.Metrics }{ { name: "(-inf, inf): 100", - getHist: func() pdata.HistogramDataPoint { + getHist: func() pdata.Metrics { p := pdata.NewHistogramDataPoint() p.SetExplicitBounds([]float64{}) p.SetBucketCounts([]uint64{100}) p.SetCount(100) p.SetSum(0) - return p + return newHistogramMetric(p) }, }, { name: "(-inf, 0]: 100, (0, +inf]: 100", - getHist: func() pdata.HistogramDataPoint { + getHist: func() pdata.Metrics { p := pdata.NewHistogramDataPoint() p.SetExplicitBounds([]float64{0}) p.SetBucketCounts([]uint64{100, 100}) p.SetCount(200) p.SetSum(0) - return p + return newHistogramMetric(p) }, }, { name: "(-inf, -1]: 100, (-1, 1]: 10, (1, +inf]: 100", - getHist: func() pdata.HistogramDataPoint { + getHist: func() pdata.Metrics { p := pdata.NewHistogramDataPoint() p.SetExplicitBounds([]float64{-1, 1}) p.SetBucketCounts([]uint64{100, 10, 100}) p.SetCount(210) p.SetSum(0) - return p + return newHistogramMetric(p) }, }, } ctx := context.Background() tr := newTranslator(t, zap.NewNop()) - dims := metricsDimensions{name: "test"} for _, testInstance := range tests { t.Run(testInstance.name, func(t *testing.T) { - p := testInstance.getHist() + md := testInstance.getHist() consumer := &sketchConsumer{} - tr.getSketchBuckets(ctx, consumer, dims, p, true) + tr.MapMetrics(ctx, md, consumer) sk := consumer.sk + + p := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Histogram().DataPoints().At(0) assert.InDelta(t, sk.Basic.Sum, p.Sum(), 1) assert.Equal(t, uint64(sk.Basic.Cnt), p.Count()) }) From 1338c49440df8a8f99d5ee7e3e3eb7d47ae555e0 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 11 Feb 2022 15:11:13 +0100 Subject: [PATCH 2/4] [exporter/datadog] Use exact sum, count and average on sketches --- .../model/translator/metrics_translator.go | 58 +++++--- .../translator/metrics_translator_test.go | 32 ++--- .../model/translator/sketches_test.go | 136 ++++++++++++++++++ 3 files changed, 193 insertions(+), 33 deletions(-) diff --git a/exporter/datadogexporter/internal/model/translator/metrics_translator.go b/exporter/datadogexporter/internal/model/translator/metrics_translator.go index 3aa230e47914..db869503dd8a 100644 
--- a/exporter/datadogexporter/internal/model/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/model/translator/metrics_translator.go @@ -158,11 +158,21 @@ func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBo return } +type histogramInfo struct { + // sum of histogram (exact) + sum float64 + // count of histogram (exact) + count uint64 + // ok to use + ok bool +} + func (t *Translator) getSketchBuckets( ctx context.Context, consumer SketchConsumer, pointDims metricsDimensions, p pdata.HistogramDataPoint, + histInfo histogramInfo, delta bool, ) { startTs := uint64(p.StartTimestamp()) @@ -199,6 +209,12 @@ func (t *Translator) getSketchBuckets( sketch := as.Finish() if sketch != nil { + if histInfo.ok { + // override approximate sum, count and average in sketch with exact values if available. + sketch.Basic.Cnt = int64(histInfo.count) + sketch.Basic.Sum = histInfo.sum + sketch.Basic.Avg = sketch.Basic.Sum / float64(sketch.Basic.Cnt) + } consumer.ConsumeSketch(ctx, pointDims.name, ts, sketch, pointDims.tags, pointDims.host) } } @@ -257,33 +273,41 @@ func (t *Translator) mapHistogramMetrics( ts := uint64(p.Timestamp()) pointDims := dims.WithAttributeMap(p.Attributes()) - if t.cfg.SendCountSum { - count := float64(p.Count()) - countDims := pointDims.WithSuffix("count") + histInfo := histogramInfo{ok: true} + + countDims := pointDims.WithSuffix("count") + if delta { + histInfo.count = p.Count() + } else if dx, ok := t.prevPts.Diff(countDims, startTs, ts, float64(p.Count())); ok { + histInfo.count = uint64(dx) + } else { // not ok + histInfo.ok = false + } + + sumDims := pointDims.WithSuffix("sum") + if !t.isSkippable(sumDims.name, p.Sum()) { if delta { - consumer.ConsumeTimeSeries(ctx, countDims.name, Count, ts, count, countDims.tags, countDims.host) - } else if dx, ok := t.prevPts.Diff(countDims, startTs, ts, count); ok { - consumer.ConsumeTimeSeries(ctx, countDims.name, Count, ts, dx, countDims.tags, countDims.host) + histInfo.sum = p.Sum() + } else if dx, ok := t.prevPts.Diff(sumDims, startTs, ts, p.Sum()); ok { + histInfo.sum = dx + } else { // not ok + histInfo.ok = false } + } else { // skippable + histInfo.ok = false } - if t.cfg.SendCountSum { - sum := p.Sum() - sumDims := pointDims.WithSuffix("sum") - if !t.isSkippable(sumDims.name, p.Sum()) { - if delta { - consumer.ConsumeTimeSeries(ctx, sumDims.name, Count, ts, sum, sumDims.tags, sumDims.host) - } else if dx, ok := t.prevPts.Diff(sumDims, startTs, ts, sum); ok { - consumer.ConsumeTimeSeries(ctx, sumDims.name, Count, ts, dx, sumDims.tags, sumDims.host) - } - } + if t.cfg.SendCountSum && histInfo.ok { + // We only send the sum and count if both values were ok. 
+ consumer.ConsumeTimeSeries(ctx, countDims.name, Count, ts, float64(histInfo.count), countDims.tags, countDims.host) + consumer.ConsumeTimeSeries(ctx, sumDims.name, Count, ts, histInfo.sum, sumDims.tags, sumDims.host) } switch t.cfg.HistMode { case HistogramModeCounters: t.getLegacyBuckets(ctx, consumer, pointDims, p, delta) case HistogramModeDistributions: - t.getSketchBuckets(ctx, consumer, pointDims, p, delta) + t.getSketchBuckets(ctx, consumer, pointDims, p, histInfo, delta) } } } diff --git a/exporter/datadogexporter/internal/model/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/model/translator/metrics_translator_test.go index 163d2b60be05..136b0a823cc8 100644 --- a/exporter/datadogexporter/internal/model/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/model/translator/metrics_translator_test.go @@ -563,9 +563,9 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { newSketch(dims, uint64(ts), summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, - Cnt: 20, + Sum: point.Sum(), + Avg: point.Sum() / float64(point.Count()), + Cnt: int64(point.Count()), }), } @@ -573,8 +573,8 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { newSketch(dimsTags, uint64(ts), summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: point.Sum(), + Avg: point.Sum() / float64(point.Count()), Cnt: 20, }), } @@ -716,8 +716,8 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { newSketch(dims, uint64(seconds(2)), summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: 20, + Avg: 20.0 / 30.0, Cnt: 30, }), } @@ -1183,8 +1183,8 @@ func TestMapMetrics(t *testing.T) { newSketchWithHostname("double.histogram", summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: math.Phi, + Avg: math.Phi / 20, Cnt: 20, }, attrTags), }, @@ -1213,8 +1213,8 @@ func TestMapMetrics(t *testing.T) { newSketchWithHostname("double.histogram", summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: math.Phi, + Avg: math.Phi / 20.0, Cnt: 20, }, attrTags), }, @@ -1243,8 +1243,8 @@ func TestMapMetrics(t *testing.T) { newSketchWithHostname("double.histogram", summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: math.Phi, + Avg: math.Phi / 20, Cnt: 20, }, append(attrTags, ilTags...)), }, @@ -1273,8 +1273,8 @@ func TestMapMetrics(t *testing.T) { newSketchWithHostname("double.histogram", summary.Summary{ Min: 0, Max: 0, - Sum: 0, - Avg: 0, + Sum: math.Phi, + Avg: math.Phi / 20, Cnt: 20, }, append(attrTags, ilTags...)), }, @@ -1428,5 +1428,5 @@ func TestNaNMetrics(t *testing.T) { }) // One metric type was unknown or unsupported - assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 6) + assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7) } diff --git a/exporter/datadogexporter/internal/model/translator/sketches_test.go b/exporter/datadogexporter/internal/model/translator/sketches_test.go index efab28f87d53..4af95d824e3a 100644 --- a/exporter/datadogexporter/internal/model/translator/sketches_test.go +++ b/exporter/datadogexporter/internal/model/translator/sketches_test.go @@ -181,6 +181,142 @@ func TestHistogramSketches(t *testing.T) { } } +func TestExactSumCount(t *testing.T) { + tests := []struct { + name string + getHist func() pdata.Metrics + sum float64 + count uint64 + }{} + + // Add tests for issue 6129: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/6129 + tests = append(tests, + struct { + name string + getHist func() pdata.Metrics + sum float64 + count uint64 + }{ + name: "Uniform 
distribution (delta)", + getHist: func() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metricsArray := ilm.Metrics() + m := metricsArray.AppendEmpty() + m.SetDataType(pdata.MetricDataTypeHistogram) + m.SetName("test") + m.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) + dp := m.Histogram().DataPoints() + p := dp.AppendEmpty() + p.SetExplicitBounds([]float64{0, 5_000, 10_000, 15_000, 20_000}) + // Points from contrib issue 6129: 0, 5_000, 10_000, 15_000, 20_000 + p.SetBucketCounts([]uint64{0, 1, 1, 1, 1, 1}) + p.SetCount(5) + p.SetSum(50_000) + return md + }, + sum: 50_000, + count: 5, + }, + + struct { + name string + getHist func() pdata.Metrics + sum float64 + count uint64 + }{ + name: "Uniform distribution (cumulative)", + getHist: func() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metricsArray := ilm.Metrics() + m := metricsArray.AppendEmpty() + m.SetDataType(pdata.MetricDataTypeHistogram) + m.SetName("test") + m.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + dp := m.Histogram().DataPoints() + // Points from contrib issue 6129: 0, 5_000, 10_000, 15_000, 20_000 repeated. + bounds := []float64{0, 5_000, 10_000, 15_000, 20_000} + for i := 1; i <= 2; i++ { + p := dp.AppendEmpty() + p.SetExplicitBounds(bounds) + cnt := uint64(i) + p.SetBucketCounts([]uint64{0, cnt, cnt, cnt, cnt, cnt}) + p.SetCount(uint64(5 * i)) + p.SetSum(float64(50_000 * i)) + } + return md + }, + sum: 50_000, + count: 5, + }) + + // Add tests for issue 7065: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/7065 + for pos, val := range []float64{500, 5_000, 50_000} { + pos := pos + val := val + tests = append(tests, struct { + name string + getHist func() pdata.Metrics + sum float64 + count uint64 + }{ + name: fmt.Sprintf("Issue 7065 (%d, %f)", pos, val), + getHist: func() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + ilms := rm.InstrumentationLibraryMetrics() + ilm := ilms.AppendEmpty() + metricsArray := ilm.Metrics() + m := metricsArray.AppendEmpty() + m.SetDataType(pdata.MetricDataTypeHistogram) + m.SetName("test") + + m.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + bounds := []float64{1_000, 10_000, 100_000} + + dp := m.Histogram().DataPoints() + for i := 0; i < 2; i++ { + p := dp.AppendEmpty() + p.SetExplicitBounds(bounds) + counts := []uint64{0, 0, 0, 0} + counts[pos] = uint64(i) + t.Logf("pos: %d, val: %f, counts: %v", pos, val, counts) + p.SetBucketCounts(counts) + p.SetCount(uint64(i)) + p.SetSum(val * float64(i)) + } + return md + }, + sum: val, + count: 1, + }) + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + for _, testInstance := range tests { + t.Run(testInstance.name, func(t *testing.T) { + md := testInstance.getHist() + consumer := &sketchConsumer{} + tr.MapMetrics(ctx, md, consumer) + sk := consumer.sk + + assert.Equal(t, testInstance.count, uint64(sk.Basic.Cnt), "counts differ") + assert.Equal(t, testInstance.sum, sk.Basic.Sum, "sums differ") + avg := testInstance.sum / float64(testInstance.count) + assert.Equal(t, avg, sk.Basic.Avg, "averages differ") + }) + } +} + func TestInfiniteBounds(t *testing.T) { tests := []struct { From 
9285a94819cc52ba365ed302b46489cbf670202b Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 11 Feb 2022 16:54:16 +0100 Subject: [PATCH 3/4] Add Changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee9fa9dfdce1..76e149d39ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - `coralogixexporter`: Update readme (#7785) - `awscloudwatchlogsexporter`: Remove name from aws cloudwatch logs exporter (#7554) - `hostreceiver/memoryscraper`: Add memory.utilization (#6221) +- `datadogexporter`: Use exact sum, count and average on Datadog distributions (#7830) ### 🛑 Breaking changes 🛑 From b064db3fb2a87f836ee89991791fa12eb2bf0c67 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 15 Feb 2022 11:34:53 +0100 Subject: [PATCH 4/4] Empty commit to re-trigger CI
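
Reviewer note: the core of PATCH 2/4 is that mapHistogramMetrics now computes the exact per-interval count and sum (taken directly from the data point for delta histograms, or via prevPts.Diff for cumulative ones) and, when both are available, getSketchBuckets overwrites the sketch's approximate Cnt, Sum and Avg with those exact values. The standalone Go sketch below illustrates only that override step; basicSummary, applyExactValues and the sample numbers are illustrative stand-ins written for this note, not the exporter's or datadog-agent's real types or output.

package main

import "fmt"

// basicSummary loosely mirrors the sketch summary fields touched by the
// patch; it is a stand-in for illustration, not the datadog-agent type.
type basicSummary struct {
	Min, Max, Sum, Avg float64
	Cnt                int64
}

// histogramInfo matches the struct added in metrics_translator.go: exact
// sum and count for the current interval, plus a flag saying both were
// successfully computed.
type histogramInfo struct {
	sum   float64
	count uint64
	ok    bool
}

// applyExactValues reproduces the override done in getSketchBuckets: when
// the exact sum and count are known, they replace the values the quantile
// sketch approximated from bucket boundaries. The extra count == 0 guard is
// added here only to keep this sketch safe on empty input.
func applyExactValues(b *basicSummary, hist histogramInfo) {
	if !hist.ok || hist.count == 0 {
		return // keep the approximate values
	}
	b.Cnt = int64(hist.count)
	b.Sum = hist.sum
	b.Avg = b.Sum / float64(b.Cnt)
}

func main() {
	// A hypothetical approximation that misestimates the sum and average,
	// as reported in contrib issue 6129.
	approx := basicSummary{Min: 0, Max: 20000, Sum: 43750, Avg: 8750, Cnt: 5}

	// Exact values carried on the OTLP histogram data point.
	exact := histogramInfo{sum: 50000, count: 5, ok: true}

	applyExactValues(&approx, exact)
	fmt.Printf("cnt=%d sum=%.0f avg=%.0f\n", approx.Cnt, approx.Sum, approx.Avg)
	// Prints: cnt=5 sum=50000 avg=10000
}

Run as an ordinary main package; the printed count, sum and average match what TestExactSumCount asserts for the "Uniform distribution (delta)" case in sketches_test.go.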