diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go index 1e5d54b793d7..d2ea7feb9e18 100644 --- a/connector/spanmetricsconnector/connector.go +++ b/connector/spanmetricsconnector/connector.go @@ -279,17 +279,6 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics { * - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ... */ deltaMetricKeys := make(map[metrics.Key]bool) - startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp { - startTime := rawMetrics.startTimestamp - if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta { - if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok { - startTime = lastTimestamp - } - // Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update. - deltaMetricKeys[mk] = true - } - return startTime - } metricsNamespace := p.config.Namespace if legacyMetricNamesFeatureGate.IsEnabled() && metricsNamespace == DefaultNamespace { @@ -299,21 +288,21 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics { sums := rawMetrics.sums metric := sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(metricsNamespace, metricNameCalls)) - sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) + sums.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality()) if !p.config.Histogram.Disable { histograms := rawMetrics.histograms metric = sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(metricsNamespace, metricNameDuration)) metric.SetUnit(p.config.Histogram.Unit.String()) - histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) + histograms.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality()) } events := rawMetrics.events if p.events.Enabled { metric = sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(metricsNamespace, metricNameEvents)) - events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) + events.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality()) } for mk := range deltaMetricKeys { @@ -407,13 +396,13 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { } if !p.config.Histogram.Disable { // aggregate histogram metrics - h := histograms.GetOrCreate(key, attributes) + h := histograms.GetOrCreate(key, attributes, startTimestamp) p.addExemplar(span, duration, h) h.Observe(duration) } // aggregate sums metrics - s := sums.GetOrCreate(key, attributes) + s := sums.GetOrCreate(key, attributes, startTimestamp) if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() { s.AddExemplar(span.TraceID(), span.SpanID(), duration) } @@ -437,7 +426,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions) p.metricKeyToDimensions.Add(eKey, eAttributes) } - e := events.GetOrCreate(eKey, eAttributes) + e := events.GetOrCreate(eKey, eAttributes, startTimestamp) if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() { e.AddExemplar(span.TraceID(), span.SpanID(), duration) } @@ -481,8 +470,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta if !ok { v = &resourceMetrics{ histograms: initHistogramMetrics(p.config), - sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint), - events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint), + sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp), + events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp), attributes: attr, startTimestamp: startTimestamp, } diff --git a/connector/spanmetricsconnector/internal/metrics/metrics.go b/connector/spanmetricsconnector/internal/metrics/metrics.go index 7b6cc937f637..55c4d2126e91 100644 --- a/connector/spanmetricsconnector/internal/metrics/metrics.go +++ b/connector/spanmetricsconnector/internal/metrics/metrics.go @@ -14,8 +14,8 @@ import ( type Key string type HistogramMetrics interface { - GetOrCreate(key Key, attributes pcommon.Map) Histogram - BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality) + GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram + BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality) ClearExemplars() } @@ -47,6 +47,9 @@ type explicitHistogram struct { bounds []float64 maxExemplarCount *int + + startTimestamp pcommon.Timestamp + lastSeenTimestamp pcommon.Timestamp } type exponentialHistogram struct { @@ -56,6 +59,9 @@ type exponentialHistogram struct { histogram *structure.Histogram[float64] maxExemplarCount *int + + startTimestamp pcommon.Timestamp + lastSeenTimestamp pcommon.Timestamp } type generateStartTimestamp = func(Key) pcommon.Timestamp @@ -76,7 +82,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog } } -func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram { +func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram { h, ok := m.metrics[key] if !ok { h = &explicitHistogram{ @@ -89,21 +95,27 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) m.metrics[key] = h } + h.lastSeenTimestamp = startTimestamp return h } func (m *explicitHistogramMetrics) BuildMetrics( metric pmetric.Metric, - startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptyHistogram().SetAggregationTemporality(temporality) dps := metric.Histogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) - for k, h := range m.metrics { + for _, h := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(startTimestamp(k)) + var startTimeStamp pcommon.Timestamp + if temporality == pmetric.AggregationTemporalityDelta { + startTimeStamp = h.lastSeenTimestamp + } else { + startTimeStamp = h.startTimestamp + } + dp.SetStartTimestamp(startTimeStamp) dp.SetTimestamp(timestamp) dp.ExplicitBounds().FromRaw(h.bounds) dp.BucketCounts().FromRaw(h.bucketCounts) @@ -123,7 +135,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() { } } -func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram { +func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram { h, ok := m.metrics[key] if !ok { histogram := new(structure.Histogram[float64]) @@ -142,28 +154,34 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma } + h.lastSeenTimestamp = startTimeStamp return h } func (m *exponentialHistogramMetrics) BuildMetrics( metric pmetric.Metric, - startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality) dps := metric.ExponentialHistogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) - for k, m := range m.metrics { + for _, e := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(startTimestamp(k)) + var startTimeStamp pcommon.Timestamp + if temporality == pmetric.AggregationTemporalityDelta { + startTimeStamp = e.lastSeenTimestamp + } else { + startTimeStamp = e.startTimestamp + } + dp.SetStartTimestamp(startTimeStamp) dp.SetTimestamp(timestamp) - expoHistToExponentialDataPoint(m.histogram, dp) - for i := 0; i < m.exemplars.Len(); i++ { - m.exemplars.At(i).SetTimestamp(timestamp) + expoHistToExponentialDataPoint(e.histogram, dp) + for i := 0; i < e.exemplars.Len(); i++ { + e.exemplars.At(i).SetTimestamp(timestamp) } - m.exemplars.CopyTo(dp.Exemplars()) - m.attributes.CopyTo(dp.Attributes()) + e.exemplars.CopyTo(dp.Exemplars()) + e.attributes.CopyTo(dp.Attributes()) } } @@ -238,17 +256,21 @@ func (h *exponentialHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcomm } type Sum struct { - attributes pcommon.Map - count uint64 + attributes pcommon.Map + count uint64 + exemplars pmetric.ExemplarSlice maxExemplarCount *int + + startTimestamp pcommon.Timestamp + lastSeenTimestamp pcommon.Timestamp } func (s *Sum) Add(value uint64) { s.count += value } -func NewSumMetrics(maxExemplarCount *int) SumMetrics { +func NewSumMetrics(maxExemplarCount *int, startTimeStamp pcommon.Timestamp) SumMetrics { return SumMetrics{ metrics: make(map[Key]*Sum), maxExemplarCount: maxExemplarCount, @@ -260,16 +282,18 @@ type SumMetrics struct { maxExemplarCount *int } -func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum { +func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum { s, ok := m.metrics[key] if !ok { s = &Sum{ attributes: attributes, exemplars: pmetric.NewExemplarSlice(), maxExemplarCount: m.maxExemplarCount, + startTimestamp: startTimestamp, } m.metrics[key] = s } + s.lastSeenTimestamp = startTimestamp return s } @@ -285,7 +309,6 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value func (m *SumMetrics) BuildMetrics( metric pmetric.Metric, - startTimestamp generateStartTimestamp, timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { @@ -294,9 +317,15 @@ func (m *SumMetrics) BuildMetrics( dps := metric.Sum().DataPoints() dps.EnsureCapacity(len(m.metrics)) - for k, s := range m.metrics { + for _, s := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(startTimestamp(k)) + var startTimeStamp pcommon.Timestamp + if temporality == pmetric.AggregationTemporalityDelta { + startTimeStamp = s.lastSeenTimestamp + } else { + startTimeStamp = s.startTimestamp + } + dp.SetStartTimestamp(startTimeStamp) dp.SetTimestamp(timestamp) dp.SetIntValue(int64(s.count)) for i := 0; i < s.exemplars.Len(); i++ {