Skip to content

Commit

Permalink
Fix incorrect metrics getting generated from multiple readers (#5900)
Browse files Browse the repository at this point in the history
Fixes #5866

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
  • Loading branch information
pree-dew and pellared authored Nov 8, 2024
1 parent d2b0663 commit 1492efa
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5944)
- Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#5944)
- Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5944)
- Fix incorrect metrics generated from callbacks when multiple readers are used in `go.opentelemetry.io/otel/sdk/metric`. (#5900)

### Changed

Expand Down
63 changes: 49 additions & 14 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -441,8 +451,8 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
return noopRegister{}, nil
}

reg := newObserver()
var err error
validInstruments := make([]metric.Observable, 0, len(insts))
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
Expand All @@ -452,49 +462,64 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
continue
}
reg.registerInt64(o.observableID)

validInstruments = append(validInstruments, inst)
case float64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerFloat64(o.observableID)

validInstruments = append(validInstruments, inst)
default:
// Instrument external to the SDK.
return nil, fmt.Errorf("invalid observable: from different implementation")
}
}

if reg.len() == 0 {
if len(validInstruments) == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
unregs := make([]func(), len(m.pipes))
for ix, pipe := range m.pipes {
reg := newObserver(pipe)
for _, inst := range validInstruments {
switch o := inst.(type) {
case int64Observable:
reg.registerInt64(o.observableID)
case float64Observable:
reg.registerFloat64(o.observableID)
}
}

// Some or all instruments were valid.
cBack := func(ctx context.Context) error { return f(ctx, reg) }
unregs[ix] = pipe.addMultiCallback(cBack)
}

return unregisterFuncs{f: unregs}, err
}

type observer struct {
embedded.Observer

pipe *pipeline
float64 map[observableID[float64]]struct{}
int64 map[observableID[int64]]struct{}
}

func newObserver() observer {
func newObserver(p *pipeline) observer {
return observer{
pipe: p,
float64: make(map[observableID[float64]]struct{}),
int64: make(map[observableID[int64]]struct{}),
}
}

func (r observer) len() int {
return len(r.float64) + len(r.int64)
}

func (r observer) registerFloat64(id observableID[float64]) {
r.float64[id] = struct{}{}
}
Expand Down Expand Up @@ -530,7 +555,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
// Access to r.pipe.float64Measure is already guarded by a lock in pipeline.produce.
// TODO (#5946): Refactor pipeline and observable measures.
measures := r.pipe.float64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
Expand All @@ -555,7 +585,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
// Access to r.pipe.int64Measures is already guarded b a lock in pipeline.produce.
// TODO (#5946): Refactor pipeline and observable measures.
measures := r.pipe.int64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

type noopRegister struct{ embedded.Registration }
Expand Down
43 changes: 26 additions & 17 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
Expand Down Expand Up @@ -43,10 +42,12 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
exemplarFilter: exemplarFilter,
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
Expand All @@ -64,10 +65,26 @@ type pipeline struct {
views []View

sync.Mutex
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
}

// addInt64Measure adds a new int64 measure to the pipeline for each observer.
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
p.Lock()
defer p.Unlock()
p.int64Measures[id] = m
}

// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
p.Lock()
defer p.Unlock()
p.float64Measures[id] = m
}

// addSync adds the instrumentSync to pipeline p with scope. This method is not
Expand Down Expand Up @@ -574,14 +591,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View, exempl
return pipes
}

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addMultiCallback(c)
}
return unregisterFuncs{f: unregs}
}

type unregisterFuncs struct {
embedded.Registration
f []func()
Expand Down
95 changes: 95 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/go-logr/logr"
Expand All @@ -24,6 +25,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -101,6 +103,21 @@ func TestPipelineConcurrentSafe(t *testing.T) {
defer wg.Done()
pipe.addMultiCallback(func(context.Context) error { return nil })
}()

wg.Add(1)
go func() {
defer wg.Done()
b := aggregate.Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
ReservoirFunc: nil,
AggregationLimit: 0,
}
var oID observableID[int64]
m, _ := b.PrecomputedSum(false)
measures := []aggregate.Measure[int64]{}
measures = append(measures, m)
pipe.addInt64Measure(oID, measures)
}()
}
wg.Wait()
}
Expand Down Expand Up @@ -518,3 +535,81 @@ func TestExemplars(t *testing.T) {
check(t, r, 2, 2, 2)
})
}

func TestAddingAndObservingMeasureConcurrentSafe(t *testing.T) {
r1 := NewManualReader()
r2 := NewManualReader()

mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

oc1, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_, err := m.Int64ObservableCounter("int64-observable-counter-2")
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_, err := m.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc1, 2)
return nil
}, oc1)
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[0].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[1].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Wait()
}

func TestPipelineWithMultipleReaders(t *testing.T) {
r1 := NewManualReader()
r2 := NewManualReader()
mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")
var val atomic.Int64
oc, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)
reg, err := m.RegisterCallback(
// SDK calls this function when collecting data.
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc, val.Load())
return nil
}, oc)
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, reg.Unregister()) })
ctx := context.Background()
rm := new(metricdata.ResourceMetrics)
val.Add(1)
err = r1.Collect(ctx, rm)
require.NoError(t, err)
if assert.Len(t, rm.ScopeMetrics, 1) &&
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1) {
assert.Equal(t, int64(1), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}
val.Add(1)
err = r2.Collect(ctx, rm)
require.NoError(t, err)
if assert.Len(t, rm.ScopeMetrics, 1) &&
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1) {
assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}
}

0 comments on commit 1492efa

Please sign in to comment.