Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
hkfgo committed Aug 28, 2023
2 parents 42a7098 + c7d17bb commit b6e66e3
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
4 changes: 2 additions & 2 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for
if ip.Flags().NoRecordedValue() {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series
if !ok {
// first data point
m := copyMetricMetadata(metric)
Expand Down
111 changes: 110 additions & 1 deletion exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,115 @@ import (
"go.uber.org/zap"
)

func TestAccumulateHistogram(t *testing.T) {
appendHistogram := func(ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.ExplicitBounds().FromRaw(bounds)
dp.BucketCounts().FromRaw(counts)
dp.SetCount(count)
dp.SetSum(sum)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
}

ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
ts3 := time.Now().Add(-2 * time.Second)
ts4 := time.Now().Add(-1 * time.Second)

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

resourceMetrics1 := pmetric.NewResourceMetrics()
ilm1 := resourceMetrics1.ScopeMetrics().AppendEmpty()
ilm1.Scope().SetName("test")
appendHistogram(ts3, 5, 2.5, []uint64{1, 3, 1, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())
appendHistogram(ts2, 4, 8.3, []uint64{1, 1, 2, 0}, []float64{0.1, 0.5, 1, 10}, ilm1.Metrics())

m3 := ilm1.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm1.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm1.Scope().Name(), ilm1.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

// different buckets from the past
resourceMetrics2 := pmetric.NewResourceMetrics()
ilm2 := resourceMetrics2.ScopeMetrics().AppendEmpty()
ilm2.Scope().SetName("test")
appendHistogram(ts1, 7, 5, []uint64{3, 1, 1, 0}, []float64{0.1, 0.2, 1, 10}, ilm2.Metrics())

// add extra buckets
resourceMetrics3 := pmetric.NewResourceMetrics()
ilm3 := resourceMetrics3.ScopeMetrics().AppendEmpty()
ilm3.Scope().SetName("test")
appendHistogram(ts4, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10, 15}, ilm3.Metrics())

m4 := ilm3.Metrics().At(0).Histogram().DataPoints().At(0)

t.Run("Accumulate", func(t *testing.T) {
n := a.Accumulate(resourceMetrics1)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Ignore", func(t *testing.T) {
// should ignore metric from the past
n := a.Accumulate(resourceMetrics2)

require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m3.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m3.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m3.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m3.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Perform", func(t *testing.T) {
// should reset when different buckets arrive
n := a.Accumulate(resourceMetrics3)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m4.Sum(), v.Sum())
require.Equal(t, m4.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m4.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m4.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateMetrics(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -286,7 +395,7 @@ func TestAccumulateMetrics(t *testing.T) {
}
}

func TestAccumulateDeltaToCumulativeSum(t *testing.T) {
func TestAccumulateDeltaToCumulative(t *testing.T) {
tests := []struct {
name string
metric func(time.Time, time.Time, float64, pmetric.MetricSlice)
Expand Down

0 comments on commit b6e66e3

Please sign in to comment.