Skip to content

Commit

Permalink
[connectors/spanmetrics] Set resource attributes for generated metric…
Browse files Browse the repository at this point in the history
…s. (#19216)

* [connectors/spanmetrics] Set resource attributes for generated metrics.
  • Loading branch information
kovrus authored Mar 10, 2023
1 parent b0f6db7 commit 36479b5
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 157 deletions.
11 changes: 11 additions & 0 deletions .chloggen/spanmetrics-set-resource-attrs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Set resource attributes for generated metrics.

# One or more tracking issues related to the change
issues: [18502]
138 changes: 79 additions & 59 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

const (
Expand Down Expand Up @@ -63,9 +64,7 @@ type connectorImp struct {
// The starting time of the data points.
startTimestamp pcommon.Timestamp

// Metrics
histograms metrics.HistogramMetrics
sums metrics.SumMetrics
resourceMetrics map[resourceKey]*resourceMetrics

keyBuf *bytes.Buffer

Expand All @@ -80,6 +79,12 @@ type connectorImp struct {
shutdownOnce sync.Once
}

type resourceMetrics struct {
histograms metrics.HistogramMetrics
sums metrics.SumMetrics
attributes pcommon.Map
}

type dimension struct {
name string
value *pcommon.Value
Expand Down Expand Up @@ -109,33 +114,11 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
return nil, err
}

var histograms metrics.HistogramMetrics
if cfg.Histogram.Exponential != nil {
maxSize := cfg.Histogram.Exponential.MaxSize
if cfg.Histogram.Exponential.MaxSize == 0 {
maxSize = structure.DefaultMaxSize
}
histograms = metrics.NewExponentialHistogramMetrics(maxSize)
} else {
bounds := defaultHistogramBucketsMs
// TODO remove deprecated `latency_histogram_buckets`
if cfg.LatencyHistogramBuckets != nil {
logger.Warn("latency_histogram_buckets is deprecated. " +
"Use `histogram: explicit: buckets` to set histogram buckets")
bounds = durationsToUnits(cfg.LatencyHistogramBuckets, unitDivider(cfg.Histogram.Unit))
}
if cfg.Histogram.Explicit != nil && cfg.Histogram.Explicit.Buckets != nil {
bounds = durationsToUnits(cfg.Histogram.Explicit.Buckets, unitDivider(cfg.Histogram.Unit))
}
histograms = metrics.NewExplicitHistogramMetrics(bounds)
}

return &connectorImp{
logger: logger,
config: *cfg,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
histograms: histograms,
sums: metrics.NewSumMetrics(),
resourceMetrics: make(map[resourceKey]*resourceMetrics),
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
Expand All @@ -144,6 +127,28 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
}, nil
}

func (p *connectorImp) initHistogramMetrics() metrics.HistogramMetrics {
cfg := p.config
if cfg.Histogram.Exponential != nil {
maxSize := structure.DefaultMaxSize
if cfg.Histogram.Exponential.MaxSize != 0 {
maxSize = cfg.Histogram.Exponential.MaxSize
}
return metrics.NewExponentialHistogramMetrics(maxSize)
}
bounds := defaultHistogramBucketsMs
// TODO remove deprecated `latency_histogram_buckets`
if cfg.LatencyHistogramBuckets != nil {
p.logger.Warn("latency_histogram_buckets is deprecated. " +
"Use `histogram: explicit: buckets` to set histogram buckets")
bounds = durationsToUnits(cfg.LatencyHistogramBuckets, unitDivider(cfg.Histogram.Unit))
}
if cfg.Histogram.Explicit != nil && cfg.Histogram.Explicit.Buckets != nil {
bounds = durationsToUnits(cfg.Histogram.Explicit.Buckets, unitDivider(cfg.Histogram.Unit))
}
return metrics.NewExplicitHistogramMetrics(bounds)
}

// unitDivider returns a unit divider to convert nanoseconds to milliseconds or seconds.
func unitDivider(u metrics.Unit) int64 {
return map[metrics.Unit]int64{
Expand Down Expand Up @@ -199,8 +204,7 @@ func (p *connectorImp) Capabilities() consumer.Capabilities {
}

// ConsumeTraces implements the consumer.Traces interface.
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
// It aggregates the trace data to generate metrics.
func (p *connectorImp) ConsumeTraces(_ context.Context, traces ptrace.Traces) error {
p.lock.Lock()
p.aggregateMetrics(traces)
Expand All @@ -223,53 +227,49 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
}
}

// buildMetrics collects the computed raw metrics data, builds the metrics object and
// writes the raw metrics data into the metrics object.
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
ilm := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("spanmetricsconnector")

p.buildCallsMetric(ilm)
p.buildDurationMetric(ilm)
for _, rawMetrics := range p.resourceMetrics {
rm := m.ResourceMetrics().AppendEmpty()
rawMetrics.attributes.CopyTo(rm.Resource().Attributes())

sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("spanmetricsconnector")

sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())

histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
}

return m
}

// buildDurationMetric collects the raw call count metrics and builds
// a explicit or exponential buckets histogram scope metric.
func (p *connectorImp) buildDurationMetric(ilm pmetric.ScopeMetrics) {
m := ilm.Metrics().AppendEmpty()
m.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
m.SetUnit(p.config.Histogram.Unit.String())

p.histograms.BuildMetrics(m, p.startTimestamp, p.config.GetAggregationTemporality())
}

// buildCallsMetric collects the raw call count metrics and builds
// a sum scope metric.
func (p *connectorImp) buildCallsMetric(ilm pmetric.ScopeMetrics) {
m := ilm.Metrics().AppendEmpty()
m.SetName(buildMetricName(p.config.Namespace, metricNameCalls))

p.sums.BuildMetrics(m, p.startTimestamp, p.config.GetAggregationTemporality())
}

func (p *connectorImp) resetState() {
// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.histograms.Reset(false)
p.sums.Reset()
p.resourceMetrics = make(map[resourceKey]*resourceMetrics)
p.metricKeyToDimensions.Purge()
} else {
p.metricKeyToDimensions.RemoveEvictedItems()

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
p.histograms.Reset(true)
for _, m := range p.resourceMetrics {
m.histograms.Reset(true)
}
}
}

// aggregateMetrics aggregates the raw metrics from the input trace data.
//
// Metrics are grouped by resource attributes.
// Each metric is identified by a key that is built from the service name
// and span metadata such as name, kind, status_code and any additional
// dimensions the user has configured.
Expand All @@ -282,6 +282,10 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
continue
}

rm := p.getOrCreateResourceMetrics(resourceAttr)
sums := rm.sums
histograms := rm.histograms

unitDivider := unitDivider(p.config.Histogram.Unit)
serviceName := serviceAttr.Str()
ilsSlice := rspans.ScopeSpans()
Expand All @@ -306,20 +310,36 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}

// aggregate histogram metrics
h := p.histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes)
h.Observe(duration)
if !span.TraceID().IsEmpty() {
h.AddExemplar(span.TraceID(), span.SpanID(), duration)
}

// aggregate sums metrics
s := p.sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes)
s.Add(1)
}
}
}
}

type resourceKey [16]byte

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
key := resourceKey(pdatautil.MapHash(attr))
v, ok := p.resourceMetrics[key]
if !ok {
v = &resourceMetrics{
histograms: p.initHistogramMetrics(),
sums: metrics.NewSumMetrics(),
attributes: attr,
}
p.resourceMetrics[key] = v
}
return v
}

func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
attr := pcommon.NewMap()
attr.EnsureCapacity(4 + len(p.dimensions))
Expand Down
Loading

0 comments on commit 36479b5

Please sign in to comment.