From 69fc555a7dd20646864c188192839b7f48de3597 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Sun, 3 Jan 2021 23:56:04 -0800 Subject: [PATCH 1/5] Move emptyState() allocations outside lock --- sdk/metric/aggregator/histogram/histogram.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index eb41fb012b6..8d75077b36b 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -78,8 +78,8 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator { aggs[i] = Aggregator{ kind: desc.NumberKind(), boundaries: sortedBoundaries, - state: emptyState(sortedBoundaries), } + aggs[i].state = aggs[i].emptyState() } return aggs } @@ -123,19 +123,20 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descrip return aggregator.NewInconsistentAggregatorError(c, oa) } + newState := c.emptyState() c.lock.Lock() if o != nil { o.state = c.state } - c.state = emptyState(c.boundaries) + c.state = newState c.lock.Unlock() return nil } -func emptyState(boundaries []float64) state { +func (c *Aggregator) emptyState() state { return state{ - bucketCounts: make([]float64, len(boundaries)+1), + bucketCounts: make([]float64, len(c.boundaries)+1), } } From bf72ffb2b60dbd7be246a32d0c3f99a1ad240068 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 4 Jan 2021 12:07:26 -0800 Subject: [PATCH 2/5] Add more testing --- sdk/metric/aggregator/histogram/histogram.go | 38 +++++-- .../aggregator/histogram/histogram_test.go | 106 ++++++++---------- 2 files changed, 72 insertions(+), 72 deletions(-) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 8d75077b36b..b2401c78e23 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -38,7 +38,7 @@ type ( lock sync.Mutex boundaries []float64 kind number.Kind - state state + state *state } // state represents the state of a histogram, consisting of @@ -79,11 +79,25 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator { kind: desc.NumberKind(), boundaries: sortedBoundaries, } - aggs[i].state = aggs[i].emptyState() + aggs[i].state = aggs[i].newState() } return aggs } +func (c *Aggregator) newState() *state { + return &state{ + bucketCounts: make([]float64, len(c.boundaries)+1), + } +} + +func (c *Aggregator) clearState() { + for i := range c.state.bucketCounts { + c.state.bucketCounts[i] = 0 + } + c.state.sum = 0 + c.state.count = 0 +} + // Aggregation returns an interface for reading the state of this aggregator. func (c *Aggregator) Aggregation() aggregation.Aggregation { return c @@ -123,23 +137,27 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descrip return aggregator.NewInconsistentAggregatorError(c, oa) } - newState := c.emptyState() + var next *state + + if o != nil { + // Swap case. + o.clearState() + next = o.state + } else { + // No swap is available. + next = c.newState() + } + c.lock.Lock() if o != nil { o.state = c.state } - c.state = newState + c.state = next c.lock.Unlock() return nil } -func (c *Aggregator) emptyState() state { - return state{ - bucketCounts: make([]float64, len(c.boundaries)+1), - } -} - // Update adds the recorded measurement to the current data set. func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metric.Descriptor) error { kind := desc.NumberKind() diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index e1a096f46d1..b8c058a5ab6 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -115,42 +115,23 @@ func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) agg, ckpt := new2(descriptor) - all := aggregatortest.NewNumbers(profile.NumberKind) - - for i := 0; i < count; i++ { - x := profile.Random(policy.sign()) - all.Append(x) - aggregatortest.CheckedUpdate(t, agg, x, descriptor) - } - - require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - - checkZero(t, agg, descriptor) - - all.Sort() - - asum, err := ckpt.Sum() - sum := all.Sum() - require.InEpsilon(t, - sum.CoerceToFloat64(profile.NumberKind), - asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - "+policy.name) - require.NoError(t, err) + // This needs to repeat at least 3 times to uncover a failure to reset + // for the overall sum and count fields, since the third time through + // is the first time a `histogram.state` object is reused. + for repeat := 0; repeat < 3; repeat++ { + all := aggregatortest.NewNumbers(profile.NumberKind) - count, err := ckpt.Count() - require.Equal(t, all.Count(), count, "Same count -"+policy.name) - require.NoError(t, err) + for i := 0; i < count; i++ { + x := profile.Random(policy.sign()) + all.Append(x) + aggregatortest.CheckedUpdate(t, agg, x, descriptor) + } - buckets, err := ckpt.Histogram() - require.NoError(t, err) + require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + checkZero(t, agg, descriptor) - counts := calcBuckets(all.Points(), profile) - for i, v := range counts { - bCount := uint64(buckets.Counts[i]) - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) + checkHistogram(t, all, profile, ckpt) } } @@ -191,31 +172,7 @@ func TestHistogramMerge(t *testing.T) { aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor) - all.Sort() - - asum, err := ckpt1.Sum() - sum := all.Sum() - require.InEpsilon(t, - sum.CoerceToFloat64(profile.NumberKind), - asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - absolute") - require.NoError(t, err) - - count, err := ckpt1.Count() - require.Equal(t, all.Count(), count, "Same count - absolute") - require.NoError(t, err) - - buckets, err := ckpt1.Histogram() - require.NoError(t, err) - - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") - - counts := calcBuckets(all.Points(), profile) - for i, v := range counts { - bCount := uint64(buckets.Counts[i]) - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) - } + checkHistogram(t, all, profile, ckpt1) }) } @@ -233,22 +190,47 @@ func TestHistogramNotSet(t *testing.T) { }) } -func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint64 { - sortedBoundaries := make([]float64, len(boundaries)) +func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) { + + all.Sort() + + asum, err := agg.Sum() + require.NoError(t, err) + + sum := all.Sum() + require.InEpsilon(t, + sum.CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - absolute") + count, err := agg.Count() + require.NoError(t, err) + require.Equal(t, all.Count(), count) + + buckets, err := agg.Histogram() + require.NoError(t, err) + + require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + + sortedBoundaries := make([]float64, len(boundaries)) copy(sortedBoundaries, boundaries) sort.Float64s(sortedBoundaries) + require.EqualValues(t, sortedBoundaries, buckets.Boundaries) + counts := make([]uint64, len(sortedBoundaries)+1) idx := 0 - for _, p := range points { + for _, p := range all.Points() { for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] { idx++ } counts[idx]++ } - - return counts + for i, v := range counts { + bCount := uint64(buckets.Counts[i]) + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) + } } func TestSynchronizedMoveReset(t *testing.T) { From ec57b79b9f63949de312eb67dbc0daa82feef948 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 4 Jan 2021 12:43:05 -0800 Subject: [PATCH 3/5] Re-comment; add CHANGELOG --- CHANGELOG.md | 1 + sdk/metric/aggregator/histogram/histogram.go | 21 +++++++++++-------- .../aggregator/histogram/histogram_test.go | 8 ++++--- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93bde7f4f9c..6e85eba7e5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) - Unify endpoint API that related to OTel exporter. (#1401) +- Optimize metric histogram aggregator to re-use its slice of buckets. (#) ### Removed diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index b2401c78e23..7a703f89584 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -137,22 +137,25 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descrip return aggregator.NewInconsistentAggregatorError(c, oa) } - var next *state - if o != nil { - // Swap case. + // Swap case: This is the ordinary case for a + // synchronous instrument, where the SDK allocates two + // Aggregators and lock contention is anticipated. + // Reset the target state before swapping it under the + // lock below. o.clearState() - next = o.state - } else { - // No swap is available. - next = c.newState() } c.lock.Lock() if o != nil { - o.state = c.state + c.state, o.state = o.state, c.state + } else { + // No swap case: This is the ordinary case for an + // asynchronous instrument, where the SDK allocates a + // single Aggregator and there is no anticipated lock + // contention. + c.clearState() } - c.state = next c.lock.Unlock() return nil diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index b8c058a5ab6..4e16370bc39 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -190,6 +190,8 @@ func TestHistogramNotSet(t *testing.T) { }) } +// checkHistogram ensures the correct aggregated state between `all` +// (test aggregator) and `agg` (code under test). func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) { all.Sort() @@ -201,8 +203,7 @@ func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregator require.InEpsilon(t, sum.CoerceToFloat64(profile.NumberKind), asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - absolute") + 0.000000001) count, err := agg.Count() require.NoError(t, err) @@ -211,7 +212,8 @@ func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregator buckets, err := agg.Histogram() require.NoError(t, err) - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(buckets.Counts), len(boundaries)+1, + "There should be b + 1 counts, where b is the number of boundaries") sortedBoundaries := make([]float64, len(boundaries)) copy(sortedBoundaries, boundaries) From 62bc5812cc17b93fb60e3576923ae3e84ef7f466 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 4 Jan 2021 12:46:08 -0800 Subject: [PATCH 4/5] Add CHANGELOG PR number --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e85eba7e5c..d600ba3e9c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) - Unify endpoint API that related to OTel exporter. (#1401) -- Optimize metric histogram aggregator to re-use its slice of buckets. (#) +- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435) ### Removed From 58aff97f593e7eeb92b10968bd86560351d33e04 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 6 Jan 2021 12:37:07 -0800 Subject: [PATCH 5/5] Update CHANGELOG.md Co-authored-by: Sam Xie --- CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f22d875899b..d39ed59c869 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,11 +25,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) - Unify endpoint API that related to OTel exporter. (#1401) -<<<<<<< HEAD - Optimize metric histogram aggregator to re-use its slice of buckets. (#1435) -======= - Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430) ->>>>>>> origin ### Removed