Metric histogram aggregator: Swap in SynchronizedMove to avoid allocations #1435

Merged 7 commits on Jan 14, 2021.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
- `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369)
- Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369)
- Unify endpoint API that related to OTel exporter. (#1401)
- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435)
- Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (#1430)
- `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
36 changes: 29 additions & 7 deletions sdk/metric/aggregator/histogram/histogram.go
@@ -38,7 +38,7 @@ type (
lock sync.Mutex
boundaries []float64
kind number.Kind
state state
state *state
[Review comment, Member] State is only 40 B; not sure you need to use a pointer here.

}

// state represents the state of a histogram, consisting of
@@ -78,8 +78,8 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
aggs[i] = Aggregator{
kind: desc.NumberKind(),
boundaries: sortedBoundaries,
state: emptyState(sortedBoundaries),
}
aggs[i].state = aggs[i].newState()
}
return aggs
}
@@ -123,20 +123,42 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

if o != nil {
// Swap case: This is the ordinary case for a
// synchronous instrument, where the SDK allocates two
// Aggregators and lock contention is anticipated.
// Reset the target state before swapping it under the
// lock below.
o.clearState()
}

c.lock.Lock()
[Review comment, Contributor] Is the SynchronizedMove function comment still true? "Since no locks are taken, there is a chance that the independent Sum, Count and Bucket Count are not consistent with each other."

if o != nil {
o.state = c.state
c.state, o.state = o.state, c.state
} else {
// No swap case: This is the ordinary case for an
// asynchronous instrument, where the SDK allocates a
// single Aggregator and there is no anticipated lock
// contention.
c.clearState()
}
c.state = emptyState(c.boundaries)
c.lock.Unlock()

return nil
}

func emptyState(boundaries []float64) state {
return state{
bucketCounts: make([]uint64, len(boundaries)+1),
func (c *Aggregator) newState() *state {
return &state{
bucketCounts: make([]uint64, len(c.boundaries)+1),
}
}

func (c *Aggregator) clearState() {
for i := range c.state.bucketCounts {
c.state.bucketCounts[i] = 0
}
c.state.sum = 0
c.state.count = 0
}

// Update adds the recorded measurement to the current data set.
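For orientation, here is a minimal, self-contained sketch of the pattern this diff adopts: each Aggregator owns a pointer to its state, and SynchronizedMove exchanges the two pointers under the lock instead of allocating a fresh state on every checkpoint. This is an illustration of the technique, not the SDK's actual API: the types are simplified (float64-only, no number.Kind handling, no error returns).

```go
package main

import (
	"fmt"
	"sync"
)

// state mirrors the diff: a running sum, a count, and one bucket
// count per boundary interval (len(boundaries)+1 buckets).
type state struct {
	bucketCounts []uint64
	sum          float64
	count        uint64
}

type Aggregator struct {
	lock       sync.Mutex
	boundaries []float64
	state      *state // pointer so checkpoints swap states, not copy them
}

func (c *Aggregator) newState() *state {
	return &state{bucketCounts: make([]uint64, len(c.boundaries)+1)}
}

// clearState zeroes the state in place, re-using the bucketCounts
// slice rather than allocating a new one.
func (c *Aggregator) clearState() {
	for i := range c.state.bucketCounts {
		c.state.bucketCounts[i] = 0
	}
	c.state.sum = 0
	c.state.count = 0
}

// SynchronizedMove moves c's accumulated state into o and resets c.
// When o is non-nil (the synchronous-instrument case), the two state
// pointers are exchanged under the lock; no allocation occurs.
func (c *Aggregator) SynchronizedMove(o *Aggregator) {
	if o != nil {
		// Reset the target state before swapping it in, outside
		// the critical section.
		o.clearState()
	}
	c.lock.Lock()
	if o != nil {
		c.state, o.state = o.state, c.state
	} else {
		c.clearState()
	}
	c.lock.Unlock()
}

func main() {
	agg := &Aggregator{boundaries: []float64{10, 100}}
	agg.state = agg.newState()
	ckpt := &Aggregator{boundaries: agg.boundaries}
	ckpt.state = ckpt.newState()

	agg.state.sum, agg.state.count = 42, 3
	agg.SynchronizedMove(ckpt)
	fmt.Println(ckpt.state.sum, ckpt.state.count) // 42 3
	fmt.Println(agg.state.sum, agg.state.count)   // 0 0
}
```

Whether state is held by pointer or by value (the question raised in the first review comment) is orthogonal to the main saving: bucketCounts is now zeroed in place by clearState instead of re-allocated by emptyState on every checkpoint.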
108 changes: 46 additions & 62 deletions sdk/metric/aggregator/histogram/histogram_test.go
@@ -115,42 +115,23 @@ func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy)

agg, ckpt := new2(descriptor)

all := aggregatortest.NewNumbers(profile.NumberKind)

for i := 0; i < count; i++ {
x := profile.Random(policy.sign())
all.Append(x)
aggregatortest.CheckedUpdate(t, agg, x, descriptor)
}

require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

checkZero(t, agg, descriptor)

all.Sort()

asum, err := ckpt.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - "+policy.name)
require.NoError(t, err)
// This needs to repeat at least 3 times to uncover a failure to reset
// for the overall sum and count fields, since the third time through
// is the first time a `histogram.state` object is reused.
for repeat := 0; repeat < 3; repeat++ {
all := aggregatortest.NewNumbers(profile.NumberKind)

count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.NoError(t, err)
for i := 0; i < count; i++ {
x := profile.Random(policy.sign())
all.Append(x)
aggregatortest.CheckedUpdate(t, agg, x, descriptor)
}

buckets, err := ckpt.Histogram()
require.NoError(t, err)
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))

require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
checkZero(t, agg, descriptor)

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
checkHistogram(t, all, profile, ckpt)
}
}
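
The three-pass requirement in the comment above falls out of the two-state ping-pong between agg and ckpt: after one SynchronizedMove the original state object sits in the checkpoint, after two it is back in the live aggregator, so only the third pass records into a state that has been used before. A hypothetical timeline, written as a comment for illustration (sA and sB are invented names, not identifiers from the diff):

```go
// Two states ping-pong between the aggregator and its checkpoint:
//   sA starts in agg, sB starts in ckpt.
//
// pass 1: record into sA; SynchronizedMove -> ckpt holds sA, agg holds sB
// pass 2: record into sB; SynchronizedMove -> ckpt holds sB, agg holds sA
// pass 3: record into sA again -- the first reuse. If the move's reset
//         missed the sum or count fields, pass 1's values would still be
//         in sA, and the checkpoint comparison fails only on this pass.
```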

@@ -191,31 +172,7 @@ func TestHistogramMerge(t *testing.T) {

aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor)

all.Sort()

asum, err := ckpt1.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.NoError(t, err)

count, err := ckpt1.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.NoError(t, err)

buckets, err := ckpt1.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
checkHistogram(t, all, profile, ckpt1)
})
}

@@ -233,22 +190,49 @@ func TestHistogramNotSet(t *testing.T) {
})
}

func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint64 {
sortedBoundaries := make([]float64, len(boundaries))
// checkHistogram ensures the correct aggregated state between `all`
// (test aggregator) and `agg` (code under test).
func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) {

all.Sort()

asum, err := agg.Sum()
require.NoError(t, err)

sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
asum.CoerceToFloat64(profile.NumberKind),
0.000000001)

count, err := agg.Count()
require.NoError(t, err)
require.Equal(t, all.Count(), count)

buckets, err := agg.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1,
"There should be b + 1 counts, where b is the number of boundaries")

sortedBoundaries := make([]float64, len(boundaries))
copy(sortedBoundaries, boundaries)
sort.Float64s(sortedBoundaries)

require.EqualValues(t, sortedBoundaries, buckets.Boundaries)

counts := make([]uint64, len(sortedBoundaries)+1)
idx := 0
for _, p := range points {
for _, p := range all.Points() {
for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] {
idx++
}
counts[idx]++
}

return counts
for i, v := range counts {
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
}

func TestSynchronizedMoveReset(t *testing.T) {
…
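
One last note on checkHistogram's expected-count computation: because the test points are sorted, a single forward scan with a moving boundary index assigns every point to its bucket. A self-contained sketch of the same scheme (the function name and example values are invented for illustration):

```go
package main

import "fmt"

// expectedBuckets reproduces checkHistogram's single-pass scheme:
// bucket 0 holds values below the first boundary, bucket i holds
// values in [boundaries[i-1], boundaries[i]), and the final bucket
// holds values >= the last boundary. Both inputs must be sorted.
func expectedBuckets(sortedPoints, sortedBoundaries []float64) []uint64 {
	counts := make([]uint64, len(sortedBoundaries)+1)
	idx := 0
	for _, p := range sortedPoints {
		// Advance past every boundary the current point has reached.
		for idx < len(sortedBoundaries) && p >= sortedBoundaries[idx] {
			idx++
		}
		counts[idx]++
	}
	return counts
}

func main() {
	// {1, 5} < 10; {10, 50} in [10, 100); {200} >= 100.
	fmt.Println(expectedBuckets(
		[]float64{1, 5, 10, 50, 200},
		[]float64{10, 100},
	)) // [2 2 1]
}
```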