Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delta to cumulative prometheus #9919

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
- `tailsamplingprocessor`: Fix composite sampler with inverse policy
- `awsprometheusremotewriteexporter`: Fix signing of empty request bodies. (#10578)
- `sigv4authextension`: Fix signing of empty request bodies. (#10578)
- `prometheusexporter`: Converting monotonic Delta to Cumulative sums (#9919)
- `statsdreceiver`: Update the lastIntervalTime for Counter metrics (#9919)

## v0.52.0

Expand Down
20 changes: 18 additions & 2 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleSum := metric.Sum()

// Drop metrics with non-cumulative aggregations
if doubleSum.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative {
// Drop metrics with unspecified aggregations
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityUnspecified {
locmai marked this conversation as resolved.
Show resolved Hide resolved
return
}

// Drop non-monotonic and non-cumulative metrics
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && !doubleSum.IsMonotonic() {
return
}

Expand Down Expand Up @@ -204,6 +209,17 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
continue
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}

// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).StartTimestamp() {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
case pmetric.NumberDataPointValueTypeDouble:
ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal())
Comment on lines +215 to +219
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final value will be converted into float64 at the convertSum():

switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
value = float64(ip.IntVal())
case pmetric.NumberDataPointValueTypeDouble:
value = ip.DoubleVal()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a client were to mix number types, something irregular happens here, e.g., if the stored point has a floating point value and the new point has an integer value. I'm OK ignoring this case, since there are already caveats about what we're doing here. Instead of changing the code for corner cases, I recommend documenting what this will do.

As it stands, your change means that a Prometheus exporter can aggregate a single stream of delta temporality counter data into a single cumulative metric. This will be the case when there is one statsd receiver. If a single OTLP SDK exports delta temporality to this OTC, a single delta temporality counter metric will be correctly aggregated here.

However, if multiple statsd receivers or OTel SDKs generate the same stream using delta temporality, this code will not be able to correctly aggregate; the same is true if one stream contains mixed number types, but that hardly seems important given this other limitation.

To overcome the "Single stream" limitation, the exporter can either:

  1. Blindly apply all deltas, regardless of timing. As long as all the producers behave correctly, there is little opportunity for incorrectness except due to replayed data.
  2. Maintain a map of start-times already applied, possibly with resource information. If the same resource updates a cumulative metric repeatedly, that's when the start_time==end_time test adds correctness.

I think that this change is useful even with the caveat that it only supports one stream, for now. OTOH, blindly applying all deltas isn't very wrong and is very simple. What do you think?

Copy link
Contributor Author

@locmai locmai May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the specification of the single-writer principle:

Multiple writers for a metric stream is considered an error state, or misbehaving system. Receivers SHOULD presume a single writer was intended and eliminate overlap / deduplicate.

If I understood that correctly, I would consider that case is an error in the configuration and it should be the responsible of the receivers to handle that probably.

In this case, I would prefer to assume that all deltas comes to the exporter would be from a single stream and with correct timing.

Instead of blindly applying all the deltas, we could handle it:

OpenTelemetry collectors SHOULD export telemetry when they observe overlapping points in data streams, so that the user can monitor for erroneous configurations.

So if any data points fall into the overlapped case, we report it?

Copy link
Contributor

@jmacd jmacd May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your reasoning.
I was under the impression that the exporter at this point does not keep the Resource to distinguish the sender of the stream; it may be that two senders with different resources are producing the same metric and attributes--that's the case I was thinking of. In any case, you've done a good thing here and I don't want to block it, let's document what it does and move on!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice nice nice! thank you for helping me out with this one!

Hi @Aneurysm9 , could you also take a look? This is for the very old one that we have discussed before to fix the dropping metrics issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that documentation of the expected behavior here would be helpful. Also some tests that exercise this capability.

As for the case of two producers of the same metric with different resources, we do have the resource attributes available at this point but don't seem to include them in the timeseries signature. Would doing that remove the concern about improper accumulation? What knock-on effects might that have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By documenting the expected behavior, I believe I should make another PR to update the Prometheus data model specification here for this change?

With the resource attributes regardless the single write principle, we could identify the 2 producers and therefore could handle the accumulation probably for overlapping case or when the nextDataPoint.startTimestamp > lastDataPoint.Timestamp. I see no side-effect yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the spec update here: open-telemetry/opentelemetry-specification#2570

I'll start working the tests this week soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi folks @jmacd @Aneurysm9 , I just updated the specification (merged open-telemetry/opentelemetry-specification#2570) and the tests for this PR.

Bumped the coverage from 97.7% of statements to 99.3% of statements.

The original TestAccumulateDeltaAggregation simply test the cases where metrics got dropped for both detal aggregation for Sum and Histogram (since they are non-monotonic by default) so I moved some of them to a new one TestAccumulateDroppedMetrics alongside with the MetricAggregationTemporalityUnspecified cases.

}
}

m := createMetric(metric)
m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
Expand Down
198 changes: 172 additions & 26 deletions exporter/prometheusexporter/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,6 @@ func TestAccumulateDeltaAggregation(t *testing.T) {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "IntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "Sum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "Histogram",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
Expand Down Expand Up @@ -386,6 +360,178 @@ func TestAccumulateMetrics(t *testing.T) {
}
}

func TestAccumulateDeltaToCumulative(t *testing.T) {
tests := []struct {
name string
metric func(time.Time, time.Time, float64, pmetric.MetricSlice)
}{
{
name: "MonotonicDeltaIntSum",
metric: func(startTimestamp, ts time.Time, v float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.SetDescription("test description")
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(int64(v))
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp))
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "MonotonicDeltaSum",
metric: func(startTimestamp, timestamp time.Time, v float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(v)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTimestamp))
dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-2 * time.Second)
ts3 := time.Now().Add(-1 * time.Second)

resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)

// The first point arrived
tt.metric(ts1, ts2, 11, ilm.Metrics())
n := a.Accumulate(resourceMetrics)

require.Equal(t, 1, n)

// The next point arrived
tt.metric(ts2, ts3, 31, ilm.Metrics())
n = a.Accumulate(resourceMetrics)

require.Equal(t, 2, n)

mLabels, _, mValue, _, mIsMonotonic := getMetricProperties(ilm.Metrics().At(1))
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), mLabels)
m, ok := a.registeredMetrics.Load(signature)
require.True(t, ok)

v := m.(*accumulatedValue)
vLabels, vTS, vValue, vTemporality, vIsMonotonic := getMetricProperties(v.value)

require.Equal(t, v.scope.Name(), "test")
require.Equal(t, v.value.DataType(), ilm.Metrics().At(0).DataType())
require.Equal(t, v.value.DataType(), ilm.Metrics().At(1).DataType())

vLabels.Range(func(k string, v pcommon.Value) bool {
r, _ := mLabels.Get(k)
require.Equal(t, r, v)
return true
})
require.Equal(t, mLabels.Len(), vLabels.Len())
require.Equal(t, mValue, vValue)
require.Equal(t, pmetric.MetricAggregationTemporalityCumulative, vTemporality)
require.Equal(t, mIsMonotonic, vIsMonotonic)

require.Equal(t, ts3.Unix(), vTS.Unix())
})
}
}

func TestAccumulateDroppedMetrics(t *testing.T) {
tests := []struct {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "NonMonotonicIntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "NonMonotonicSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "UnspecifiedIntSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetIntVal(42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
{
name: "UnspecifiedSum",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityUnspecified)
dp := metric.Sum().DataPoints().AppendEmpty()
dp.SetDoubleVal(42.42)
dp.Attributes().InsertString("label_1", "1")
dp.Attributes().InsertString("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
tt.fillMetric(time.Now(), ilm.Metrics().AppendEmpty())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 0, n)

signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), pcommon.NewMap())
v, ok := a.registeredMetrics.Load(signature)
require.False(t, ok)
require.Nil(t, v)
})
}
}

func getMetricProperties(metric pmetric.Metric) (
attributes pcommon.Map,
ts time.Time,
Expand Down
5 changes: 3 additions & 2 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (p *StatsDParser) GetMetrics() pmetric.Metrics {
)
}

p.lastIntervalTime = timeNowFunc()
jmacd marked this conversation as resolved.
Show resolved Hide resolved
p.gauges = make(map[statsDMetricDescription]pmetric.ScopeMetrics)
p.counters = make(map[statsDMetricDescription]pmetric.ScopeMetrics)
p.timersAndDistributions = make([]pmetric.ScopeMetrics, 0)
Expand Down Expand Up @@ -210,7 +209,9 @@ func (p *StatsDParser) Aggregate(line string) error {
case CounterType:
_, ok := p.counters[parsedMetric.description]
if !ok {
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc(), p.lastIntervalTime)
timeNow := timeNowFunc()
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNow, p.lastIntervalTime)
p.lastIntervalTime = timeNow
jmacd marked this conversation as resolved.
Show resolved Hide resolved
} else {
point := p.counters[parsedMetric.description].Metrics().At(0).Sum().DataPoints().At(0)
point.SetIntVal(point.IntVal() + parsedMetric.counterValue())
Expand Down
10 changes: 5 additions & 5 deletions receiver/statsdreceiver/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand All @@ -614,7 +614,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 215, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 75, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
expectedTimer: []pmetric.ScopeMetrics{},
},
Expand Down Expand Up @@ -729,7 +729,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0), time.Unix(711, 0)),
},
},
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", 7000, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(611, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", 50, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0), time.Unix(711, 0)),
},
},
}
Expand Down