Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusexporter] accumulate delta temporality histograms #23790

Merged
merged 35 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
574b131
[exporter/prometheusexporter] accumulate delta temporality sums and h…
nabam Aug 18, 2022
2385f79
Merge branch 'main' into delta-prom
nabam Aug 18, 2022
ac88fb9
Update exporter/prometheusexporter/accumulator.go
nabam Oct 18, 2022
b96d0f5
Update exporter/prometheusexporter/accumulator.go
nabam Oct 18, 2022
f21cc65
Merge branch 'main' into chore/rebase-nabam-delta-prom
locmai Mar 30, 2023
3c69be1
fix tests
locmai Mar 30, 2023
ba8d644
add back aggregation change
locmai Mar 30, 2023
d9d5b95
set empty histogram
locmai Mar 30, 2023
4034315
demo
locmai Mar 30, 2023
3631465
Revert "demo"
locmai Mar 30, 2023
aa6d049
Revert "Revert "demo""
locmai Mar 30, 2023
11bb9a6
update aggTemporality
locmai Mar 30, 2023
6f805d3
update changelog
locmai Mar 31, 2023
7b5aa64
fix the tests by setting bounds and bucket counts
locmai Mar 31, 2023
1979cbf
fix per aneurysm9's comment
locmai Apr 4, 2023
e42fcea
handle delta-cumulative histogram interval misalignments
hkfgo Jun 25, 2023
504e3b5
handle delta-cumulative histogram interval misalignments
hkfgo Jun 26, 2023
98c0090
add test for misalignments
hkfgo Jun 26, 2023
b2e18ca
add code change and test to address out-of-bound error when comparing…
hkfgo Jun 26, 2023
c7d17bb
Merge pull request #1 from hkfgo/chore/rebase-nabam-delta-prom
locmai Jun 27, 2023
3344f60
handle delta-cumulative histogram interval misalignments
hkfgo Jun 27, 2023
a52f5af
handle delta-cumulative histogram interval misalignments
hkfgo Jun 27, 2023
107b53b
add test for misalignments
hkfgo Jun 27, 2023
ca4dde2
add code change and test to address out-of-bound error when comparing…
hkfgo Jun 27, 2023
fa39587
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Jun 30, 2023
5705810
handle hasSum mismatch and do independent test setups
hkfgo Jul 6, 2023
eaa04e2
remove sum check
hkfgo Jul 11, 2023
231902d
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Jul 17, 2023
42a7098
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Aug 28, 2023
b6e66e3
merge master
hkfgo Aug 28, 2023
7d8b888
handle timestamp bug found during end-to-end testing
hkfgo Aug 29, 2023
e6f0cef
Merge branch 'main' into chore/rebase-nabam-delta-prom
Nov 1, 2023
5d6f49d
Merge branch 'main' into chore/rebase-nabam-delta-prom
hkfgo Dec 6, 2023
2c906ef
tidy up accumulator comments and logs
hkfgo Dec 6, 2023
a52102d
tidy up accumulator comments and logs
hkfgo Dec 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .chloggen/promexp-delta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Accumulate histograms with delta temporality

# One or more tracking issues related to the change
issues: [4968]
113 changes: 91 additions & 22 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.Instr
case pmetric.MetricTypeSum:
return a.accumulateSum(metric, il, resourceAttrs, now)
case pmetric.MetricTypeHistogram:
return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
return a.accumulateHistogram(metric, il, resourceAttrs, now)
case pmetric.MetricTypeSummary:
return a.accumulateSummary(metric, il, resourceAttrs, now)
default:
Expand Down Expand Up @@ -161,19 +161,19 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
}

func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleSum := metric.Sum()
sum := metric.Sum()

// Drop metrics with unspecified aggregations
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
return
}

// Drop non-monotonic and non-cumulative metrics
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !doubleSum.IsMonotonic() {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !sum.IsMonotonic() {
return
}

dps := doubleSum.DataPoints()
dps := sum.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

Expand Down Expand Up @@ -201,7 +201,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
}

// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
hkfgo marked this conversation as resolved.
Show resolved Hide resolved
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
Expand All @@ -218,45 +218,75 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
}

hkfgo marked this conversation as resolved.
Show resolved Hide resolved
return
}

func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleHistogram := metric.Histogram()

// Drop metrics with non-cumulative aggregations
if doubleHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
return
}
func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
histogram := metric.Histogram()
a.logger.Debug("Accumulate histogram.....")
dps := histogram.DataPoints()

dps := doubleHistogram.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: idenity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh oops, some of the comments were actually just for my own understanding of the code. Let me go ahead and remove those.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you needed to make the comments to understand the code, others may as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you still looking to fix this typo?

if ip.Flags().NoRecordedValue() {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series
if !ok {
// first data point
a.logger.Debug("Accumulate first histogram data point")
hkfgo marked this conversation as resolved.
Show resolved Hide resolved
m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
continue
}
mv := v.(*accumulatedValue)

if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
m := copyMetricMetadata(metric)
m.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range
if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() {
// treat misalgnment as restart and reset, or violation of single-writer principle and drop
hkfgo marked this conversation as resolved.
Show resolved Hide resolved
a.logger.With(
zap.String("ip_start_time", ip.StartTimestamp().String()),
zap.String("pp_start_time", pp.StartTimestamp().String()),
zap.String("pp_timestamp", pp.Timestamp().String()),
zap.String("ip_timestamp", ip.Timestamp().String()),
).Warn("Misaligned starting timestamps")
if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) {
a.logger.Debug("treating it like reset")
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
zap.String("metric_name", metric.Name()),
).Warn("Dropped misaligned histogram datapoint")
continue
}
} else {
a.logger.Debug("Accumulate another histogram datapoint")
accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty())
}
case pmetric.AggregationTemporalityCumulative:
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
continue
}

ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
default:
// unsupported temporality
continue
}

m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
}
Expand Down Expand Up @@ -316,3 +346,42 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {

return m
}

func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
dest.SetStartTimestamp(prev.StartTimestamp())

older := prev
newer := current
if current.Timestamp().AsTime().Before(prev.Timestamp().AsTime()) {
older = current
newer = prev
}

newer.Attributes().CopyTo(dest.Attributes())
dest.SetTimestamp(newer.Timestamp())

// checking for bucket boundary alignment, optionally re-aggregate on newer boundaries
match := older.ExplicitBounds().Len() == newer.ExplicitBounds().Len()
for i := 0; match && i < newer.ExplicitBounds().Len(); i++ {
match = older.ExplicitBounds().At(i) == newer.ExplicitBounds().At(i)
}

if match {

dest.SetCount(newer.Count() + older.Count())
dest.SetSum(newer.Sum() + older.Sum())

counts := make([]uint64, newer.BucketCounts().Len())
for i := 0; i < newer.BucketCounts().Len(); i++ {
counts[i] = newer.BucketCounts().At(i) + older.BucketCounts().At(i)
}
dest.BucketCounts().FromRaw(counts)
} else {
// use new value if bucket bounds do not match
dest.SetCount(newer.Count())
dest.SetSum(newer.Sum())
dest.BucketCounts().FromRaw(newer.BucketCounts().AsRaw())
}

dest.ExplicitBounds().FromRaw(newer.ExplicitBounds().AsRaw())
}
Loading