Skip to content

Commit

Permalink
Use StateLocker in MinMaxSumCount (open-telemetry#546)
Browse files Browse the repository at this point in the history
* Add MinMaxSumCount stress test

* Reimplement MinMaxSumCount using StateLocker

* Address PR comments

* Round open-telemetry#2 of PR comments

Co-authored-by: Rahul Patel <rahulpa@google.com>
  • Loading branch information
2 people authored and MikeGoldsmith committed Mar 13, 2020
1 parent 18b0819 commit e6753e8
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 73 deletions.
14 changes: 14 additions & 0 deletions api/core/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ const (
Uint64NumberKind
)

// Zero returns the zero value of the Number representation that
// matches the receiver kind. A kind that is none of Int64NumberKind,
// Float64NumberKind or Uint64NumberKind yields a raw Number(0).
func (k NumberKind) Zero() Number {
	if k == Int64NumberKind {
		return NewInt64Number(0)
	}
	if k == Float64NumberKind {
		return NewFloat64Number(0.)
	}
	if k == Uint64NumberKind {
		return NewUint64Number(0)
	}
	return Number(0)
}

// Minimum returns the minimum representable value
// for a given NumberKind
func (k NumberKind) Minimum() Number {
Expand Down
152 changes: 86 additions & 66 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator aggregates measure events, keeping only the min, max,
// sum, and count.
Aggregator struct {
// current has to be aligned for 64-bit atomic operations.
current state
// checkpoint has to be aligned for 64-bit atomic operations.
checkpoint state
kind core.NumberKind
// states has to be aligned for 64-bit atomic operations.
states [2]state
lock internal.StateLocker
kind core.NumberKind
}

state struct {
Expand All @@ -48,104 +48,116 @@ var _ aggregator.MinMaxSumCount = &Aggregator{}
// New returns a new measure aggregator for computing min, max, sum, and
// count. It does not compute quantile information other than Max.
//
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent. For greater consistency and lower
// performance, consider using Array or DDSketch aggregators.
// This aggregator uses the StateLocker pattern to guarantee
// the count, sum, min and max are consistent within a checkpoint.
//
// The number kind of the aggregator is taken from desc.NumberKind().
func New(desc *export.Descriptor) *Aggregator {
kind := desc.NumberKind()
return &Aggregator{
kind: desc.NumberKind(),
current: unsetMinMaxSumCount(desc.NumberKind()),
kind: kind,
// Both state slots start out empty: zero count and sum, with min
// seeded at the kind's Maximum() and max at the kind's Minimum().
states: [2]state{
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
},
}
}

// unsetMinMaxSumCount builds an empty state for the given kind: min is
// seeded with kind.Maximum() and max with kind.Minimum(); every other
// field keeps its zero value.
func unsetMinMaxSumCount(kind core.NumberKind) state {
	var empty state
	empty.min = kind.Maximum()
	empty.max = kind.Minimum()
	return empty
}

// Sum returns the sum of values in the checkpoint.
// The returned error is always nil.
func (c *Aggregator) Sum() (core.Number, error) {
return c.checkpoint.sum, nil
// The lock is held for the duration of the read of the checkpoint state.
// NOTE(review): presumably Lock() keeps Checkpoint() from swapping the
// states mid-read — confirm against internal.StateLocker.
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
}

// Count returns the number of values in the checkpoint.
// The returned error is always nil.
func (c *Aggregator) Count() (int64, error) {
return int64(c.checkpoint.count.AsUint64()), nil
c.lock.Lock()
defer c.lock.Unlock()
// count is kept as a Uint64 Number, so it is coerced to int64 to match
// the return type.
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
}

// Min returns the minimum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.min being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Maximum(), Min() will return ErrEmptyDataSet.
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Min() (core.Number, error) {
if c.checkpoint.min == c.kind.Maximum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
// An empty checkpoint is detected through a zero count rather than by
// comparing min against the kind.Maximum() seed value.
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.min, nil
return c.checkpoint().min, nil
}

// Max returns the maximum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.max being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet.
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (core.Number, error) {
if c.checkpoint.max == c.kind.Minimum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
// An empty checkpoint is detected through a zero count rather than by
// comparing max against the kind.Minimum() seed value.
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.max, nil
return c.checkpoint().max, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Min, Max, Sum, and Count are not consistent with each
// other.
// the empty set.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// N.B. There is no atomic operation that can update all four
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the four parts of this aggregation could
// be spread across multiple collections in rare cases.

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
c.checkpoint.min = c.current.min.SwapNumberAtomic(c.kind.Maximum())
// Swap which of the two states is active, handing resetCheckpoint to
// the locker as the callback that clears a state back to empty.
// NOTE(review): the exact moment at which StateLocker invokes the
// callback is not visible here — confirm against internal.StateLocker.
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns a pointer to the "cold" state, i.e. the state
// collected prior to the most recent Checkpoint() call. The slot is
// selected by the index the lock reports through ColdIdx().
func (c *Aggregator) checkpoint() *state {
	coldIdx := c.lock.ColdIdx()
	return &c.states[coldIdx]
}

// resetCheckpoint puts the cold state back into the empty
// configuration: count 0, sum at the kind's zero value, min at the
// kind's Maximum() and max at the kind's Minimum().
func (c *Aggregator) resetCheckpoint() {
	cold := c.checkpoint()
	kind := c.kind

	cold.count.SetUint64(0)
	cold.sum.SetNumber(kind.Zero())
	cold.min.SetNumber(kind.Maximum())
	cold.max.SetNumber(kind.Minimum())
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
kind := desc.NumberKind()

c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)
cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

for {
current := c.current.min.AsNumberAtomic()
cmin := current.min.AsNumberAtomic()

if number.CompareNumber(kind, current) >= 0 {
if number.CompareNumber(kind, cmin) >= 0 {
break
}
if c.current.min.CompareAndSwapNumber(current, number) {
if current.min.CompareAndSwapNumber(cmin, number) {
break
}
}
for {
current := c.current.max.AsNumberAtomic()
cmax := current.max.AsNumberAtomic()

if number.CompareNumber(kind, current) <= 0 {
if number.CompareNumber(kind, cmax) <= 0 {
break
}
if c.current.max.CompareAndSwapNumber(current, number) {
if current.max.CompareAndSwapNumber(cmax, number) {
break
}
}
Expand All @@ -159,14 +171,22 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
// Merge() are performed on the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
ocheckpoint := o.checkpoint()

current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)

if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
c.checkpoint.min.SetNumber(o.checkpoint.min)
if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
current.min.SetNumber(ocheckpoint.min)
}
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
current.max.SetNumber(ocheckpoint.max)
}
return nil
}
8 changes: 2 additions & 6 deletions sdk/metric/aggregator/minmaxsumcount/mmsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,8 @@ var (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.current",
Offset: unsafe.Offsetof(Aggregator{}.current),
},
{
Name: "Aggregator.checkpoint",
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
Name: "Aggregator.states",
Offset: unsafe.Offsetof(Aggregator{}.states),
},
{
Name: "state.count",
Expand Down
83 changes: 83 additions & 0 deletions sdk/metric/minmaxsumcount_stress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test is too large for the race detector. This SDK uses no locks
// that the race detector would help with, anyway.
// +build !race

package metric_test

import (
"context"
"math/rand"
"testing"
"time"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
)

// TestStressInt64MinMaxSumCount runs one writer goroutine against a
// single Int64 MinMaxSumCount aggregator while the test goroutine
// checkpoints it repeatedly for about one second.
//
// The writer records consecutive integers (v, v+1, v+2, ...), so every
// consistent checkpoint must describe one contiguous run lo..hi:
//
//	count == hi - lo + 1
//	sum   == hi*(hi+1)/2 - (lo-1)*lo/2
//
// For an empty checkpoint (count 0), Min and Max must both return a
// non-nil error and Sum must be zero.
//
// The test stops at the first violated invariant and reports the
// observed values, and it waits for the writer goroutine to exit
// before returning.
func TestStressInt64MinMaxSumCount(t *testing.T) {
	desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind)
	mmsc := minmaxsumcount.New(desc)

	ctx, cancel := context.WithCancel(context.Background())
	writerDone := make(chan struct{})
	// Stop the writer and wait for it, so that no goroutine keeps
	// touching the aggregator after the test has returned. Deferred
	// functions also run when t.Fatalf aborts the test.
	defer func() {
		cancel()
		<-writerDone
	}()

	go func() {
		defer close(writerDone)

		rnd := rand.New(rand.NewSource(time.Now().Unix()))
		v := rnd.Int63() % 103
		for {
			select {
			case <-ctx.Done():
				return
			default:
				// The error is ignored on purpose: the checks below look only
				// at what the checkpoints report.
				_ = mmsc.Update(ctx, core.NewInt64Number(v), desc)
			}
			v++
		}
	}()

	startTime := time.Now()
	for time.Since(startTime) < time.Second {
		mmsc.Checkpoint(context.Background(), desc)

		// Errors from Sum and Count are not checked here; only Min and Max
		// are expected to signal an empty checkpoint.
		s, _ := mmsc.Sum()
		c, _ := mmsc.Count()
		minNum, minErr := mmsc.Min()
		maxNum, maxErr := mmsc.Max()
		sum := s.AsInt64()

		if c == 0 {
			if minErr == nil || maxErr == nil || sum != 0 {
				t.Fatalf("empty checkpoint: Min err = %v, Max err = %v, sum = %d; want non-nil errors and sum 0",
					minErr, maxErr, sum)
			}
			continue
		}

		if minErr != nil || maxErr != nil {
			t.Fatalf("checkpoint with count %d: Min err = %v, Max err = %v; want nil errors",
				c, minErr, maxErr)
		}
		lo, hi := minNum.AsInt64(), maxNum.AsInt64()

		if hi-lo+1 != c {
			t.Fatalf("inconsistent checkpoint: min = %d, max = %d, count = %d; want count %d",
				lo, hi, c, hi-lo+1)
		}
		if c == 1 {
			if lo != hi || lo != sum {
				t.Fatalf("single-value checkpoint: min = %d, max = %d, sum = %d; want all equal",
					lo, hi, sum)
			}
			continue
		}
		// Sum of the consecutive integers lo..hi.
		if want := hi*(hi+1)/2 - (lo-1)*lo/2; want != sum {
			t.Fatalf("inconsistent checkpoint: min = %d, max = %d, sum = %d; want sum %d",
				lo, hi, sum, want)
		}
	}
}
2 changes: 1 addition & 1 deletion sdk/metric/stress_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019, OpenTelemetry Authors
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down

0 comments on commit e6753e8

Please sign in to comment.