Skip to content

Commit

Permalink
Fix Span Metrics
Browse files Browse the repository at this point in the history
Currently, while generating metrics out of traces, the start timestamp of a metric is tied to the root span. When a "new" child span appears for a trace (e.g. an unhappy path on an API call that results in a new subspan, or an async process that is triggered much later), the start timestamp of the new metric for that child span is that of the parent span (which can be well in the past).
This MR moves the start timestamp from the resource level (root span) to the metric level (child span). (Doesn't consider delta temporality as of now.)
References: open-telemetry#35994
Upstream Fix: open-telemetry#36019
  • Loading branch information
shivanthzen committed Nov 4, 2024
1 parent 67e9293 commit 3ece1cd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
28 changes: 9 additions & 19 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,35 +283,25 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
}
return startTime
}

sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())

if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
Expand Down Expand Up @@ -405,13 +395,13 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes, startTimestamp)
p.addExemplar(span, duration, h)
h.Observe(duration)

}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand All @@ -435,7 +425,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e := events.GetOrCreate(eKey, eAttributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand Down Expand Up @@ -479,8 +469,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
attributes: attr,
startTimestamp: startTimestamp,
}
Expand Down
43 changes: 23 additions & 20 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type Key string

type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality)
GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
ClearExemplars()
}

Expand Down Expand Up @@ -47,6 +47,7 @@ type explicitHistogram struct {
bounds []float64

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type exponentialHistogram struct {
Expand All @@ -56,6 +57,7 @@ type exponentialHistogram struct {
histogram *structure.Histogram[float64]

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type generateStartTimestamp = func(Key) pcommon.Timestamp
Expand All @@ -76,7 +78,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
Expand All @@ -94,17 +96,16 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map)

func (m *explicitHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dps := metric.Histogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, h := range m.metrics {
for _, h := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetTimestamp(timestamp)
dp.SetStartTimestamp(h.startTimestamp)
dp.ExplicitBounds().FromRaw(h.bounds)
dp.BucketCounts().FromRaw(h.bucketCounts)
dp.SetCount(h.count)
Expand All @@ -114,6 +115,7 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
h.exemplars.CopyTo(dp.Exemplars())
h.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(h.startTimestamp))
}
}

Expand All @@ -123,7 +125,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() {
}
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
histogram := new(structure.Histogram[float64])
Expand All @@ -147,23 +149,23 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma

func (m *exponentialHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality)
dps := metric.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, m := range m.metrics {
for _, e := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(e.startTimestamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(m.histogram, dp)
for i := 0; i < m.exemplars.Len(); i++ {
m.exemplars.At(i).SetTimestamp(timestamp)
expoHistToExponentialDataPoint(e.histogram, dp)
for i := 0; i < e.exemplars.Len(); i++ {
e.exemplars.At(i).SetTimestamp(timestamp)
}
m.exemplars.CopyTo(dp.Exemplars())
m.attributes.CopyTo(dp.Attributes())
e.exemplars.CopyTo(dp.Exemplars())
e.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(e.startTimestamp))
}
}

Expand Down Expand Up @@ -242,13 +244,14 @@ type Sum struct {
count uint64
exemplars pmetric.ExemplarSlice
maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

func (s *Sum) Add(value uint64) {
s.count += value
}

func NewSumMetrics(maxExemplarCount *int) SumMetrics {
func NewSumMetrics(maxExemplarCount *int, startTimeStamp pcommon.Timestamp) SumMetrics {
return SumMetrics{
metrics: make(map[Key]*Sum),
maxExemplarCount: maxExemplarCount,
Expand All @@ -260,13 +263,14 @@ type SumMetrics struct {
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
}
m.metrics[key] = s
}
Expand All @@ -285,7 +289,6 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value

func (m *SumMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
Expand All @@ -294,9 +297,9 @@ func (m *SumMetrics) BuildMetrics(

dps := metric.Sum().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, s := range m.metrics {
for _, s := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(s.startTimestamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(s.count))
for i := 0; i < s.exemplars.Len(); i++ {
Expand Down

0 comments on commit 3ece1cd

Please sign in to comment.