diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ffccf17b8d..ff05ad60c55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) - Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337) - Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338) +- The `ManualReader` will not panic if `AggregationSelector` returns `nil`. (#4350) +- If a Reader's AggregationSelector return nil or DefaultAggregation the pipeline will use the default aggregation. (#4350) - Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349) - Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 5677003550b..b8dcf1fccda 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -227,6 +227,9 @@ func WithAggregationSelector(selector AggregationSelector) ManualReaderOption { // Deep copy and validate before using. wrapped := func(ik InstrumentKind) aggregation.Aggregation { a := selector(ik) + if a == nil { + return nil + } cpA := a.Copy() if err := cpA.Err(); err != nil { cpA = DefaultAggregationSelector(ik) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index bfa1879120b..65f5acfa6c8 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -1811,3 +1812,148 @@ func BenchmarkInstrumentCreation(b *testing.B) { sfHistogram, _ = meter.Float64Histogram("sync.float64.histogram") } } + +func testNilAggregationSelector(InstrumentKind) aggregation.Aggregation { + return nil +} +func testDefaultAggregationSelector(InstrumentKind) aggregation.Aggregation { + return aggregation.Default{} +} +func testUndefinedTemporalitySelector(InstrumentKind) metricdata.Temporality { + return metricdata.Temporality(0) +} +func testInvalidTemporalitySelector(InstrumentKind) metricdata.Temporality { + return metricdata.Temporality(255) +} + +type noErrorHandler struct { + t *testing.T +} + +func (h noErrorHandler) Handle(err error) { + assert.NoError(h.t, err) +} + +func TestMalformedSelectors(t *testing.T) { + type testCase struct { + name string + reader Reader + } + testCases := []testCase{ + { + name: "nil aggregation selector", + reader: NewManualReader(WithAggregationSelector(testNilAggregationSelector)), + }, + { + name: "nil aggregation selector periodic", + reader: NewPeriodicReader(&fnExporter{aggregationFunc: testNilAggregationSelector}), + }, + { + name: "default aggregation selector", + reader: NewManualReader(WithAggregationSelector(testDefaultAggregationSelector)), + }, + { + name: "default aggregation selector periodic", + reader: NewPeriodicReader(&fnExporter{aggregationFunc: testDefaultAggregationSelector}), + }, + { + name: "undefined temporality selector", + reader: NewManualReader(WithTemporalitySelector(testUndefinedTemporalitySelector)), + }, + { + name: "undefined temporality selector periodic", + reader: NewPeriodicReader(&fnExporter{temporalityFunc: testUndefinedTemporalitySelector}), + }, + { + name: "invalid temporality selector", + reader: NewManualReader(WithTemporalitySelector(testInvalidTemporalitySelector)), + }, + { + name: "invalid temporality selector periodic", + reader: NewPeriodicReader(&fnExporter{temporalityFunc: testInvalidTemporalitySelector}), + }, + { + name: "both aggregation and temporality selector", + reader: NewManualReader( + WithAggregationSelector(testNilAggregationSelector), + WithTemporalitySelector(testUndefinedTemporalitySelector), + ), + }, + { + name: "both aggregation and temporality selector periodic", + reader: NewPeriodicReader(&fnExporter{ + aggregationFunc: testNilAggregationSelector, + temporalityFunc: testUndefinedTemporalitySelector, + }), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + origErrorHandler := global.GetErrorHandler() + defer global.SetErrorHandler(origErrorHandler) + global.SetErrorHandler(noErrorHandler{t}) + + defer func() { + _ = tt.reader.Shutdown(context.Background()) + }() + + meter := NewMeterProvider(WithReader(tt.reader)).Meter("TestNilAggregationSelector") + + // Create All instruments, they should not error + aiCounter, err := meter.Int64ObservableCounter("observable.int64.counter") + require.NoError(t, err) + aiUpDownCounter, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter") + require.NoError(t, err) + aiGauge, err := meter.Int64ObservableGauge("observable.int64.gauge") + require.NoError(t, err) + + afCounter, err := meter.Float64ObservableCounter("observable.float64.counter") + require.NoError(t, err) + afUpDownCounter, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter") + require.NoError(t, err) + afGauge, err := meter.Float64ObservableGauge("observable.float64.gauge") + require.NoError(t, err) + + siCounter, err := meter.Int64Counter("sync.int64.counter") + require.NoError(t, err) + siUpDownCounter, err := meter.Int64UpDownCounter("sync.int64.up.down.counter") + require.NoError(t, err) + siHistogram, err := meter.Int64Histogram("sync.int64.histogram") + require.NoError(t, err) + + sfCounter, err := meter.Float64Counter("sync.float64.counter") + require.NoError(t, err) + sfUpDownCounter, err := meter.Float64UpDownCounter("sync.float64.up.down.counter") + require.NoError(t, err) + sfHistogram, err := meter.Float64Histogram("sync.float64.histogram") + require.NoError(t, err) + + callback := func(ctx context.Context, obs metric.Observer) error { + obs.ObserveInt64(aiCounter, 1) + obs.ObserveInt64(aiUpDownCounter, 1) + obs.ObserveInt64(aiGauge, 1) + obs.ObserveFloat64(afCounter, 1) + obs.ObserveFloat64(afUpDownCounter, 1) + obs.ObserveFloat64(afGauge, 1) + return nil + } + _, err = meter.RegisterCallback(callback, aiCounter, aiUpDownCounter, aiGauge, afCounter, afUpDownCounter, afGauge) + require.NoError(t, err) + + siCounter.Add(context.Background(), 1) + siUpDownCounter.Add(context.Background(), 1) + siHistogram.Record(context.Background(), 1) + sfCounter.Add(context.Background(), 1) + sfUpDownCounter.Add(context.Background(), 1) + sfHistogram.Record(context.Background(), 1) + + var rm metricdata.ResourceMetrics + err = tt.reader.Collect(context.Background(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 12) + }) + } +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0d9a363a29b..9b9483fad89 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -311,6 +311,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum case nil, aggregation.Default: // Undefined, nil, means to use the default from the reader. stream.Aggregation = i.pipeline.reader.aggregation(kind) + switch stream.Aggregation.(type) { + case nil, aggregation.Default: + // If the reader returns default or nil use the default selector. + stream.Aggregation = DefaultAggregationSelector(kind) + } } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 95d8f0d2231..a281104bd08 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -145,6 +145,9 @@ func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality { // AggregationSelector selects the aggregation and the parameters to use for // that aggregation based on the InstrumentKind. +// +// If the Aggregation returned is nil or DefaultAggregation, the selection from +// DefaultAggregationSelector will be used. type AggregationSelector func(InstrumentKind) aggregation.Aggregation // DefaultAggregationSelector returns the default aggregation and parameters