diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index 1fd431bd6b68..e7353bee673e 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -230,13 +230,13 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco for i := 0; i < dps.Len(); i++ { ip := dps.At(i) - signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for + signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) if ip.Flags().NoRecordedValue() { a.registeredMetrics.Delete(signature) return 0 } - v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series + v, ok := a.registeredMetrics.Load(signature) if !ok { // first data point m := copyMetricMetadata(metric) @@ -253,18 +253,24 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco switch histogram.AggregationTemporality() { case pmetric.AggregationTemporalityDelta: - if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() { - // treat misalgnment as restart and reset or violation of single-writer principle and drop - if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) { + pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range + if ip.StartTimestamp().AsTime() != pp.StartTimestamp().AsTime() { + // treat misalgnment as restart and reset, or violation of single-writer principle and drop + if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) { ip.CopyTo(m.Histogram().DataPoints().AppendEmpty()) } else { a.logger.With( - zap.String("time_series", signature), + zap.String("metric_name", metric.Name()), ).Warn("Dropped misaligned histogram datapoint") continue } + } else if ip.HasSum() != pp.HasSum() { + a.logger.With( + zap.String("metric_name", metric.Name()), + ).Warn("Dropped histogram data point due to disagreement on sum tracking") + continue } else { - accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty()) + accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty()) } case pmetric.AggregationTemporalityCumulative: if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) { @@ -351,21 +357,14 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) { dest.SetTimestamp(newer.Timestamp()) // checking for bucket boundary alignment, optionally re-aggregate on newer boundaries - match := true - if older.ExplicitBounds().Len() == newer.ExplicitBounds().Len() { - for i := 0; i < newer.ExplicitBounds().Len(); i++ { - if older.ExplicitBounds().At(i) != newer.ExplicitBounds().At(i) { - match = false - break - } - } - } else { - match = false + match := older.ExplicitBounds().Len() == newer.ExplicitBounds().Len() + for i := 0; match && i < newer.ExplicitBounds().Len(); i++ { + match = older.ExplicitBounds().At(i) == newer.ExplicitBounds().At(i) } if match { dest.SetCount(newer.Count() + older.Count()) - if newer.HasSum() && older.HasSum() { + if newer.HasSum() { dest.SetSum(newer.Sum() + older.Sum()) } diff --git a/exporter/prometheusexporter/accumulator_test.go b/exporter/prometheusexporter/accumulator_test.go index e05e7ea4b8f5..176a442024ab 100644 --- a/exporter/prometheusexporter/accumulator_test.go +++ b/exporter/prometheusexporter/accumulator_test.go @@ -15,175 +15,6 @@ import ( "go.uber.org/zap" ) -func TestAccumulateHistogram(t *testing.T) { - appendHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) { - metric := metrics.AppendEmpty() - metric.SetName("test_metric") - metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - metric.SetDescription("test description") - dp := metric.Histogram().DataPoints().AppendEmpty() - dp.ExplicitBounds().FromRaw(bounds) - dp.BucketCounts().FromRaw(counts) - dp.SetCount(count) - dp.SetSum(sum) - dp.Attributes().PutStr("label_1", "1") - dp.Attributes().PutStr("label_2", "2") - dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs)) - } - - startTs1 := time.Now().Add(-6 * time.Second) - startTs2 := time.Now().Add(-5 * time.Second) - startTs3 := time.Now() - ts1 := time.Now().Add(-4 * time.Second) - ts2 := time.Now().Add(-3 * time.Second) - ts3 := time.Now().Add(-2 * time.Second) - ts4 := time.Now().Add(-1 * time.Second) - ts5 := time.Now().Add(1 * time.Second) - - a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) - - resourceMetrics1 := pmetric.NewResourceMetrics() - ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty() - ilm1.Scope().SetName("test") - // counts is one more than explicit bounds to account for the one implicit count/bucket for <=inf - appendHistogram(startTs2, ts3, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) - appendHistogram(startTs2, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) - - m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0) - m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0) - signature := timeseriesSignature(ilm1.Scope().Name(), ilm1.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) - - // different buckets from the past - resourceMetrics2 := pmetric.NewResourceMetrics() - ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty() - ilm2.Scope().SetName("test") - appendHistogram(startTs2, ts1, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics()) - - // add extra buckets - resourceMetrics3 := pmetric.NewResourceMetrics() - ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty() - ilm3.Scope().SetName("test") - appendHistogram(startTs2, ts4, 7, 5, []uint64{3, 1, 1, 0, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics()) - - m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0) - - // misaligned start timestamp, drop - resourceMetrics4 := pmetric.NewResourceMetrics() - ilm4 := resourceMetrics4.ScopeMetrics().AppendEmpty() - ilm4.Scope().SetName("test") - appendHistogram(startTs1, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics()) - appendHistogram(ts3, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics()) - - // misaligned start timestamp, treat as restart - resourceMetrics5 := pmetric.NewResourceMetrics() - ilm5 := resourceMetrics5.ScopeMetrics().AppendEmpty() - ilm5.Scope().SetName("test") - appendHistogram(startTs3, ts5, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm5.Metrics()) - m5 := ilm5.Metrics().At(0).Histogram().DataPoints().At(0) - - t.Run("Accumulate", func(t *testing.T) { - n := a.Accumulate(resourceMetrics1) - require.Equal(t, 2, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m3.Sum()+m2.Sum(), v.Sum()) - require.Equal(t, m3.Count()+m2.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("ResetBuckets/Ignore", func(t *testing.T) { - // should ignore metric from the past - n := a.Accumulate(resourceMetrics2) - - require.Equal(t, 1, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m3.Sum()+m2.Sum(), v.Sum()) - require.Equal(t, m3.Count()+m2.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("ResetBuckets/Perform", func(t *testing.T) { - // should reset when different buckets arrive - n := a.Accumulate(resourceMetrics3) - require.Equal(t, 1, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m4.Sum(), v.Sum()) - require.Equal(t, m4.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("MisalignedTimestamps/Drop", func(t *testing.T) { - // should reset when different buckets arrive - n := a.Accumulate(resourceMetrics4) - require.Equal(t, 0, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m4.Sum(), v.Sum()) - require.Equal(t, m4.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("MisalignedTimestamps/Reset", func(t *testing.T) { - // reset when start timestamp skips ahead - n := a.Accumulate(resourceMetrics5) - require.Equal(t, 1, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m5.Sum(), v.Sum()) - require.Equal(t, m5.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m5.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m5.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) -} - func TestAccumulateMetrics(t *testing.T) { tests := []struct { name string @@ -455,7 +286,7 @@ func TestAccumulateMetrics(t *testing.T) { } } -func TestAccumulateDeltaToCumulative(t *testing.T) { +func TestAccumulateDeltaToCumulativeSum(t *testing.T) { tests := []struct { name string metric func(time.Time, time.Time, float64, pmetric.MetricSlice) @@ -549,6 +380,194 @@ func TestAccumulateDeltaToCumulative(t *testing.T) { } } +func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) { + appendDeltaHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("test_metric") + metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + metric.SetDescription("test description") + dp := metric.Histogram().DataPoints().AppendEmpty() + dp.ExplicitBounds().FromRaw(bounds) + dp.BucketCounts().FromRaw(counts) + dp.SetCount(count) + dp.SetSum(sum) + dp.Attributes().PutStr("label_1", "1") + dp.Attributes().PutStr("label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs)) + } + + t.Run("AccumulateHappyPath", func(t *testing.T) { + startTs := time.Now().Add(-5 * time.Second) + ts1 := time.Now().Add(-4 * time.Second) + ts2 := time.Now().Add(-3 * time.Second) + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(startTs, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + + m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0) + m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) + + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 2, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m1.Sum()+m2.Sum(), v.Sum()) + require.Equal(t, m1.Count()+m2.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m1.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) + t.Run("ResetBuckets/Ignore", func(t *testing.T) { + startTs := time.Now().Add(-5 * time.Second) + ts1 := time.Now().Add(-3 * time.Second) + ts2 := time.Now().Add(-4 * time.Second) + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(startTs, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) + + m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0) + m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) + + // should ignore metric with different buckets from the past + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 2, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m1.Sum(), v.Sum()) + require.Equal(t, m1.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m1.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m1.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) + t.Run("ResetBuckets/Perform", func(t *testing.T) { + // should reset when different buckets arrive + startTs := time.Now().Add(-5 * time.Second) + ts1 := time.Now().Add(-3 * time.Second) + ts2 := time.Now().Add(-2 * time.Second) + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(startTs, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) + + m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) + + // should ignore metric with different buckets from the past + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 2, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m2.Sum(), v.Sum()) + require.Equal(t, m2.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m2.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) + t.Run("MisalignedTimestamps/Drop", func(t *testing.T) { + // should drop data points with different start time that's before latest timestamp + startTs1 := time.Now().Add(-5 * time.Second) + startTs2 := time.Now().Add(-4 * time.Second) + ts1 := time.Now().Add(-3 * time.Second) + ts2 := time.Now().Add(-2 * time.Second) + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + appendDeltaHistogram(startTs1, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(startTs2, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) + + m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m1.Attributes(), pcommon.NewMap()) + + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 1, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m1.Sum(), v.Sum()) + require.Equal(t, m1.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m1.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m1.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) + t.Run("MisalignedTimestamps/Reset", func(t *testing.T) { + // reset when start timestamp skips ahead + startTs1 := time.Now().Add(-5 * time.Second) + startTs2 := time.Now().Add(-2 * time.Second) + ts1 := time.Now().Add(-3 * time.Second) + ts2 := time.Now().Add(-1 * time.Second) + resourceMetrics := pmetric.NewResourceMetrics() + ilm := resourceMetrics.ScopeMetrics().AppendEmpty() + ilm.Scope().SetName("test") + appendDeltaHistogram(startTs1, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(startTs2, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) + + m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) + signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) + + a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) + n := a.Accumulate(resourceMetrics) + require.Equal(t, 2, n) + + m, ok := a.registeredMetrics.Load(signature) + v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) + require.True(t, ok) + + require.Equal(t, m2.Sum(), v.Sum()) + require.Equal(t, m2.Count(), v.Count()) + + for i := 0; i < v.BucketCounts().Len(); i++ { + require.Equal(t, m2.BucketCounts().At(i), v.BucketCounts().At(i)) + } + + for i := 0; i < v.ExplicitBounds().Len(); i++ { + require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) + } + }) +} + func TestAccumulateDroppedMetrics(t *testing.T) { tests := []struct { name string