From 36479b5c55a97d80b044e5f6101a6f6aabc0e20b Mon Sep 17 00:00:00 2001 From: Ruslan Kovalov Date: Fri, 10 Mar 2023 18:16:00 +0400 Subject: [PATCH] [connectors/spanmetrics] Set resource attributes for generated metrics. (#19216) * [connectors/spanmetrics] Set resource attributes for generated metrics. --- .chloggen/spanmetrics-set-resource-attrs.yaml | 11 + connector/spanmetricsconnector/connector.go | 138 +++++++----- .../spanmetricsconnector/connector_test.go | 208 +++++++++--------- connector/spanmetricsconnector/go.mod | 4 + connector/spanmetricsconnector/go.sum | 2 + 5 files changed, 206 insertions(+), 157 deletions(-) create mode 100644 .chloggen/spanmetrics-set-resource-attrs.yaml diff --git a/.chloggen/spanmetrics-set-resource-attrs.yaml b/.chloggen/spanmetrics-set-resource-attrs.yaml new file mode 100644 index 000000000000..ced448d9910c --- /dev/null +++ b/.chloggen/spanmetrics-set-resource-attrs.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: spanmetricsconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Set resource attributes for generated metrics. + +# One or more tracking issues related to the change +issues: [18502] diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go index 80b43fe57423..a1d714e60908 100644 --- a/connector/spanmetricsconnector/connector.go +++ b/connector/spanmetricsconnector/connector.go @@ -33,6 +33,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) const ( @@ -63,9 +64,7 @@ type connectorImp struct { // The starting time of the data points. startTimestamp pcommon.Timestamp - // Metrics - histograms metrics.HistogramMetrics - sums metrics.SumMetrics + resourceMetrics map[resourceKey]*resourceMetrics keyBuf *bytes.Buffer @@ -80,6 +79,12 @@ type connectorImp struct { shutdownOnce sync.Once } +type resourceMetrics struct { + histograms metrics.HistogramMetrics + sums metrics.SumMetrics + attributes pcommon.Map +} + type dimension struct { name string value *pcommon.Value @@ -109,33 +114,11 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic return nil, err } - var histograms metrics.HistogramMetrics - if cfg.Histogram.Exponential != nil { - maxSize := cfg.Histogram.Exponential.MaxSize - if cfg.Histogram.Exponential.MaxSize == 0 { - maxSize = structure.DefaultMaxSize - } - histograms = metrics.NewExponentialHistogramMetrics(maxSize) - } else { - bounds := defaultHistogramBucketsMs - // TODO remove deprecated `latency_histogram_buckets` - if cfg.LatencyHistogramBuckets != nil { - logger.Warn("latency_histogram_buckets is deprecated. " + - "Use `histogram: explicit: buckets` to set histogram buckets") - bounds = durationsToUnits(cfg.LatencyHistogramBuckets, unitDivider(cfg.Histogram.Unit)) - } - if cfg.Histogram.Explicit != nil && cfg.Histogram.Explicit.Buckets != nil { - bounds = durationsToUnits(cfg.Histogram.Explicit.Buckets, unitDivider(cfg.Histogram.Unit)) - } - histograms = metrics.NewExplicitHistogramMetrics(bounds) - } - return &connectorImp{ logger: logger, config: *cfg, startTimestamp: pcommon.NewTimestampFromTime(time.Now()), - histograms: histograms, - sums: metrics.NewSumMetrics(), + resourceMetrics: make(map[resourceKey]*resourceMetrics), dimensions: newDimensions(cfg.Dimensions), keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), metricKeyToDimensions: metricKeyToDimensionsCache, @@ -144,6 +127,28 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic }, nil } +func (p *connectorImp) initHistogramMetrics() metrics.HistogramMetrics { + cfg := p.config + if cfg.Histogram.Exponential != nil { + maxSize := structure.DefaultMaxSize + if cfg.Histogram.Exponential.MaxSize != 0 { + maxSize = cfg.Histogram.Exponential.MaxSize + } + return metrics.NewExponentialHistogramMetrics(maxSize) + } + bounds := defaultHistogramBucketsMs + // TODO remove deprecated `latency_histogram_buckets` + if cfg.LatencyHistogramBuckets != nil { + p.logger.Warn("latency_histogram_buckets is deprecated. " + + "Use `histogram: explicit: buckets` to set histogram buckets") + bounds = durationsToUnits(cfg.LatencyHistogramBuckets, unitDivider(cfg.Histogram.Unit)) + } + if cfg.Histogram.Explicit != nil && cfg.Histogram.Explicit.Buckets != nil { + bounds = durationsToUnits(cfg.Histogram.Explicit.Buckets, unitDivider(cfg.Histogram.Unit)) + } + return metrics.NewExplicitHistogramMetrics(bounds) +} + // unitDivider returns a unit divider to convert nanoseconds to milliseconds or seconds. func unitDivider(u metrics.Unit) int64 { return map[metrics.Unit]int64{ @@ -199,8 +204,7 @@ func (p *connectorImp) Capabilities() consumer.Capabilities { } // ConsumeTraces implements the consumer.Traces interface. -// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter. -// The original input trace data will be forwarded to the next consumer, unmodified. +// It aggregates the trace data to generate metrics. func (p *connectorImp) ConsumeTraces(_ context.Context, traces ptrace.Traces) error { p.lock.Lock() p.aggregateMetrics(traces) @@ -223,53 +227,49 @@ func (p *connectorImp) exportMetrics(ctx context.Context) { } } -// buildMetrics collects the computed raw metrics data, builds the metrics object and -// writes the raw metrics data into the metrics object. +// buildMetrics collects the computed raw metrics data and builds OTLP metrics. func (p *connectorImp) buildMetrics() pmetric.Metrics { m := pmetric.NewMetrics() - ilm := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() - ilm.Scope().SetName("spanmetricsconnector") - - p.buildCallsMetric(ilm) - p.buildDurationMetric(ilm) + for _, rawMetrics := range p.resourceMetrics { + rm := m.ResourceMetrics().AppendEmpty() + rawMetrics.attributes.CopyTo(rm.Resource().Attributes()) + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("spanmetricsconnector") + + sums := rawMetrics.sums + metric := sm.Metrics().AppendEmpty() + metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls)) + sums.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality()) + + histograms := rawMetrics.histograms + metric = sm.Metrics().AppendEmpty() + metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration)) + metric.SetUnit(p.config.Histogram.Unit.String()) + histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality()) + } return m } -// buildDurationMetric collects the raw call count metrics and builds -// a explicit or exponential buckets histogram scope metric. -func (p *connectorImp) buildDurationMetric(ilm pmetric.ScopeMetrics) { - m := ilm.Metrics().AppendEmpty() - m.SetName(buildMetricName(p.config.Namespace, metricNameDuration)) - m.SetUnit(p.config.Histogram.Unit.String()) - - p.histograms.BuildMetrics(m, p.startTimestamp, p.config.GetAggregationTemporality()) -} - -// buildCallsMetric collects the raw call count metrics and builds -// a sum scope metric. -func (p *connectorImp) buildCallsMetric(ilm pmetric.ScopeMetrics) { - m := ilm.Metrics().AppendEmpty() - m.SetName(buildMetricName(p.config.Namespace, metricNameCalls)) - - p.sums.BuildMetrics(m, p.startTimestamp, p.config.GetAggregationTemporality()) -} - func (p *connectorImp) resetState() { // If delta metrics, reset accumulated data if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta { - p.histograms.Reset(false) - p.sums.Reset() + p.resourceMetrics = make(map[resourceKey]*resourceMetrics) p.metricKeyToDimensions.Purge() } else { p.metricKeyToDimensions.RemoveEvictedItems() // Exemplars are only relevant to this batch of traces, so must be cleared within the lock - p.histograms.Reset(true) + for _, m := range p.resourceMetrics { + m.histograms.Reset(true) + } } } // aggregateMetrics aggregates the raw metrics from the input trace data. +// +// Metrics are grouped by resource attributes. // Each metric is identified by a key that is built from the service name // and span metadata such as name, kind, status_code and any additional // dimensions the user has configured. @@ -282,6 +282,10 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { continue } + rm := p.getOrCreateResourceMetrics(resourceAttr) + sums := rm.sums + histograms := rm.histograms + unitDivider := unitDivider(p.config.Histogram.Unit) serviceName := serviceAttr.Str() ilsSlice := rspans.ScopeSpans() @@ -306,20 +310,36 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { } // aggregate histogram metrics - h := p.histograms.GetOrCreate(key, attributes) + h := histograms.GetOrCreate(key, attributes) h.Observe(duration) if !span.TraceID().IsEmpty() { h.AddExemplar(span.TraceID(), span.SpanID(), duration) } // aggregate sums metrics - s := p.sums.GetOrCreate(key, attributes) + s := sums.GetOrCreate(key, attributes) s.Add(1) } } } } +type resourceKey [16]byte + +func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics { + key := resourceKey(pdatautil.MapHash(attr)) + v, ok := p.resourceMetrics[key] + if !ok { + v = &resourceMetrics{ + histograms: p.initHistogramMetrics(), + sums: metrics.NewSumMetrics(), + attributes: attr, + } + p.resourceMetrics[key] = v + } + return v +} + func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map { attr := pcommon.NewMap() attr.EnsureCapacity(4 + len(p.dimensions)) diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go index 4e981dde85b2..56a7ce7d1255 100644 --- a/connector/spanmetricsconnector/connector_test.go +++ b/connector/spanmetricsconnector/connector_test.go @@ -15,7 +15,6 @@ package spanmetricsconnector import ( - "bytes" "context" "fmt" "sync" @@ -40,7 +39,6 @@ import ( "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/mocks" ) @@ -113,56 +111,76 @@ func verifyMultipleCumulativeConsumptions() func(t testing.TB, input pmetric.Met // This is the best point to verify the computed metrics from spans are as expected. func verifyConsumeMetricsInput(t testing.TB, input pmetric.Metrics, expectedTemporality pmetric.AggregationTemporality, numCumulativeConsumptions int) bool { require.Equal(t, 6, input.DataPointCount(), - "Should be 3 for each of calls count and duration. Each group of 3 data points is made of: "+ - "service-a (server kind) -> service-a (client kind) -> service-b (service kind)", + "Should be 3 for each of call count and latency split into two resource scopes defined by: "+ + "service-a: service-a (server kind) -> service-a (client kind) and "+ + "service-b: service-b (service kind)", ) - rm := input.ResourceMetrics() - require.Equal(t, 1, rm.Len()) + require.Equal(t, 2, input.ResourceMetrics().Len()) - ilm := rm.At(0).ScopeMetrics() - require.Equal(t, 1, ilm.Len()) - assert.Equal(t, "spanmetricsconnector", ilm.At(0).Scope().Name()) + for i := 0; i < input.ResourceMetrics().Len(); i++ { + rm := input.ResourceMetrics().At(i) - m := ilm.At(0).Metrics() - require.Equal(t, 2, m.Len()) + var numDataPoints int + val, ok := rm.Resource().Attributes().Get(serviceNameKey) + require.True(t, ok) + serviceName := val.AsString() + if serviceName == "service-a" { + numDataPoints = 2 + } else if serviceName == "service-b" { + numDataPoints = 1 + } - seenMetricIDs := make(map[metricID]bool) - // The first 3 data points are for call counts. - assert.Equal(t, metricNameCalls, m.At(0).Name()) - assert.Equal(t, expectedTemporality, m.At(0).Sum().AggregationTemporality()) - assert.True(t, m.At(0).Sum().IsMonotonic()) - callsDps := m.At(0).Sum().DataPoints() - require.Equal(t, 3, callsDps.Len()) - for dpi := 0; dpi < 3; dpi++ { - dp := callsDps.At(dpi) - assert.Equal(t, int64(numCumulativeConsumptions), dp.IntValue(), "There should only be one metric per Service/name/kind combination") - assert.NotZero(t, dp.StartTimestamp(), "StartTimestamp should be set") - assert.NotZero(t, dp.Timestamp(), "Timestamp should be set") - verifyMetricLabels(dp, t, seenMetricIDs) - } + ilm := rm.ScopeMetrics() + require.Equal(t, 1, ilm.Len()) + assert.Equal(t, "spanmetricsconnector", ilm.At(0).Scope().Name()) + + m := ilm.At(0).Metrics() + require.Equal(t, 2, m.Len(), "only sum and histogram metric types generated") + + // validate calls - sum metrics + metric := m.At(0) + assert.Equal(t, metricNameCalls, metric.Name()) + assert.Equal(t, expectedTemporality, metric.Sum().AggregationTemporality()) + assert.True(t, metric.Sum().IsMonotonic()) + + seenMetricIDs := make(map[metricID]bool) + callsDps := metric.Sum().DataPoints() + require.Equal(t, numDataPoints, callsDps.Len()) + for dpi := 0; dpi < numDataPoints; dpi++ { + dp := callsDps.At(dpi) + assert.Equal(t, + int64(numCumulativeConsumptions), + dp.IntValue(), + "There should only be one metric per Service/name/kind combination", + ) + assert.NotZero(t, dp.StartTimestamp(), "StartTimestamp should be set") + assert.NotZero(t, dp.Timestamp(), "Timestamp should be set") + verifyMetricLabels(dp, t, seenMetricIDs) + } - h := m.At(1) - assert.Equal(t, metricNameDuration, h.Name()) - assert.Equal(t, defaultUnit.String(), h.Unit()) - - // The remaining 3 data points are for duration. - if h.Type() == pmetric.MetricTypeExponentialHistogram { - hist := h.ExponentialHistogram() - assert.Equal(t, expectedTemporality, hist.AggregationTemporality()) - verifyExponentialHistogramDataPoints(t, hist.DataPoints(), numCumulativeConsumptions) - } else { - hist := h.Histogram() - assert.Equal(t, expectedTemporality, hist.AggregationTemporality()) - verifyExplicitHistogramDataPoints(t, hist.DataPoints(), numCumulativeConsumptions) + // validate latency - histogram metrics + metric = m.At(1) + assert.Equal(t, metricNameDuration, metric.Name()) + assert.Equal(t, defaultUnit.String(), metric.Unit()) + + if metric.Type() == pmetric.MetricTypeExponentialHistogram { + hist := metric.ExponentialHistogram() + assert.Equal(t, expectedTemporality, hist.AggregationTemporality()) + verifyExponentialHistogramDataPoints(t, hist.DataPoints(), numDataPoints, numCumulativeConsumptions) + } else { + hist := metric.Histogram() + assert.Equal(t, expectedTemporality, hist.AggregationTemporality()) + verifyExplicitHistogramDataPoints(t, hist.DataPoints(), numDataPoints, numCumulativeConsumptions) + } } return true } -func verifyExplicitHistogramDataPoints(t testing.TB, dps pmetric.HistogramDataPointSlice, numCumulativeConsumptions int) { +func verifyExplicitHistogramDataPoints(t testing.TB, dps pmetric.HistogramDataPointSlice, numDataPoints, numCumulativeConsumptions int) { seenMetricIDs := make(map[metricID]bool) - require.Equal(t, 3, dps.Len()) - for dpi := 0; dpi < 3; dpi++ { + require.Equal(t, numDataPoints, dps.Len()) + for dpi := 0; dpi < numDataPoints; dpi++ { dp := dps.At(dpi) assert.Equal( t, @@ -198,10 +216,10 @@ func verifyExplicitHistogramDataPoints(t testing.TB, dps pmetric.HistogramDataPo } } -func verifyExponentialHistogramDataPoints(t testing.TB, dps pmetric.ExponentialHistogramDataPointSlice, numCumulativeConsumptions int) { +func verifyExponentialHistogramDataPoints(t testing.TB, dps pmetric.ExponentialHistogramDataPointSlice, numDataPoints, numCumulativeConsumptions int) { seenMetricIDs := make(map[metricID]bool) - require.Equal(t, 3, dps.Len()) - for dpi := 0; dpi < 3; dpi++ { + require.Equal(t, numDataPoints, dps.Len()) + for dpi := 0; dpi < numDataPoints; dpi++ { dp := dps.At(dpi) assert.Equal( t, @@ -337,12 +355,22 @@ func initSpan(span span, s ptrace.Span) { s.SetSpanID(pcommon.SpanID([8]byte{byte(42)})) } -func initExplicitHistograms() metrics.HistogramMetrics { - return metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs) +func explicitHistogramsConfig() HistogramConfig { + return HistogramConfig{ + Unit: defaultUnit, + Explicit: &ExplicitHistogramConfig{ + Buckets: []time.Duration{4 * time.Second, 6 * time.Second, 8 * time.Second}, + }, + } } -func initExponentialHistograms() metrics.HistogramMetrics { - return metrics.NewExponentialHistogramMetrics(10) +func exponentialHistogramsConfig() HistogramConfig { + return HistogramConfig{ + Unit: defaultUnit, + Exponential: &ExponentialHistogramConfig{ + MaxSize: 10, + }, + } } func TestBuildKeySameServiceNameCharSequence(t *testing.T) { @@ -467,7 +495,7 @@ func TestConcurrentShutdown(t *testing.T) { ticker := mockClock.NewTicker(time.Nanosecond) // Test - p := newConnectorImp(new(consumertest.MetricsSink), nil, initExplicitHistograms, cumulative, logger, ticker) + p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, cumulative, logger, ticker) err := p.Start(ctx, componenttest.NewNopHost()) require.NoError(t, err) @@ -496,10 +524,11 @@ func TestConcurrentShutdown(t *testing.T) { return len(allLogs) > 0 }, time.Second, time.Millisecond*10) + // Building spanmetrics connector... // Starting spanmetricsconnector... // Shutting down spanmetricsconnector... // Stopping ticker. - assert.Len(t, allLogs, 3) + assert.Len(t, allLogs, 4) } func TestConnectorCapabilities(t *testing.T) { @@ -534,7 +563,7 @@ func TestConsumeMetricsErrors(t *testing.T) { mockClock := clock.NewMock(time.Now()) ticker := mockClock.NewTicker(time.Nanosecond) - p := newConnectorImp(mcon, nil, initExplicitHistograms, cumulative, logger, ticker) + p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, cumulative, logger, ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) @@ -570,7 +599,7 @@ func TestConsumeTraces(t *testing.T) { testcases := []struct { name string aggregationTemporality string - histograms func() metrics.HistogramMetrics + histogramConfig func() HistogramConfig verifier func(t testing.TB, input pmetric.Metrics) bool traces []ptrace.Traces }{ @@ -578,14 +607,14 @@ func TestConsumeTraces(t *testing.T) { { name: "Test single consumption, three spans (Cumulative), using exp. histogram", aggregationTemporality: cumulative, - histograms: initExponentialHistograms, + histogramConfig: exponentialHistogramsConfig, verifier: verifyConsumeMetricsInputCumulative, traces: []ptrace.Traces{buildSampleTrace()}, }, { name: "Test single consumption, three spans (Delta), using exp. histogram", aggregationTemporality: delta, - histograms: initExponentialHistograms, + histogramConfig: exponentialHistogramsConfig, verifier: verifyConsumeMetricsInputDelta, traces: []ptrace.Traces{buildSampleTrace()}, }, @@ -593,7 +622,7 @@ func TestConsumeTraces(t *testing.T) { // More consumptions, should accumulate additively. name: "Test two consumptions (Cumulative), using exp. histogram", aggregationTemporality: cumulative, - histograms: initExponentialHistograms, + histogramConfig: exponentialHistogramsConfig, verifier: verifyMultipleCumulativeConsumptions(), traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()}, }, @@ -601,7 +630,7 @@ func TestConsumeTraces(t *testing.T) { // More consumptions, should not accumulate. Therefore, end state should be the same as single consumption case. name: "Test two consumptions (Delta), using exp. histogram", aggregationTemporality: delta, - histograms: initExponentialHistograms, + histogramConfig: exponentialHistogramsConfig, verifier: verifyConsumeMetricsInputDelta, traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()}, }, @@ -609,7 +638,7 @@ func TestConsumeTraces(t *testing.T) { // Consumptions with improper timestamps name: "Test bad consumptions (Delta), using exp. histogram", aggregationTemporality: cumulative, - histograms: initExponentialHistograms, + histogramConfig: exponentialHistogramsConfig, verifier: verifyBadMetricsOkay, traces: []ptrace.Traces{buildBadSampleTrace()}, }, @@ -618,14 +647,14 @@ func TestConsumeTraces(t *testing.T) { { name: "Test single consumption, three spans (Cumulative).", aggregationTemporality: cumulative, - histograms: initExplicitHistograms, + histogramConfig: explicitHistogramsConfig, verifier: verifyConsumeMetricsInputCumulative, traces: []ptrace.Traces{buildSampleTrace()}, }, { name: "Test single consumption, three spans (Delta).", aggregationTemporality: delta, - histograms: initExplicitHistograms, + histogramConfig: explicitHistogramsConfig, verifier: verifyConsumeMetricsInputDelta, traces: []ptrace.Traces{buildSampleTrace()}, }, @@ -633,7 +662,7 @@ func TestConsumeTraces(t *testing.T) { // More consumptions, should accumulate additively. name: "Test two consumptions (Cumulative).", aggregationTemporality: cumulative, - histograms: initExplicitHistograms, + histogramConfig: explicitHistogramsConfig, verifier: verifyMultipleCumulativeConsumptions(), traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()}, }, @@ -641,7 +670,7 @@ func TestConsumeTraces(t *testing.T) { // More consumptions, should not accumulate. Therefore, end state should be the same as single consumption case. name: "Test two consumptions (Delta).", aggregationTemporality: delta, - histograms: initExplicitHistograms, + histogramConfig: explicitHistogramsConfig, verifier: verifyConsumeMetricsInputDelta, traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()}, }, @@ -649,7 +678,7 @@ func TestConsumeTraces(t *testing.T) { // Consumptions with improper timestamps name: "Test bad consumptions (Delta).", aggregationTemporality: cumulative, - histograms: initExplicitHistograms, + histogramConfig: explicitHistogramsConfig, verifier: verifyBadMetricsOkay, traces: []ptrace.Traces{buildBadSampleTrace()}, }, @@ -671,12 +700,10 @@ func TestConsumeTraces(t *testing.T) { return tc.verifier(t, input) })).Return(nil) - defaultNullValue := pcommon.NewValueStr("defaultNullValue") - mockClock := clock.NewMock(time.Now()) ticker := mockClock.NewTicker(time.Nanosecond) - p := newConnectorImp(mcon, &defaultNullValue, tc.histograms, tc.aggregationTemporality, zaptest.NewLogger(t), ticker) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) @@ -701,8 +728,7 @@ func TestMetricKeyCache(t *testing.T) { mcon := &mocks.MetricsConsumer{} mcon.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) - defaultNullValue := pcommon.NewValueStr("defaultNullValue") - p := newConnectorImp(mcon, &defaultNullValue, initExplicitHistograms, cumulative, zaptest.NewLogger(t), nil) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(t), nil) traces := buildSampleTrace() // Test @@ -734,8 +760,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) { mcon := &mocks.MetricsConsumer{} mcon.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) - defaultNullValue := pcommon.NewValueStr("defaultNullValue") - conn := newConnectorImp(mcon, &defaultNullValue, initExplicitHistograms, cumulative, zaptest.NewLogger(b), nil) + conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(b), nil) traces := buildSampleTrace() @@ -746,28 +771,12 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) { } } -func newConnectorImp( - mcon consumer.Metrics, - defaultNullValue *pcommon.Value, - histograms func() metrics.HistogramMetrics, - temporality string, - logger *zap.Logger, - ticker *clock.Ticker, -) *connectorImp { - defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal") - // use size 2 for LRU cache for testing purpose - metricKeyToDimensions, err := cache.NewCache[metrics.Key, pcommon.Map](DimensionsCacheSize) - if err != nil { - panic(err) - } - return &connectorImp{ - logger: logger, - config: Config{AggregationTemporality: temporality, Histogram: HistogramConfig{Unit: defaultUnit}}, - metricsConsumer: mcon, - startTimestamp: pcommon.NewTimestampFromTime(time.Now()), - histograms: histograms(), - sums: metrics.NewSumMetrics(), - dimensions: []dimension{ +func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker) *connectorImp { + cfg := &Config{ + AggregationTemporality: temporality, + Histogram: histogramConfig(), + DimensionsCacheSize: DimensionsCacheSize, + Dimensions: []Dimension{ // Set nil defaults to force a lookup for the attribute in the span. {stringAttrName, nil}, {intAttrName, nil}, @@ -777,17 +786,21 @@ func newConnectorImp( {arrayAttrName, nil}, {nullAttrName, defaultNullValue}, // Add a default value for an attribute that doesn't exist in a span - {notInSpanAttrName0, &defaultNotInSpanAttrVal}, + {notInSpanAttrName0, stringp("defaultNotInSpanAttrVal")}, // Leave the default value unset to test that this dimension should not be added to the metric. {notInSpanAttrName1, nil}, // Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc. {regionResourceAttrName, nil}, }, - keyBuf: new(bytes.Buffer), - metricKeyToDimensions: metricKeyToDimensions, - ticker: ticker, - done: make(chan struct{}), } + c, err := newConnector(logger, cfg, ticker) + require.NoError(t, err) + c.metricsConsumer = mcon + return c +} + +func stringp(str string) *string { + return &str } func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) { @@ -886,12 +899,11 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) { return true })).Return(nil) - defaultNullValue := pcommon.NewValueStr("defaultNullValue") mockClock := clock.NewMock(time.Now()) ticker := mockClock.NewTicker(time.Nanosecond) // Note: default dimension key cache size is 2. - p := newConnectorImp(mcon, &defaultNullValue, initExplicitHistograms, cumulative, zaptest.NewLogger(t), ticker) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(t), ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) diff --git a/connector/spanmetricsconnector/go.mod b/connector/spanmetricsconnector/go.mod index 594129b1eab9..5740fadf162e 100644 --- a/connector/spanmetricsconnector/go.mod +++ b/connector/spanmetricsconnector/go.mod @@ -6,6 +6,7 @@ require ( github.com/hashicorp/golang-lru v0.6.0 github.com/lightstep/go-expohisto v1.0.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.73.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.73.0 github.com/stretchr/testify v1.8.2 github.com/tilinna/clock v1.1.0 go.opentelemetry.io/collector v0.73.0 @@ -21,6 +22,7 @@ require ( require ( github.com/benbjohnson/clock v1.3.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -51,3 +53,5 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil diff --git a/connector/spanmetricsconnector/go.sum b/connector/spanmetricsconnector/go.sum index bf38134ac561..9a570842d4b9 100644 --- a/connector/spanmetricsconnector/go.sum +++ b/connector/spanmetricsconnector/go.sum @@ -29,6 +29,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=