Skip to content

Commit

Permalink
fix Fix a bug where using multiple readers resulted in incorrect asyn…
Browse files Browse the repository at this point in the history
…chronous counter values
  • Loading branch information
dashpole committed Dec 6, 2023
1 parent d37d851 commit 6c4286a
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 108 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4719)
- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4719)
- Fix a bug where using multiple readers resulted in incorrect asynchronous counter values in `go.opentelemetry.io/otel/sdk/metric`. (#4742)

## [1.20.0/0.43.0] 2023-11-10

Expand Down
28 changes: 19 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ var (
_ metric.Float64ObservableGauge = float64Observable{}
)

func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable {
return float64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[float64](m, kind, name, desc, u),
}
}

Expand All @@ -291,9 +291,9 @@ var (
_ metric.Int64ObservableGauge = int64Observable{}
)

func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable {
return int64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[int64](m, kind, name, desc, u),
}
}

Expand All @@ -302,10 +302,10 @@ type observable[N int64 | float64] struct {
observablID[N]

meter *meter
measures []aggregate.Measure[N]
measures measures[N]
}

func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand All @@ -314,14 +314,24 @@ func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc,
unit: u,
scope: m.scope,
},
meter: m,
measures: meas,
meter: m,
}
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, in := range o.measures {
o.measures.observe(val, s)
}

func (o *observable[N]) appendMeasures(meas []aggregate.Measure[N]) {
o.measures = append(o.measures, meas...)
}

type measures[N int64 | float64] []aggregate.Measure[N]

// observe records the val for the set of attrs.
func (m measures[N]) observe(val N, s attribute.Set) {
for _, in := range m {
in(context.Background(), val, s)
}
}
Expand Down
182 changes: 92 additions & 90 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,44 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
return i, validateInstrumentName(name)
}

// int64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
inserter := m.int64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Int64ObservableUpDownCounter returns a new instrument identified by name and
Expand All @@ -126,14 +150,14 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
// measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableUpDownCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Int64ObservableGauge returns a new instrument identified by name and
Expand All @@ -142,14 +166,14 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableGauge,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.int64ObservableInstrument(id, cfg.Callbacks())
}

// Float64Counter returns a new instrument identified by name and configured
Expand Down Expand Up @@ -196,20 +220,44 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
return i, validateInstrumentName(name)
}

// float64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
inserter := m.float64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

// Float64ObservableUpDownCounter returns a new instrument identified by name
Expand All @@ -218,14 +266,14 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
// measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableUpDownCounter,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

// Float64ObservableGauge returns a new instrument identified by name and
Expand All @@ -234,14 +282,14 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindObservableGauge,
Scope: m.scope,
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
return m.float64ObservableInstrument(id, cfg.Callbacks())
}

func validateInstrumentName(name string) error {
Expand Down Expand Up @@ -528,65 +576,19 @@ func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64Hist
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error {
inst := int64Observer{int64Observable: i}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type int64Observer struct {
embedded.Int64Observer
int64Observable
measures[int64]
}

func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
c := metric.NewObserveConfig(opts)
o.observe(val, c.Attributes())
}

type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error {
inst := float64Observer{float64Observable: i}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type float64Observer struct {
embedded.Float64Observer
float64Observable
measures[float64]
}

func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {
Expand Down
13 changes: 10 additions & 3 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,15 +1589,16 @@ func TestObservableExample(t *testing.T) {
)

selector := func(InstrumentKind) metricdata.Temporality { return temp }
reader := NewManualReader(WithTemporalitySelector(selector))
reader1 := NewManualReader(WithTemporalitySelector(selector))
reader2 := NewManualReader(WithTemporalitySelector(selector))

allowAll := attribute.NewDenyKeysFilter()
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: allowAll})

filter := attribute.NewDenyKeysFilter("tid")
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})

mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
mp := NewMeterProvider(WithReader(reader1), WithReader(reader2), WithView(noFiltered, filtered))
meter := mp.Meter(scopeName)

observations := make(map[attribute.Set]int64)
Expand Down Expand Up @@ -1644,7 +1645,13 @@ func TestObservableExample(t *testing.T) {
collect := func(t *testing.T) {
t.Helper()
got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
err := reader1.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())

got = metricdata.ResourceMetrics{}
err = reader2.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
Expand Down
6 changes: 0 additions & 6 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
return pipes
}

func (p pipelines) registerCallback(cback func(context.Context) error) {
for _, pipe := range p {
pipe.addCallback(cback)
}
}

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
Expand Down

0 comments on commit 6c4286a

Please sign in to comment.