From e368276257de587c235e11531da66842a2582ae4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 5 Jan 2023 14:22:06 -0800 Subject: [PATCH] Create metric API Callback type (#3564) * Create metric API Callback type Document the type according the OTel specification requirements. * Update all impls of the metric API with new type * Add changes to changelog * Update PR number in changelog entry --- CHANGELOG.md | 4 ++++ metric/internal/global/meter.go | 2 +- metric/internal/global/meter_types_test.go | 2 +- metric/meter.go | 15 ++++++++++++++- metric/noop.go | 2 +- sdk/metric/meter.go | 8 ++------ sdk/metric/pipeline.go | 6 +++--- 7 files changed, 26 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60003ffc014..628f3aba84c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. This `Registration` can be used to unregister callbacks. (#3522) - Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) +- Add the `Callback` function type to the `go.opentelemetry.io/otel/metric` package. + This new named function type is registered with a `Meter`. (#3564) ### Changed @@ -52,6 +54,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `InstrumentKindAsyncCounter` is renamed to `InstrumentKindObservableCounter` - `InstrumentKindAsyncUpDownCounter` is renamed to `InstrumentKindObservableUpDownCounter` - `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge` +- Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition. + The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564) ### Deprecated diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 940728cb2ea..06de9176983 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -283,7 +283,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { insts = unwrapInstruments(insts) return del.RegisterCallback(insts, f) diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index b77f7926840..6e49ce84f51 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -119,7 +119,7 @@ func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Op // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { +func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) { m.callbacks = append(m.callbacks, f) return testReg{ f: func(idx int) func() { diff --git a/metric/meter.go b/metric/meter.go index 83f2d6a8189..604254c10ab 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -106,9 +106,22 @@ type Meter interface { // // If no instruments are passed, f should not be registered nor called // during collection. - RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error) + RegisterCallback(instruments []instrument.Asynchronous, f Callback) (Registration, error) } +// Callback is a function registered with a Meter that makes observations for +// the set of instruments it is registered with. +// +// The function needs to complete in a finite amount of time and the deadline +// of the passed context is expected to be honored. +// +// The function needs to make unique observations across all registered +// Callbacks. Meaning, it should not report measurements for an instrument with +// the same attributes as another Callback will report. +// +// The function needs to be concurrent safe. +type Callback func(context.Context) + // Registration is an token representing the unique registration of a callback // for a set of instruments with a Meter. type Registration interface { diff --git a/metric/noop.go b/metric/noop.go index 568257c0964..0b0a9707d14 100644 --- a/metric/noop.go +++ b/metric/noop.go @@ -92,7 +92,7 @@ func (noopMeter) Float64ObservableGauge(string, ...instrument.Option) (asyncfloa } // RegisterCallback creates a register callback that does not record any metrics. -func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) { +func (noopMeter) RegisterCallback([]instrument.Asynchronous, Callback) (Registration, error) { return noopReg{}, nil } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index c82a4fb5f55..d28f53f0fdb 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -15,8 +15,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "context" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" @@ -143,7 +141,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) { for _, inst := range insts { // Only register if at least one instrument has a non-drop aggregation. // Otherwise, calling f during collection will be wasted computation. @@ -174,9 +172,7 @@ func (noopRegister) Unregister() error { return nil } -type callback func(context.Context) - -func (m *meter) registerCallback(c callback) (metric.Registration, error) { +func (m *meter) registerCallback(c metric.Callback) (metric.Registration, error) { return m.pipes.registerCallback(c), nil } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 17cdadf918c..4ebabc0c1d2 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -96,7 +96,7 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { } // addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(c callback) (unregister func()) { +func (p *pipeline) addCallback(c metric.Callback) (unregister func()) { p.Lock() defer p.Unlock() e := p.callbacks.PushBack(c) @@ -126,7 +126,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err for e := p.callbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) - f := e.Value.(callback) + f := e.Value.(metric.Callback) f(ctx) if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. @@ -447,7 +447,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(c callback) metric.Registration { +func (p pipelines) registerCallback(c metric.Callback) metric.Registration { unregs := make([]func(), len(p)) for i, pipe := range p { unregs[i] = pipe.addCallback(c)