Skip to content

Commit

Permalink
Fix for delayed global registration (#2784)
Browse files Browse the repository at this point in the history
* Fix for delayed global registration

* Changelog

* Fix license

* Fix ChangeLog
  • Loading branch information
MadVikingGod authored Apr 14, 2022
1 parent 57a248d commit 46a10bb
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Fixed

- Delegated instruments are unwrapped before delegating Callbacks. (#2784)
- Resolve supply-chain failure for the markdown-link-checker GitHub action by calling the CLI directly. (#2834)

## [0.29.0] - 2022-04-11
Expand Down
42 changes: 42 additions & 0 deletions metric/internal/global/instruments.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.K
}
}

func (i *afCounter) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncfloat64.Counter)
}
return nil
}

type afUpDownCounter struct {
name string
opts []instrument.Option
Expand All @@ -80,6 +87,13 @@ func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attri
}
}

func (i *afUpDownCounter) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncfloat64.UpDownCounter)
}
return nil
}

type afGauge struct {
name string
opts []instrument.Option
Expand All @@ -106,6 +120,13 @@ func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.Key
}
}

func (i *afGauge) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncfloat64.Gauge)
}
return nil
}

type aiCounter struct {
name string
opts []instrument.Option
Expand All @@ -132,6 +153,13 @@ func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.Key
}
}

func (i *aiCounter) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncint64.Counter)
}
return nil
}

type aiUpDownCounter struct {
name string
opts []instrument.Option
Expand All @@ -158,6 +186,13 @@ func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribu
}
}

func (i *aiUpDownCounter) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncint64.UpDownCounter)
}
return nil
}

type aiGauge struct {
name string
opts []instrument.Option
Expand All @@ -184,6 +219,13 @@ func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyVa
}
}

func (i *aiGauge) unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(asyncint64.Gauge)
}
return nil
}

//Sync Instruments
type sfCounter struct {
name string
Expand Down
22 changes: 21 additions & 1 deletion metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, function)
}

Expand All @@ -182,6 +183,24 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(
return nil
}

type wrapped interface {
unwrap() instrument.Asynchronous
}

func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Asynchronous {
out := make([]instrument.Asynchronous, 0, len(instruments))

for _, inst := range instruments {
if in, ok := inst.(wrapped); ok {
out = append(out, in.unwrap())
} else {
out = append(out, inst)
}
}

return out
}

// SyncInt64 is the namespace for the Synchronous Integer instruments
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
if del, ok := m.delegate.Load().(metric.Meter); ok {
Expand All @@ -204,7 +223,8 @@ type delegatedCallback struct {
}

func (c *delegatedCallback) setDelegate(m metric.Meter) {
err := m.RegisterCallback(c.instruments, c.function)
insts := unwrapInstruments(c.instruments)
err := m.RegisterCallback(insts, c.function)
if err != nil {
otel.Handle(err)
}
Expand Down
67 changes: 67 additions & 0 deletions sdk/metric/controller/controllertest/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllertest // import "go.opentelemetry.io/otel/sdk/metric/controller/controllertest"

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

type errorCatcher struct {
lock sync.Mutex
errors []error
}

func (e *errorCatcher) Handle(err error) {
e.lock.Lock()
defer e.lock.Unlock()

e.errors = append(e.errors, err)
}

func TestEndToEnd(t *testing.T) {
h := &errorCatcher{}
otel.SetErrorHandler(h)

meter := global.Meter("go.opentelemetry.io/otel/sdk/metric/controller/controllertest_EndToEnd")
gauge, err := meter.AsyncInt64().Gauge("test")
require.NoError(t, err)
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(context.Context) {})
require.NoError(t, err)

c := controller.New(basic.NewFactory(simple.NewWithInexpensiveDistribution(), aggregation.CumulativeTemporalitySelector()))

global.SetMeterProvider(c)

gauge, err = meter.AsyncInt64().Gauge("test2")
require.NoError(t, err)
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(context.Context) {})
require.NoError(t, err)

h.lock.Lock()
require.Len(t, h.errors, 0)

}

0 comments on commit 46a10bb

Please sign in to comment.