diff --git a/exporter/prometheusexporter/accumulator.go b/exporter/prometheusexporter/accumulator.go index 059701447bb8..c4ee1a85c42c 100644 --- a/exporter/prometheusexporter/accumulator.go +++ b/exporter/prometheusexporter/accumulator.go @@ -224,7 +224,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) { histogram := metric.Histogram() - + a.logger.Debug("Accumulate histogram.....") dps := histogram.DataPoints() for i := 0; i < dps.Len(); i++ { @@ -239,6 +239,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series if !ok { // first data point + a.logger.Debug("Accumulate first histogram data point") m := copyMetricMetadata(metric) ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty()) m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) @@ -254,9 +255,16 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco switch histogram.AggregationTemporality() { case pmetric.AggregationTemporalityDelta: pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range - if ip.StartTimestamp().AsTime() != pp.StartTimestamp().AsTime() { + if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() { // treat misalgnment as restart and reset, or violation of single-writer principle and drop + a.logger.With( + zap.String("ip_start_time", ip.StartTimestamp().String()), + zap.String("pp_start_time", pp.StartTimestamp().String()), + zap.String("pp_timestamp", pp.Timestamp().String()), + zap.String("ip_timestamp", ip.Timestamp().String()), + ).Warn("Misaligned starting timestamps") if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) { + a.logger.Debug("treating it like reset") ip.CopyTo(m.Histogram().DataPoints().AppendEmpty()) } else { a.logger.With( @@ -265,6 +273,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco continue } } else { + a.logger.Debug("Accumulate another histogram datapoint") accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty()) } case pmetric.AggregationTemporalityCumulative: @@ -358,6 +367,7 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) { } if match { + dest.SetCount(newer.Count() + older.Count()) dest.SetSum(newer.Sum() + older.Sum()) diff --git a/exporter/prometheusexporter/accumulator_test.go b/exporter/prometheusexporter/accumulator_test.go index 27638e614ef3..ab4ac4399d6a 100644 --- a/exporter/prometheusexporter/accumulator_test.go +++ b/exporter/prometheusexporter/accumulator_test.go @@ -15,115 +15,6 @@ import ( "go.uber.org/zap" ) -func TestAccumulateHistogram(t *testing.T) { - appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) { - metric := metrics.AppendEmpty() - metric.SetName("test_metric") - metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - metric.SetDescription("test description") - dp := metric.Histogram().DataPoints().AppendEmpty() - dp.ExplicitBounds().FromRaw(bounds) - dp.BucketCounts().FromRaw(counts) - dp.SetCount(count) - dp.SetSum(sum) - dp.Attributes().PutStr("label_1", "1") - dp.Attributes().PutStr("label_2", "2") - dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) - } - - ts1 := time.Now().Add(-4 * time.Second) - ts2 := time.Now().Add(-3 * time.Second) - ts3 := time.Now().Add(-2 * time.Second) - ts4 := time.Now().Add(-1 * time.Second) - - a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) - - resourceMetrics1 := pmetric.NewResourceMetrics() - ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty() - ilm1.Scope().SetName("test") - appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) - appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics()) - - m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0) - m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0) - signature := timeseriesSignature(ilm1.Scope().Name(), ilm1.Metrics().At(0), m2.Attributes(), pcommon.NewMap()) - - // different buckets from the past - resourceMetrics2 := pmetric.NewResourceMetrics() - ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty() - ilm2.Scope().SetName("test") - appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics()) - - // add extra buckets - resourceMetrics3 := pmetric.NewResourceMetrics() - ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty() - ilm3.Scope().SetName("test") - appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics()) - - m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0) - - t.Run("Accumulate", func(t *testing.T) { - n := a.Accumulate(resourceMetrics1) - require.Equal(t, 2, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m3.Sum()+m2.Sum(), v.Sum()) - require.Equal(t, m3.Count()+m2.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("ResetBuckets/Ignore", func(t *testing.T) { - // should ignore metric from the past - n := a.Accumulate(resourceMetrics2) - - require.Equal(t, 1, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m3.Sum()+m2.Sum(), v.Sum()) - require.Equal(t, m3.Count()+m2.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) - t.Run("ResetBuckets/Perform", func(t *testing.T) { - // should reset when different buckets arrive - n := a.Accumulate(resourceMetrics3) - require.Equal(t, 1, n) - - m, ok := a.registeredMetrics.Load(signature) - v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) - require.True(t, ok) - - require.Equal(t, m4.Sum(), v.Sum()) - require.Equal(t, m4.Count(), v.Count()) - - for i := 0; i < v.BucketCounts().Len(); i++ { - require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i)) - } - - for i := 0; i < v.ExplicitBounds().Len(); i++ { - require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i)) - } - }) -} - func TestAccumulateMetrics(t *testing.T) { tests := []struct { name string @@ -514,7 +405,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) { ilm := resourceMetrics.ScopeMetrics().AppendEmpty() ilm.Scope().SetName("test") appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) - appendDeltaHistogram(startTs, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(ts1, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0) m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) @@ -556,7 +447,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) { // should ignore metric with different buckets from the past a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator) n := a.Accumulate(resourceMetrics) - require.Equal(t, 2, n) + require.Equal(t, 1, n) m, ok := a.registeredMetrics.Load(signature) v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0) @@ -582,7 +473,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) { ilm := resourceMetrics.ScopeMetrics().AppendEmpty() ilm.Scope().SetName("test") appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics()) - appendDeltaHistogram(startTs, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) + appendDeltaHistogram(ts1, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics()) m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0) signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())