Skip to content

Commit

Permalink
Metric Accumulator fix for SumObservers (#1381)
Browse files Browse the repository at this point in the history
* Let SynchronizedMove(nil) reset and discard

* Add common test for SynchronizedMove(nil)

* End-to-end test for the Processor and SumObserver

* Implement SynchronizedMove(nil) six ways

* Lint

* Changelog

* Test no reset for wrong aggregator type; Fix four Aggregators

* Cleanup

* imports

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
jmacd and MrAlias authored Dec 11, 2020
1 parent 970755b commit eb28005
Show file tree
Hide file tree
Showing 18 changed files with 333 additions and 64 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357)
- Zipkin exporter relies on the status code for success rather than body read but still read the response body. (#1328)

### Fixed

- Metric SDK `SumObserver` and `UpDownSumObserver` instruments correctness fixes. (#1381)

## [0.14.0] - 2020-11-19

### Added
Expand Down
3 changes: 3 additions & 0 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type Aggregator interface {
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error

// Merge combines the checkpointed state from the argument
Expand Down
118 changes: 118 additions & 0 deletions sdk/metric/aggregator/aggregatortest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator

import (
"context"
"errors"
"math/rand"
"os"
"sort"
"testing"
"unsafe"

"github.com/stretchr/testify/require"

ottest "go.opentelemetry.io/otel/internal/testing"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)

Expand All @@ -36,6 +40,12 @@ type Profile struct {
Random func(sign int) number.Number
}

type NoopAggregator struct{}
type NoopAggregation struct{}

var _ export.Aggregator = NoopAggregator{}
var _ aggregation.Aggregation = NoopAggregation{}

func newProfiles() []Profile {
rnd := rand.New(rand.NewSource(rand.Int63()))
return []Profile{
Expand Down Expand Up @@ -172,3 +182,111 @@ func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *
t.Error("Unexpected Merge failure", err)
}
}

func (NoopAggregation) Kind() aggregation.Kind {
return aggregation.Kind("Noop")
}

func (NoopAggregator) Aggregation() aggregation.Aggregation {
return NoopAggregation{}
}

func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) error {
return nil
}

func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error {
return nil
}

func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
return nil
}

func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) {
t.Run("reset on nil", func(t *testing.T) {
// Ensures that SynchronizedMove(nil, descriptor) discards and
// resets the aggregator.
RunProfiles(t, func(t *testing.T, profile Profile) {
descriptor := NewAggregatorTest(
mkind,
profile.NumberKind,
)
agg := nf(descriptor)

for i := 0; i < 10; i++ {
x1 := profile.Random(+1)
CheckedUpdate(t, agg, x1, descriptor)
}

require.NoError(t, agg.SynchronizedMove(nil, descriptor))

if count, ok := agg.(aggregation.Count); ok {
c, err := count.Count()
require.Equal(t, int64(0), c)
require.NoError(t, err)
}

if sum, ok := agg.(aggregation.Sum); ok {
s, err := sum.Sum()
require.Equal(t, number.Number(0), s)
require.NoError(t, err)
}

if lv, ok := agg.(aggregation.LastValue); ok {
v, _, err := lv.LastValue()
require.Equal(t, number.Number(0), v)
require.Error(t, err)
require.True(t, errors.Is(err, aggregation.ErrNoData))
}
})
})

t.Run("no reset on incorrect type", func(t *testing.T) {
// Ensures that SynchronizedMove(wrong_type, descriptor) does not
// reset the aggregator.
RunProfiles(t, func(t *testing.T, profile Profile) {
descriptor := NewAggregatorTest(
mkind,
profile.NumberKind,
)
agg := nf(descriptor)

var input number.Number
const inval = 100
if profile.NumberKind == number.Int64Kind {
input = number.NewInt64Number(inval)
} else {
input = number.NewFloat64Number(inval)
}

CheckedUpdate(t, agg, input, descriptor)

err := agg.SynchronizedMove(NoopAggregator{}, descriptor)
require.Error(t, err)
require.True(t, errors.Is(err, aggregation.ErrInconsistentType))

// Test that the aggregator was not reset

if count, ok := agg.(aggregation.Count); ok {
c, err := count.Count()
require.Equal(t, int64(1), c)
require.NoError(t, err)
}

if sum, ok := agg.(aggregation.Sum); ok {
s, err := sum.Sum()
require.Equal(t, input, s)
require.NoError(t, err)
}

if lv, ok := agg.(aggregation.LastValue); ok {
v, _, err := lv.LastValue()
require.Equal(t, input, v)
require.NoError(t, err)
}

})
})

}
15 changes: 11 additions & 4 deletions sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,27 @@ func (c *Aggregator) Points() ([]number.Number, error) {
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

c.lock.Lock()
o.points, c.points = c.points, nil
o.sum, c.sum = c.sum, 0
if o != nil {
o.points = c.points
o.sum = c.sum
}
c.points = nil
c.sum = 0
c.lock.Unlock()

// TODO: This sort should be done lazily, only when quantiles
// are requested. The SDK specification says you can use this
// aggregator to simply list values in the order they were
// received as an alternative to requesting quantile information.
o.sort(desc.NumberKind())
if o != nil {
o.sort(desc.NumberKind())
}
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -329,3 +330,13 @@ func TestArrayFloat64(t *testing.T) {
require.Equal(t, all.Points()[i], po[i], "Wrong point at position %d", i)
}
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1)[0]
},
)
}
10 changes: 7 additions & 3 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,17 @@ func (c *Aggregator) toNumber(f float64) number.Number {
// a new sketch, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
replace := sdk.NewDDSketch(c.cfg)

replace := sdk.NewDDSketch(c.cfg)
c.lock.Lock()
o.sketch, c.sketch = c.sketch, replace
if o != nil {
o.sketch = c.sketch
}
c.sketch = replace
c.lock.Unlock()

return nil
Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -208,3 +209,13 @@ func TestDDSketchMerge(t *testing.T) {
})
}
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1, desc, NewDefaultConfig())[0]
},
)
}
9 changes: 7 additions & 2 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,18 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
// other.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

c.lock.Lock()
o.state, c.state = c.state, emptyState(c.boundaries)
if o != nil {
o.state = c.state
}
c.state = emptyState(c.boundaries)
c.lock.Unlock()

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
)
Expand Down Expand Up @@ -249,3 +250,13 @@ func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint6

return counts
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &histogram.New(1, desc, boundaries)[0]
},
)
}
4 changes: 4 additions & 0 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) {

// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
10 changes: 10 additions & 0 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,13 @@ func TestLastValueNotSet(t *testing.T) {

checkZero(t, g)
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueObserverInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1)[0]
},
)
}
12 changes: 6 additions & 6 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ func (c *Aggregator) Max() (number.Number, error) {
// the empty set.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

// TODO: It is incorrect to use an Aggregator of different
// kind. Should we test that o.kind == c.kind? (The same question
// occurs for several of the other aggregators in ../*.)
c.lock.Lock()
o.state, c.state = c.state, emptyState(c.kind)
if o != nil {
o.state = c.state
}
c.state = emptyState(c.kind)
c.lock.Unlock()

return nil
Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/minmaxsumcount/mmsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -235,3 +236,13 @@ func TestMaxSumCountNotSet(t *testing.T) {
require.Equal(t, number.Number(0), max)
})
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1, desc)[0]
},
)
}
4 changes: 4 additions & 0 deletions sdk/metric/aggregator/sum/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (c *Aggregator) Sum() (number.Number, error) {
// SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
if oa == nil {
c.value.SetRawAtomic(0)
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
Loading

0 comments on commit eb28005

Please sign in to comment.