diff --git a/CHANGELOG.md b/CHANGELOG.md index 23a4d7e8eae..dcb939728e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed +- Delegated instruments are unwrapped before delegating Callbacks. (#2784) - Resolve supply-chain failure for the markdown-link-checker GitHub action by calling the CLI directly. (#2834) ## [0.29.0] - 2022-04-11 diff --git a/metric/internal/global/instruments.go b/metric/internal/global/instruments.go index 6b2004af65f..c98dfdf7c99 100644 --- a/metric/internal/global/instruments.go +++ b/metric/internal/global/instruments.go @@ -54,6 +54,13 @@ func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.K } } +func (i *afCounter) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncfloat64.Counter) + } + return nil +} + type afUpDownCounter struct { name string opts []instrument.Option @@ -80,6 +87,13 @@ func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attri } } +func (i *afUpDownCounter) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncfloat64.UpDownCounter) + } + return nil +} + type afGauge struct { name string opts []instrument.Option @@ -106,6 +120,13 @@ func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.Key } } +func (i *afGauge) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncfloat64.Gauge) + } + return nil +} + type aiCounter struct { name string opts []instrument.Option @@ -132,6 +153,13 @@ func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.Key } } +func (i *aiCounter) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncint64.Counter) + } + return nil +} + type aiUpDownCounter struct { name string opts []instrument.Option @@ -158,6 +186,13 @@ func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribu } } +func (i *aiUpDownCounter) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncint64.UpDownCounter) + } + return nil +} + type aiGauge struct { name string opts []instrument.Option @@ -184,6 +219,13 @@ func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyVa } } +func (i *aiGauge) unwrap() instrument.Asynchronous { + if ctr := i.delegate.Load(); ctr != nil { + return ctr.(asyncint64.Gauge) + } + return nil +} + //Sync Instruments type sfCounter struct { name string diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 9001ae9427b..1acac5c20cc 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -169,6 +169,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // and only on the instruments that were registered with this call. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { if del, ok := m.delegate.Load().(metric.Meter); ok { + insts = unwrapInstruments(insts) return del.RegisterCallback(insts, function) } @@ -182,6 +183,24 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func( return nil } +type wrapped interface { + unwrap() instrument.Asynchronous +} + +func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Asynchronous { + out := make([]instrument.Asynchronous, 0, len(instruments)) + + for _, inst := range instruments { + if in, ok := inst.(wrapped); ok { + out = append(out, in.unwrap()) + } else { + out = append(out, inst) + } + } + + return out +} + // SyncInt64 is the namespace for the Synchronous Integer instruments func (m *meter) SyncInt64() syncint64.InstrumentProvider { if del, ok := m.delegate.Load().(metric.Meter); ok { @@ -204,7 +223,8 @@ type delegatedCallback struct { } func (c *delegatedCallback) setDelegate(m metric.Meter) { - err := m.RegisterCallback(c.instruments, c.function) + insts := unwrapInstruments(c.instruments) + err := m.RegisterCallback(insts, c.function) if err != nil { otel.Handle(err) } diff --git a/sdk/metric/controller/controllertest/controller_test.go b/sdk/metric/controller/controllertest/controller_test.go new file mode 100644 index 00000000000..394d806238b --- /dev/null +++ b/sdk/metric/controller/controllertest/controller_test.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllertest // import "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + "go.opentelemetry.io/otel/sdk/metric/export/aggregation" + "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" +) + +type errorCatcher struct { + lock sync.Mutex + errors []error +} + +func (e *errorCatcher) Handle(err error) { + e.lock.Lock() + defer e.lock.Unlock() + + e.errors = append(e.errors, err) +} + +func TestEndToEnd(t *testing.T) { + h := &errorCatcher{} + otel.SetErrorHandler(h) + + meter := global.Meter("go.opentelemetry.io/otel/sdk/metric/controller/controllertest_EndToEnd") + gauge, err := meter.AsyncInt64().Gauge("test") + require.NoError(t, err) + err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(context.Context) {}) + require.NoError(t, err) + + c := controller.New(basic.NewFactory(simple.NewWithInexpensiveDistribution(), aggregation.CumulativeTemporalitySelector())) + + global.SetMeterProvider(c) + + gauge, err = meter.AsyncInt64().Gauge("test2") + require.NoError(t, err) + err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(context.Context) {}) + require.NoError(t, err) + + h.lock.Lock() + require.Len(t, h.errors, 0) + +}