Skip to content

Commit

Permalink
add test for misalignments
Browse files Browse the repository at this point in the history
Signed-off-by: xchen <xchen@axon.com>
  • Loading branch information
hkfgo committed Jun 27, 2023
1 parent 504e3b5 commit 98c0090
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 18 deletions.
23 changes: 10 additions & 13 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,16 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
if ip.StartTimestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
a.logger.Warn(zap.String("Dropping misaligned histogram datapoint for time series ", signature))
continue
}
// assuming an application restart and reset counter
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() {
// treat misalgnment as restart and reset or violation of single-writer principle and drop
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
zap.String("time_series", signature),
).Warn("Dropped misaligned histogram datapoint")
continue
}
} else {
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
}
Expand Down Expand Up @@ -347,11 +349,6 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {
}

func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
//if current.StartTimestamp().AsTime().Before(prev.StartTimestamp().AsTime()) {
// dest.SetStartTimestamp(current.StartTimestamp())
//} else {
// dest.SetStartTimestamp(prev.StartTimestamp())
//}
dest.SetStartTimestamp(prev.StartTimestamp())

older := prev
Expand Down
69 changes: 64 additions & 5 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestAccumulateHistogram(t *testing.T) {
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
appendHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
Expand All @@ -40,20 +40,25 @@ func TestAccumulateHistogram(t *testing.T) {
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs))
}

startTs1 := time.Now().Add(-6 * time.Second)
startTs2 := time.Now().Add(-5 * time.Second)
startTs3 := time.Now()
ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
ts3 := time.Now().Add(-2 * time.Second)
ts4 := time.Now().Add(-1 * time.Second)
ts5 := time.Now().Add(1 * time.Second)

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

resourceMetrics1 := pmetric.NewResourceMetrics()
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
ilm1.Scope().SetName("test")
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(startTs2, ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(startTs2, ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())

m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
Expand All @@ -63,16 +68,30 @@ func TestAccumulateHistogram(t *testing.T) {
resourceMetrics2 := pmetric.NewResourceMetrics()
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
ilm2.Scope().SetName("test")
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())
appendHistogram(startTs2, ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())

// add extra buckets
resourceMetrics3 := pmetric.NewResourceMetrics()
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
ilm3.Scope().SetName("test")
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())
appendHistogram(startTs2, ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())

m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)

// misaligned start timestamp, drop
resourceMetrics4 := pmetric.NewResourceMetrics()
ilm4 := resourceMetrics4.ScopeMetrics().AppendEmpty()
ilm4.Scope().SetName("test")
appendHistogram(startTs1, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())
appendHistogram(ts3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm4.Metrics())

// misaligned start timestamp, treat as restart
resourceMetrics5 := pmetric.NewResourceMetrics()
ilm5 := resourceMetrics5.ScopeMetrics().AppendEmpty()
ilm5.Scope().SetName("test")
appendHistogram(startTs3, ts5, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm5.Metrics())
m5 := ilm5.Metrics().At(0).Histogram().DataPoints().At(0)

t.Run("Accumulate", func(t *testing.T) {
n := a.Accumulate(resourceMetrics1)
require.Equal(t, 2, n)
Expand Down Expand Up @@ -133,6 +152,46 @@ func TestAccumulateHistogram(t *testing.T) {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Drop", func(t *testing.T) {
// should reset when different buckets arrive
n := a.Accumulate(resourceMetrics4)
require.Equal(t, 0, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m4.Sum(), v.Sum())
require.Equal(t, m4.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Reset", func(t *testing.T) {
// reset when start timestamp skips ahead
n := a.Accumulate(resourceMetrics5)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m5.Sum(), v.Sum())
require.Equal(t, m5.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m5.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m5.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateMetrics(t *testing.T) {
Expand Down

0 comments on commit 98c0090

Please sign in to comment.