Skip to content

Commit

Permalink
handle hasSum mismatch and do independent test setups
Browse files Browse the repository at this point in the history
  • Loading branch information
hkfgo committed Jul 6, 2023
1 parent fa39587 commit 5705810
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 188 deletions.
35 changes: 17 additions & 18 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
if ip.Flags().NoRecordedValue() {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series
v, ok := a.registeredMetrics.Load(signature)
if !ok {
// first data point
m := copyMetricMetadata(metric)
Expand All @@ -253,18 +253,24 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
if ip.StartTimestamp().AsTime() != mv.value.Histogram().DataPoints().At(0).StartTimestamp().AsTime() {
// treat misalgnment as restart and reset or violation of single-writer principle and drop
if ip.StartTimestamp().AsTime().After(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range
if ip.StartTimestamp().AsTime() != pp.StartTimestamp().AsTime() {
// treat misalgnment as restart and reset, or violation of single-writer principle and drop
if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) {
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
zap.String("time_series", signature),
zap.String("metric_name", metric.Name()),
).Warn("Dropped misaligned histogram datapoint")
continue
}
} else if ip.HasSum() != pp.HasSum() {
a.logger.With(
zap.String("metric_name", metric.Name()),
).Warn("Dropped histogram data point due to disagreement on sum tracking")
continue
} else {
accumulateHistogramValues(mv.value.Histogram().DataPoints().At(0), ip, m.Histogram().DataPoints().AppendEmpty())
accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty())
}
case pmetric.AggregationTemporalityCumulative:
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
Expand Down Expand Up @@ -351,21 +357,14 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
dest.SetTimestamp(newer.Timestamp())

// checking for bucket boundary alignment, optionally re-aggregate on newer boundaries
match := true
if older.ExplicitBounds().Len() == newer.ExplicitBounds().Len() {
for i := 0; i < newer.ExplicitBounds().Len(); i++ {
if older.ExplicitBounds().At(i) != newer.ExplicitBounds().At(i) {
match = false
break
}
}
} else {
match = false
match := older.ExplicitBounds().Len() == newer.ExplicitBounds().Len()
for i := 0; match && i < newer.ExplicitBounds().Len(); i++ {
match = older.ExplicitBounds().At(i) == newer.ExplicitBounds().At(i)
}

if match {
dest.SetCount(newer.Count() + older.Count())
if newer.HasSum() && older.HasSum() {
if newer.HasSum() {
dest.SetSum(newer.Sum() + older.Sum())
}

Expand Down
Loading

0 comments on commit 5705810

Please sign in to comment.