From 98c00903cb6ef505cf17c95cd71609ee19070027 Mon Sep 17 00:00:00 2001 From: xchen Date: Sun, 25 Jun 2023 18:40:48 -0700 Subject: [PATCH] add test for misalignments Signed-off-by: xchen --- exporter/prometheusexporter/accumulator.go | 23 +++---- .../prometheusexporter/accumulator_test.go | 69 +++++++++++++++++-- 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index 44c63c22acca..51fdbec10ba0 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -264,14 +264,16 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco switch histogram.AggregationTemporality() { case pmetric.AggregationTemporalityDelta: - if ip.StartTimestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) { - // only keep datapoint with latest timestamp - a.logger.Warn(zap.String("Dropping misaligned histogram datapoint for time series ", signature)) - continue - } - // assuming an application restart and reset counter - if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) { - ip.CopyTo(m.Histogram().DataPoints().AppendEmpty()) + if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() { + // treat misalgnment as restart and reset or violation of single-writer principle and drop + if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) { + ip.CopyTo(m.Histogram().DataPoints().AppendEmpty()) + } else { + a.logger.With( + zap.String("time_series", signature), + ).Warn("Dropped misaligned histogram datapoint") + continue + } } else { accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty()) } @@ -347,11 +349,6 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric { } func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) { - //if current.StartTimestamp().AsTime().Before(prev.StartTimestamp().AsTime()) { - // dest.SetStartTimestamp(current.StartTimestamp()) - //} else { - // dest.SetStartTimestamp(prev.StartTimestamp()) - //} dest.SetStartTimestamp(prev.StartTimestamp()) older := prev diff --git a/exporter/prometheusexporter/accumulator_test.go b/exporter/prometheusexporter/accumulator_test.go index 34911948dc4c..57c1e302e22e 100644 --- a/exporter/prometheusexporter/accumulator_test.go +++ b/exporter/prometheusexporter/accumulator_test.go @@ -27,7 +27,7 @@ import ( ) func TestAccumulateHistogram(t *testing.T) { - appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) { + appendHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) { metric := metrics.AppendEmpty() metric.SetName("test_metric") metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) @@ -40,20 +40,25 @@ func TestAccumulateHistogram(t *testing.T) { dp.Attributes().PutStr("label_1", "1") dp.Attributes().PutStr("label_2", "2") dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs)) } + startTs1 := time.Now().Add(-6 * time.Second) + startTs2 := time.Now().Add(-5 * time.Second) + startTs3 := time.Now() ts1 := time.Now().Add(-4 * time.Second) ts2 := time.Now().Add(-3 * time.Second) ts3 := time.Now().Add(-2 * time.Second) ts4 := time.Now().Add(-1 * time.Second) + ts5 := time.Now().Add(1 * time.Second) a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) resourceMetrics1 := pmetric.NewResourceMetrics() ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty() ilm1.Scope().SetName("test") - appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) - appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) + appendHistogram(startTs2, ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) + appendHistogram(startTs2, ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0) m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0) @@ -63,16 +68,30 @@ func TestAccumulateHistogram(t *testing.T) { resourceMetrics2 := pmetric.NewResourceMetrics() ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty() ilm2.Scope().SetName("test") - appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics()) + appendHistogram(startTs2, ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics()) // add extra buckets resourceMetrics3 := pmetric.NewResourceMetrics() ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty() ilm3.Scope().SetName("test") - appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics()) + appendHistogram(startTs2, ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics()) m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0) + // misaligned start timestamp, drop + resourceMetrics4 := pmetric.NewResourceMetrics() + ilm4 := resourceMetrics4.ScopeMetrics().AppendEmpty() + ilm4.Scope().SetName("test") + appendHistogram(startTs1, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics()) + appendHistogram(ts3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics()) + + // misaligned start timestamp, treat as restart + resourceMetrics5 := pmetric.NewResourceMetrics() + ilm5 := resourceMetrics5.ScopeMetrics().AppendEmpty() + ilm5.Scope().SetName("test") + appendHistogram(startTs3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm5.Metrics()) + m5 := ilm5.Metrics().At(0).Histogram().DataPoints().At(0) + t.Run("Accumulate", func(t *testing.T) { n := a.Accumulate(resourceMetrics1) require.Equal(t, 2, n) @@ -133,6 +152,46 @@ func TestAccumulateHistogram(t *testing.T) { require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) } }) + t.Run("MisalignedTimestamps/Drop", func(t *testing.T) { + // should reset when different buckets arrive + n := a.Accumulate(resourceMetrics4) + require.Equal(t, 0, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m4.Sum(), v.Sum()) + require.Equal(t, m4.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) + t.Run("MisalignedTimestamps/Reset", func(t *testing.T) { + // reset when start timestamp skips ahead + n := a.Accumulate(resourceMetrics5) + require.Equal(t, 1, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m5.Sum(), v.Sum()) + require.Equal(t, m5.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m5.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m5.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) } func TestAccumulateMetrics(t *testing.T) {