From eb28005e2fad10fe050b777a585b64be8cf593cb Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 10 Dec 2020 18:13:08 -0800 Subject: [PATCH] Metric Accumulator fix for SumObservers (#1381) * Let SynchronizedMove(nil) reset and discard * Add common test for SynchronizedMove(nil) * End-to-end test for the Processor and SumObserver * Implement SynchronizedMove(nil) six ways * Lint * Changelog * Test no reset for wrong aggregator type; Fix four Aggregators * Cleanup * imports Co-authored-by: Tyler Yahn --- CHANGELOG.md | 4 + sdk/export/metric/metric.go | 3 + sdk/metric/aggregator/aggregatortest/test.go | 118 ++++++++++++++++++ sdk/metric/aggregator/array/array.go | 15 ++- sdk/metric/aggregator/array/array_test.go | 11 ++ sdk/metric/aggregator/ddsketch/ddsketch.go | 10 +- .../aggregator/ddsketch/ddsketch_test.go | 11 ++ sdk/metric/aggregator/histogram/histogram.go | 9 +- .../aggregator/histogram/histogram_test.go | 11 ++ sdk/metric/aggregator/lastvalue/lastvalue.go | 4 + .../aggregator/lastvalue/lastvalue_test.go | 10 ++ sdk/metric/aggregator/minmaxsumcount/mmsc.go | 12 +- .../aggregator/minmaxsumcount/mmsc_test.go | 11 ++ sdk/metric/aggregator/sum/sum.go | 4 + sdk/metric/aggregator/sum/sum_test.go | 11 ++ sdk/metric/correct_test.go | 89 ++++++------- sdk/metric/processor/basic/basic_test.go | 54 ++++++++ sdk/metric/sdk.go | 10 +- 18 files changed, 333 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e2ea008c1f..82514e2b00c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357) - Zipkin exporter relies on the status code for success rather than body read but still read the response body. (#1328) +### Fixed + +- Metric SDK `SumObserver` and `UpDownSumObserver` instruments correctness fixes. (#1381) + ## [0.14.0] - 2020-11-19 ### Added diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 6a089601968..525fe8f3a63 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -174,6 +174,9 @@ type Aggregator interface { // // This call has no Context argument because it is expected to // perform only computation. + // + // When called with a nil `destination`, this Aggregator is reset + // and the current value is discarded. 
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error // Merge combines the checkpointed state from the argument diff --git a/sdk/metric/aggregator/aggregatortest/test.go b/sdk/metric/aggregator/aggregatortest/test.go index 1b12032f07f..3a965551256 100644 --- a/sdk/metric/aggregator/aggregatortest/test.go +++ b/sdk/metric/aggregator/aggregatortest/test.go @@ -16,16 +16,20 @@ package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator import ( "context" + "errors" "math/rand" "os" "sort" "testing" "unsafe" + "github.com/stretchr/testify/require" + ottest "go.opentelemetry.io/otel/internal/testing" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -36,6 +40,12 @@ type Profile struct { Random func(sign int) number.Number } +type NoopAggregator struct{} +type NoopAggregation struct{} + +var _ export.Aggregator = NoopAggregator{} +var _ aggregation.Aggregation = NoopAggregation{} + func newProfiles() []Profile { rnd := rand.New(rand.NewSource(rand.Int63())) return []Profile{ @@ -172,3 +182,111 @@ func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor * t.Error("Unexpected Merge failure", err) } } + +func (NoopAggregation) Kind() aggregation.Kind { + return aggregation.Kind("Noop") +} + +func (NoopAggregator) Aggregation() aggregation.Aggregation { + return NoopAggregation{} +} + +func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) error { + return nil +} + +func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error { + return nil +} + +func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error { + return nil +} + +func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) { + t.Run("reset on nil", func(t *testing.T) { + // Ensures that SynchronizedMove(nil, descriptor) discards and + // resets the aggregator. + RunProfiles(t, func(t *testing.T, profile Profile) { + descriptor := NewAggregatorTest( + mkind, + profile.NumberKind, + ) + agg := nf(descriptor) + + for i := 0; i < 10; i++ { + x1 := profile.Random(+1) + CheckedUpdate(t, agg, x1, descriptor) + } + + require.NoError(t, agg.SynchronizedMove(nil, descriptor)) + + if count, ok := agg.(aggregation.Count); ok { + c, err := count.Count() + require.Equal(t, int64(0), c) + require.NoError(t, err) + } + + if sum, ok := agg.(aggregation.Sum); ok { + s, err := sum.Sum() + require.Equal(t, number.Number(0), s) + require.NoError(t, err) + } + + if lv, ok := agg.(aggregation.LastValue); ok { + v, _, err := lv.LastValue() + require.Equal(t, number.Number(0), v) + require.Error(t, err) + require.True(t, errors.Is(err, aggregation.ErrNoData)) + } + }) + }) + + t.Run("no reset on incorrect type", func(t *testing.T) { + // Ensures that SynchronizedMove(wrong_type, descriptor) does not + // reset the aggregator. 
+ RunProfiles(t, func(t *testing.T, profile Profile) { + descriptor := NewAggregatorTest( + mkind, + profile.NumberKind, + ) + agg := nf(descriptor) + + var input number.Number + const inval = 100 + if profile.NumberKind == number.Int64Kind { + input = number.NewInt64Number(inval) + } else { + input = number.NewFloat64Number(inval) + } + + CheckedUpdate(t, agg, input, descriptor) + + err := agg.SynchronizedMove(NoopAggregator{}, descriptor) + require.Error(t, err) + require.True(t, errors.Is(err, aggregation.ErrInconsistentType)) + + // Test that the aggregator was not reset + + if count, ok := agg.(aggregation.Count); ok { + c, err := count.Count() + require.Equal(t, int64(1), c) + require.NoError(t, err) + } + + if sum, ok := agg.(aggregation.Sum); ok { + s, err := sum.Sum() + require.Equal(t, input, s) + require.NoError(t, err) + } + + if lv, ok := agg.(aggregation.LastValue); ok { + v, _, err := lv.LastValue() + require.Equal(t, input, v) + require.NoError(t, err) + } + + }) + }) + +} diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go index 5346be36b0e..1ac1d82a5f1 100644 --- a/sdk/metric/aggregator/array/array.go +++ b/sdk/metric/aggregator/array/array.go @@ -97,20 +97,27 @@ func (c *Aggregator) Points() ([]number.Number, error) { // the empty set, taking a lock to prevent concurrent Update() calls. func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) - if o == nil { + + if oa != nil && o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) } c.lock.Lock() - o.points, c.points = c.points, nil - o.sum, c.sum = c.sum, 0 + if o != nil { + o.points = c.points + o.sum = c.sum + } + c.points = nil + c.sum = 0 c.lock.Unlock() // TODO: This sort should be done lazily, only when quantiles // are requested. The SDK specification says you can use this // aggregator to simply list values in the order they were // received as an alternative to requesting quantile information. - o.sort(desc.NumberKind()) + if o != nil { + o.sort(desc.NumberKind()) + } return nil } diff --git a/sdk/metric/aggregator/array/array_test.go b/sdk/metric/aggregator/array/array_test.go index db32d9d603c..e9ba40b7001 100644 --- a/sdk/metric/aggregator/array/array_test.go +++ b/sdk/metric/aggregator/array/array_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -329,3 +330,13 @@ func TestArrayFloat64(t *testing.T) { require.Equal(t, all.Points()[i], po[i], "Wrong point at position %d", i) } } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.ValueRecorderInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &New(1)[0] + }, + ) +} diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 1e844f570fe..cbd97a776cf 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -117,13 +117,17 @@ func (c *Aggregator) toNumber(f float64) number.Number { // a new sketch, taking a lock to prevent concurrent Update() calls. 
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { o, _ := oa.(*Aggregator) - if o == nil { + + if oa != nil && o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) } - replace := sdk.NewDDSketch(c.cfg) + replace := sdk.NewDDSketch(c.cfg) c.lock.Lock() - o.sketch, c.sketch = c.sketch, replace + if o != nil { + o.sketch = c.sketch + } + c.sketch = replace c.lock.Unlock() return nil diff --git a/sdk/metric/aggregator/ddsketch/ddsketch_test.go b/sdk/metric/aggregator/ddsketch/ddsketch_test.go index 63bcbab4675..d05ab7b1ad8 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch_test.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -208,3 +209,13 @@ func TestDDSketchMerge(t *testing.T) { }) } } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.ValueRecorderInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &New(1, desc, NewDefaultConfig())[0] + }, + ) +} diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 733becd09c0..eb41fb012b6 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -118,13 +118,18 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) { // other. func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) - if o == nil { + + if oa != nil && o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) } c.lock.Lock() - o.state, c.state = c.state, emptyState(c.boundaries) + if o != nil { + o.state = c.state + } + c.state = emptyState(c.boundaries) c.lock.Unlock() + return nil } diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index c7468149ddd..e1a096f46d1 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" ) @@ -249,3 +250,13 @@ func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint6 return counts } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.ValueRecorderInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &histogram.New(1, desc, boundaries)[0] + }, + ) +} diff --git a/sdk/metric/aggregator/lastvalue/lastvalue.go b/sdk/metric/aggregator/lastvalue/lastvalue.go index e220d88edfb..3cc5f7055cf 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue.go @@ -93,6 +93,10 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) { // SynchronizedMove atomically saves the current value. 
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { + if oa == nil { + atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue)) + return nil + } o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(g, oa) diff --git a/sdk/metric/aggregator/lastvalue/lastvalue_test.go b/sdk/metric/aggregator/lastvalue/lastvalue_test.go index 62438c36619..3aad095e7b3 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue_test.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue_test.go @@ -132,3 +132,13 @@ func TestLastValueNotSet(t *testing.T) { checkZero(t, g) } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.ValueObserverInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &New(1)[0] + }, + ) +} diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index 5095e594452..66a888a59d0 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -106,15 +106,15 @@ func (c *Aggregator) Max() (number.Number, error) { // the empty set. func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) - if o == nil { + + if oa != nil && o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) } - - // TODO: It is incorrect to use an Aggregator of different - // kind. Should we test that o.kind == c.kind? (The same question - // occurs for several of the other aggregators in ../*.) c.lock.Lock() - o.state, c.state = c.state, emptyState(c.kind) + if o != nil { + o.state = c.state + } + c.state = emptyState(c.kind) c.lock.Unlock() return nil diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index 0df3e79ddef..0fb2be99146 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -235,3 +236,13 @@ func TestMaxSumCountNotSet(t *testing.T) { require.Equal(t, number.Number(0), max) }) } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.ValueRecorderInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &New(1, desc)[0] + }, + ) +} diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index 0c559acd2ba..fc96ddb4cba 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -61,6 +61,10 @@ func (c *Aggregator) Sum() (number.Number, error) { // SynchronizedMove atomically saves the current value into oa and resets the // current sum to zero. 
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { + if oa == nil { + c.value.SetRawAtomic(0) + return nil + } o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/sum/sum_test.go b/sdk/metric/aggregator/sum/sum_test.go index 9a9a14f2069..2b30c6a637c 100644 --- a/sdk/metric/aggregator/sum/sum_test.go +++ b/sdk/metric/aggregator/sum/sum_test.go @@ -24,6 +24,7 @@ import ( ottest "go.opentelemetry.io/otel/internal/testing" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -141,3 +142,13 @@ func TestCounterMerge(t *testing.T) { require.Nil(t, err) }) } + +func TestSynchronizedMoveReset(t *testing.T) { + aggregatortest.SynchronizedMoveResetTest( + t, + metric.SumObserverInstrumentKind, + func(desc *metric.Descriptor) export.Aggregator { + return &New(1)[0] + }, + ) +} diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 0efbf31bc55..5ccba31af82 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -300,74 +300,79 @@ func TestDefaultLabelEncoder(t *testing.T) { func TestObserverCollection(t *testing.T) { ctx := context.Background() meter, sdk, processor := newSDK(t) + mult := 1 _ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) { - result.Observe(1, label.String("A", "B")) + result.Observe(float64(mult), label.String("A", "B")) // last value wins - result.Observe(-1, label.String("A", "B")) - result.Observe(-1, label.String("C", "D")) + result.Observe(float64(-mult), label.String("A", "B")) + result.Observe(float64(-mult), label.String("C", "D")) }) _ = Must(meter).NewInt64ValueObserver("int.valueobserver.lastvalue", func(_ context.Context, result metric.Int64ObserverResult) { - result.Observe(-1, label.String("A", "B")) - result.Observe(1) + result.Observe(int64(-mult), label.String("A", "B")) + result.Observe(int64(mult)) // last value wins - result.Observe(1, label.String("A", "B")) - result.Observe(1) + result.Observe(int64(mult), label.String("A", "B")) + result.Observe(int64(mult)) }) _ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { - result.Observe(1, label.String("A", "B")) - result.Observe(2, label.String("A", "B")) - result.Observe(1, label.String("C", "D")) + result.Observe(float64(mult), label.String("A", "B")) + result.Observe(float64(2*mult), label.String("A", "B")) + result.Observe(float64(mult), label.String("C", "D")) }) _ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { - result.Observe(2, label.String("A", "B")) - result.Observe(1) + result.Observe(int64(2*mult), label.String("A", "B")) + result.Observe(int64(mult)) // last value wins - result.Observe(1, label.String("A", "B")) - result.Observe(1) + result.Observe(int64(mult), label.String("A", "B")) + result.Observe(int64(mult)) }) _ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) { - result.Observe(1, label.String("A", "B")) - result.Observe(-2, label.String("A", "B")) - result.Observe(1, label.String("C", "D")) + result.Observe(float64(mult), label.String("A", "B")) + result.Observe(float64(-2*mult), 
label.String("A", "B")) + result.Observe(float64(mult), label.String("C", "D")) }) _ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { - result.Observe(2, label.String("A", "B")) - result.Observe(1) + result.Observe(int64(2*mult), label.String("A", "B")) + result.Observe(int64(mult)) // last value wins - result.Observe(1, label.String("A", "B")) - result.Observe(-1) + result.Observe(int64(mult), label.String("A", "B")) + result.Observe(int64(-mult)) }) _ = Must(meter).NewInt64ValueObserver("empty.valueobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) { }) - collected := sdk.Collect(ctx) + for mult = 0; mult < 3; mult++ { + processor.accumulations = nil - require.Equal(t, collected, len(processor.accumulations)) + collected := sdk.Collect(ctx) + require.Equal(t, collected, len(processor.accumulations)) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) + out := processortest.NewOutput(label.DefaultEncoder()) + for _, rec := range processor.accumulations { + require.NoError(t, out.AddAccumulation(rec)) + } + mult := float64(mult) + require.EqualValues(t, map[string]float64{ + "float.valueobserver.lastvalue/A=B/R=V": -mult, + "float.valueobserver.lastvalue/C=D/R=V": -mult, + "int.valueobserver.lastvalue//R=V": mult, + "int.valueobserver.lastvalue/A=B/R=V": mult, + + "float.sumobserver.sum/A=B/R=V": 2 * mult, + "float.sumobserver.sum/C=D/R=V": mult, + "int.sumobserver.sum//R=V": mult, + "int.sumobserver.sum/A=B/R=V": mult, + + "float.updownsumobserver.sum/A=B/R=V": -2 * mult, + "float.updownsumobserver.sum/C=D/R=V": mult, + "int.updownsumobserver.sum//R=V": -mult, + "int.updownsumobserver.sum/A=B/R=V": mult, + }, out.Map()) } - require.EqualValues(t, map[string]float64{ - "float.valueobserver.lastvalue/A=B/R=V": -1, - "float.valueobserver.lastvalue/C=D/R=V": -1, - "int.valueobserver.lastvalue//R=V": 1, - "int.valueobserver.lastvalue/A=B/R=V": 1, - - "float.sumobserver.sum/A=B/R=V": 2, - "float.sumobserver.sum/C=D/R=V": 1, - "int.sumobserver.sum//R=V": 1, - "int.sumobserver.sum/A=B/R=V": 1, - - "float.updownsumobserver.sum/A=B/R=V": -2, - "float.updownsumobserver.sum/C=D/R=V": 1, - "int.updownsumobserver.sum//R=V": -1, - "int.updownsumobserver.sum/A=B/R=V": 1, - }, out.Map()) } func TestSumObserverInputRange(t *testing.T) { diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index a97745c5f60..016de9677bd 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -30,7 +30,9 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" + sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/processor/processortest" processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/resource" ) @@ -464,3 +466,55 @@ func TestMultiObserverSum(t *testing.T) { } } } + +func TestSumObserverEndToEnd(t *testing.T) { + ctx := context.Background() + eselector := export.CumulativeExportKindSelector() + proc := basic.New( + processorTest.AggregatorSelector(), + eselector, + ) + accum := sdk.NewAccumulator(proc, resource.Empty()) + meter := metric.WrapMeterImpl(accum, "testing") + + var 
calls int64 + metric.Must(meter).NewInt64SumObserver("observer.sum", + func(_ context.Context, result metric.Int64ObserverResult) { + calls++ + result.Observe(calls) + }, + ) + data := proc.CheckpointSet() + + var startTime [3]time.Time + var endTime [3]time.Time + + for i := range startTime { + data.Lock() + proc.StartCollection() + accum.Collect(ctx) + require.NoError(t, proc.FinishCollection()) + + exporter := processortest.NewExporter(eselector, label.DefaultEncoder()) + require.NoError(t, exporter.Export(ctx, data)) + + require.EqualValues(t, map[string]float64{ + "observer.sum//": float64(i + 1), + }, exporter.Values()) + + var record export.Record + require.NoError(t, data.ForEach(eselector, func(r export.Record) error { + record = r + return nil + })) + + startTime[i] = record.StartTime() + endTime[i] = record.EndTime() + data.Unlock() + } + + require.Equal(t, startTime[0], startTime[1]) + require.Equal(t, startTime[0], startTime[2]) + require.True(t, endTime[0].Before(endTime[1])) + require.True(t, endTime[1].Before(endTime[2])) +} diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index a8dc1013cc8..c1f68b24fe7 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -182,13 +182,9 @@ func (a *asyncInstrument) observe(num number.Number, labels *label.Set) { func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { lrec, ok := a.recorders[labels.Equivalent()] if ok { - if lrec.observedEpoch == a.meter.currentEpoch { - // last value wins for Observers, so if we see the same labels - // in the current epoch, we replace the old recorder - a.meter.processor.AggregatorFor(&a.descriptor, &lrec.observed) - } else { - lrec.observedEpoch = a.meter.currentEpoch - } + // Note: SynchronizedMove(nil) can't return an error + _ = lrec.observed.SynchronizedMove(nil, &a.descriptor) + lrec.observedEpoch = a.meter.currentEpoch a.recorders[labels.Equivalent()] = lrec return lrec.observed }
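---

Illustrative sketch (not part of the patch): the snippet below shows the new SynchronizedMove(nil, descriptor) contract from a caller's point of view, using the sum aggregator changed above. Update, SynchronizedMove, Sum, number.NewInt64Number, metric.SumObserverInstrumentKind, and number.Int64Kind all appear in the diff; the metric.NewDescriptor constructor is assumed to exist with this shape in the SDK at this version and is not taken from the patch.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/number"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)

func main() {
	ctx := context.Background()

	// Assumed constructor: a SumObserver descriptor with an int64 number kind.
	desc := metric.NewDescriptor("observer.sum", metric.SumObserverInstrumentKind, number.Int64Kind)

	// sum.New(1) returns a slice; take the address of the first element,
	// matching the &New(1)[0] pattern used in the tests above.
	agg := &sum.New(1)[0]

	// Accumulate a value.
	_ = agg.Update(ctx, number.NewInt64Number(10), &desc)

	// Passing a nil destination discards the accumulated value and resets
	// the aggregator in place, per the new interface comment in metric.go.
	_ = agg.SynchronizedMove(nil, &desc)

	s, _ := agg.Sum()
	fmt.Println(s.AsInt64()) // prints 0: the previous value was discarded

	// Passing an aggregator of a different concrete type instead returns
	// ErrInconsistentType and leaves the current value untouched, as
	// exercised by the "no reset on incorrect type" test above.
}

This nil-destination path is what sdk.go now relies on in getRecorder: when the same label set is observed again, the accumulator calls lrec.observed.SynchronizedMove(nil, &a.descriptor) to reset the existing aggregator in place rather than requesting a replacement from the processor, which is what makes last-value-wins work for SumObserver and UpDownSumObserver across epochs.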