handle timestamp bug found during end-to-end testing
hkfgo committed Aug 29, 2023
1 parent b6e66e3 commit 7d8b888
Showing 2 changed files with 15 additions and 114 deletions.
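Annotation: the change below adjusts how the Prometheus exporter's accumulator decides whether an incoming delta histogram point lines up with the previously stored point. The incoming point's start timestamp is now compared against the previous point's end timestamp (Timestamp) instead of its start timestamp (StartTimestamp), so a well-formed successor (one that starts where the previous point ended) is accumulated rather than treated as misaligned. A minimal, illustrative sketch of the two comparisons using the pdata API (not the exporter's code; the timestamps are made up):

```go
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	t0 := time.Now().Add(-2 * time.Second)
	t1 := time.Now().Add(-1 * time.Second)
	t2 := time.Now()

	// Previously accumulated delta point covering [t0, t1].
	pp := pmetric.NewHistogramDataPoint()
	pp.SetStartTimestamp(pcommon.NewTimestampFromTime(t0))
	pp.SetTimestamp(pcommon.NewTimestampFromTime(t1))

	// Incoming delta point covering [t1, t2]: it starts where the previous one ended.
	ip := pmetric.NewHistogramDataPoint()
	ip.SetStartTimestamp(pcommon.NewTimestampFromTime(t1))
	ip.SetTimestamp(pcommon.NewTimestampFromTime(t2))

	// Old comparison: flags this well-formed successor as misaligned.
	fmt.Println(ip.StartTimestamp().AsTime() != pp.StartTimestamp().AsTime()) // true

	// New comparison: the points chain, so they can be accumulated.
	fmt.Println(ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime()) // false
}
```

When the timestamps do not chain, the new code below either treats the gap as a restart (incoming start after the previous end) or drops the point and logs a warning.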
14 changes: 12 additions & 2 deletions exporter/prometheusexporter/accumulator.go
@@ -224,7 +224,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I

func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
histogram := metric.Histogram()

a.logger.Debug("Accumulate histogram.....")
dps := histogram.DataPoints()

for i := 0; i < dps.Len(); i++ {
@@ -239,6 +239,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all time series. Get value for particular time series
if !ok {
// first data point
a.logger.Debug("Accumulate first histogram data point")
m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
@@ -254,9 +255,16 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range
- if ip.StartTimestamp().AsTime() != pp.StartTimestamp().AsTime() {
+ if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() {
// treat misalignment as restart and reset, or violation of single-writer principle and drop
a.logger.With(
zap.String("ip_start_time", ip.StartTimestamp().String()),
zap.String("pp_start_time", pp.StartTimestamp().String()),
zap.String("pp_timestamp", pp.Timestamp().String()),
zap.String("ip_timestamp", ip.Timestamp().String()),
).Warn("Misaligned starting timestamps")
if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) {
a.logger.Debug("treating it like reset")
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
@@ -265,6 +273,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
continue
}
} else {
a.logger.Debug("Accumulate another histogram datapoint")
accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty())
}
case pmetric.AggregationTemporalityCumulative:
@@ -358,6 +367,7 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
}

if match {

dest.SetCount(newer.Count() + older.Count())
dest.SetSum(newer.Sum() + older.Sum())

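Annotation: when the timestamps do chain, the incoming delta point is folded into the stored point via accumulateHistogramValues, whose matching-bounds branch is partially visible above. A rough sketch of that merge step for the matching-bounds case (a hypothetical helper, not the exporter's implementation; the timestamp handling at the end is an assumption for illustration):

```go
package sketch

import "go.opentelemetry.io/collector/pdata/pmetric"

// mergeDeltaPoint sketches the matching-bounds case: counts, sums, and
// per-bucket counts are added, and the merged point is assumed to keep the
// older start timestamp and the newer end timestamp.
func mergeDeltaPoint(older, newer, dest pmetric.HistogramDataPoint) {
	dest.SetCount(older.Count() + newer.Count())
	dest.SetSum(older.Sum() + newer.Sum())

	dest.ExplicitBounds().FromRaw(older.ExplicitBounds().AsRaw())

	counts := make([]uint64, older.BucketCounts().Len())
	for i := 0; i < older.BucketCounts().Len(); i++ {
		counts[i] = older.BucketCounts().At(i) + newer.BucketCounts().At(i)
	}
	dest.BucketCounts().FromRaw(counts)

	dest.SetStartTimestamp(older.StartTimestamp())
	dest.SetTimestamp(newer.Timestamp())
}
```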
115 changes: 3 additions & 112 deletions exporter/prometheusexporter/accumulator_test.go
@@ -15,115 +15,6 @@ import (
"go.uber.org/zap"
)

func TestAccumulateHistogram(t *testing.T) {
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.ExplicitBounds().FromRaw(bounds)
dp.BucketCounts().FromRaw(counts)
dp.SetCount(count)
dp.SetSum(sum)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
}

ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
ts3 := time.Now().Add(-2 * time.Second)
ts4 := time.Now().Add(-1 * time.Second)

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

resourceMetrics1 := pmetric.NewResourceMetrics()
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
ilm1.Scope().SetName("test")
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())

m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm1.Scope().Name(), ilm1.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

// different buckets from the past
resourceMetrics2 := pmetric.NewResourceMetrics()
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
ilm2.Scope().SetName("test")
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())

// add extra buckets
resourceMetrics3 := pmetric.NewResourceMetrics()
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
ilm3.Scope().SetName("test")
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())

m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)

t.Run("Accumulate", func(t *testing.T) {
n := a.Accumulate(resourceMetrics1)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Ignore", func(t *testing.T) {
// should ignore metric from the past
n := a.Accumulate(resourceMetrics2)

require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Perform", func(t *testing.T) {
// should reset when different buckets arrive
n := a.Accumulate(resourceMetrics3)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m4.Sum(), v.Sum())
require.Equal(t, m4.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateMetrics(t *testing.T) {
tests := []struct {
name string
@@ -514,7 +405,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) {
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
- appendDeltaHistogram(startTs, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
+ appendDeltaHistogram(ts1, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())

m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
@@ -556,7 +447,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) {
// should ignore metric with different buckets from the past
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
- require.Equal(t, 2, n)
+ require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
@@ -582,7 +473,7 @@ func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) {
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
- appendDeltaHistogram(startTs, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())
+ appendDeltaHistogram(ts1, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())

m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())
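Annotation: the test updates above encode the same rule as the production change: a second delta point is only folded in when its start timestamp equals the previous point's end timestamp. A condensed sketch of that expectation, assumed to live in the prometheusexporter package with the same imports as accumulator_test.go (the helper closure and the expected return value mirror the assertions visible in this diff):

```go
func TestChainedDeltaHistogramSketch(t *testing.T) {
	// Hypothetical sketch; appendDelta mirrors the appendDeltaHistogram
	// helper calls shown in the diff above.
	appendDelta := func(startTs, ts time.Time, count uint64, sum float64, counts []uint64, metrics pmetric.MetricSlice) {
		metric := metrics.AppendEmpty()
		metric.SetName("test_metric")
		metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
		dp := metric.Histogram().DataPoints().AppendEmpty()
		dp.ExplicitBounds().FromRaw([]float64{0.1, 0.5, 1, 10})
		dp.BucketCounts().FromRaw(counts)
		dp.SetCount(count)
		dp.SetSum(sum)
		dp.Attributes().PutStr("label_1", "1")
		dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs))
		dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
	}

	startTs := time.Now().Add(-3 * time.Second)
	ts1 := time.Now().Add(-2 * time.Second)
	ts2 := time.Now().Add(-1 * time.Second)

	a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

	rm := pmetric.NewResourceMetrics()
	ilm := rm.ScopeMetrics().AppendEmpty()
	ilm.Scope().SetName("test")
	// The second point starts at ts1, exactly where the first one ends, so
	// under the new check the accumulator merges it instead of dropping it.
	appendDelta(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, ilm.Metrics())
	appendDelta(ts1, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, ilm.Metrics())

	require.Equal(t, 2, a.Accumulate(rm))
}
```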
