From fddf49870fd99b8ea0f1453b78a7f21e4984ebeb Mon Sep 17 00:00:00 2001 From: Hyunuk Lim Date: Tue, 4 Jan 2022 16:02:07 -0800 Subject: [PATCH] fix: drop a metric if it has a staleness flag (#6977) --- CHANGELOG.md | 1 + exporter/prometheusexporter/accumulator.go | 16 +++ .../prometheusexporter/accumulator_test.go | 105 ++++++++++++++++++ 3 files changed, 122 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf0f57730892..c5d044ffdae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## 💡 Enhancements 💡 - `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679) +- `prometheusexporter`: Handling Staleness flag from OTLP (#6805) - `mysqlreceiver`: Add Integration test (#6916) - `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670) - `k8s_observer`: discover k8s.node endpoints (#6820) diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index bcc380503811..7a865298cf01 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -105,6 +105,10 @@ func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.I ip := dps.At(i) signature := timeseriesSignature(il.Name(), metric, ip.Attributes()) + if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + a.registeredMetrics.Delete(signature) + return 0 + } v, ok := a.registeredMetrics.Load(signature) stalePoint := ok && @@ -130,6 +134,10 @@ func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.Ins ip := dps.At(i) signature := timeseriesSignature(il.Name(), metric, ip.Attributes()) + if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + a.registeredMetrics.Delete(signature) + return 0 + } v, ok := a.registeredMetrics.Load(signature) if !ok { @@ -167,6 +175,10 @@ func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.Instr ip := dps.At(i) signature := timeseriesSignature(il.Name(), metric, ip.Attributes()) + if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + a.registeredMetrics.Delete(signature) + return 0 + } v, ok := a.registeredMetrics.Load(signature) if !ok { @@ -208,6 +220,10 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il ip := dps.At(i) signature := timeseriesSignature(il.Name(), metric, ip.Attributes()) + if ip.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { + a.registeredMetrics.Delete(signature) + return 0 + } v, ok := a.registeredMetrics.Load(signature) if !ok { diff --git a/exporter/prometheusexporter/accumulator_test.go b/exporter/prometheusexporter/accumulator_test.go index e704a80acd92..1d1790ced845 100644 --- a/exporter/prometheusexporter/accumulator_test.go +++ b/exporter/prometheusexporter/accumulator_test.go @@ -16,6 +16,7 @@ package prometheusexporter import ( "log" + "strings" "testing" "time" @@ -216,6 +217,102 @@ func TestAccumulateMetrics(t *testing.T) { dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) }, }, + { + name: "Summary", + metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeSummary) + metric.SetDescription("test description") + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetCount(10) + dp.SetSum(0.012) + dp.SetCount(10) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) + fillQuantileValue := func(pN, value float64, dest pdata.ValueAtQuantile) { + dest.SetQuantile(pN) + dest.SetValue(value) + } + fillQuantileValue(0.50, 190, dp.QuantileValues().AppendEmpty()) + fillQuantileValue(0.99, 817, dp.QuantileValues().AppendEmpty()) + }, + }, + { + name: "StalenessMarkerGauge", + metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeGauge) + metric.SetDescription("test description") + dp := metric.Gauge().DataPoints().AppendEmpty() + dp.SetDoubleVal(v) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) + dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue)) + }, + }, + { + name: "StalenessMarkerSum", + metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeSum) + metric.SetDescription("test description") + metric.Sum().SetIsMonotonic(false) + metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleVal(v) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) + dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue)) + }, + }, + { + name: "StalenessMarkerHistogram", + metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeHistogram) + metric.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + metric.SetDescription("test description") + dp := metric.Histogram().DataPoints().AppendEmpty() + dp.SetBucketCounts([]uint64{5, 2}) + dp.SetCount(7) + dp.SetExplicitBounds([]float64{3.5, 10.0}) + dp.SetSum(v) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) + dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue)) + }, + }, + { + name: "StalenessMarkerSummary", + metric: func(ts time.Time, v float64, metrics pdata.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetDataType(pdata.MetricDataTypeSummary) + metric.SetDescription("test description") + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetCount(10) + dp.SetSum(0.012) + dp.SetCount(10) + dp.Attributes().InsertString("label_1", "1") + dp.Attributes().InsertString("label_2", "2") + dp.SetTimestamp(pdata.NewTimestampFromTime(ts)) + dp.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue)) + fillQuantileValue := func(pN, value float64, dest pdata.ValueAtQuantile) { + dest.SetQuantile(pN) + dest.SetValue(value) + } + fillQuantileValue(0.50, 190, dp.QuantileValues().AppendEmpty()) + fillQuantileValue(0.99, 817, dp.QuantileValues().AppendEmpty()) + }, + }, } for _, tt := range tests { @@ -235,6 +332,10 @@ func TestAccumulateMetrics(t *testing.T) { // 2 metric arrived n := a.Accumulate(resourceMetrics2) + if strings.HasPrefix(tt.name, "StalenessMarker") { + require.Equal(t, 0, n) + return + } require.Equal(t, 1, n) m2Labels, _, m2Value, m2Temporality, m2IsMonotonic := getMetricProperties(ilm2.Metrics().At(0)) @@ -322,6 +423,10 @@ func getMetricProperties(metric pdata.Metric) ( value = metric.Histogram().DataPoints().At(0).Sum() temporality = metric.Histogram().AggregationTemporality() isMonotonic = true + case pdata.MetricDataTypeSummary: + attributes = metric.Summary().DataPoints().At(0).Attributes() + ts = metric.Summary().DataPoints().At(0).Timestamp().AsTime() + value = metric.Summary().DataPoints().At(0).Sum() default: log.Panicf("Invalid data type %s", metric.DataType().String()) }