From 81b2a33e1bbc57ef8f401e5ee7aaa9bc6b7ac772 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 09:05:10 -0400 Subject: [PATCH] Add selector of exemplar reservoir providers to metric.Stream configuration (#5861) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolve https://github.com/open-telemetry/opentelemetry-go/issues/5249 ### Spec > exemplar_reservoir: A functional type that generates an exemplar reservoir a MeterProvider will use when storing exemplars. This functional type needs to be a factory or callback similar to aggregation selection functionality which allows different reservoirs to be chosen by the aggregation. > Users can provide an exemplar_reservoir, but it is up to their discretion. Therefore, the stream configuration parameter needs to be structured to accept an exemplar_reservoir, but MUST NOT obligate a user to provide one. If the user does not provide an exemplar_reservoir value, the MeterProvider MUST apply a [default exemplar reservoir](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar-defaults). Also, > the reservoir MUST be given the Attributes associated with its timeseries point either at construction so that additional sampling performed by the reservoir has access to all attributes from a measurement in the "offer" method. ### Changes In sdk/metric/exemplar, add: * `exemplar.ReservoirProvider` * `exemplar.HistogramReservoirProvider` * `exemplar.FixedSizeReservoirProvider` In sdk/metric, add: * `metric.ExemplarReservoirProviderSelector` (func Aggregation -> ReservoirProvider) * `metric.DefaultExemplarReservoirProviderSelector` (our default implementation) * `ExemplarReservoirProviderSelector` added to `metric.Stream` Note: the only required public types are `metric.ExemplarReservoirProviderSelector` and `ExemplarReservoirProviderSelector` in `metric.Stream`. Others are for convenience and readability. ### Alternatives considered #### Add ExemplarReservoirProvider directly to metric.Stream, instead of ExemplarReservoirProviderSelector This would mean users can configure a `func() exemplar.Reservoir` instead of a `func(Aggregation) func() exemplar.Reservoir`. I don't think this complies with the statement: `This functional type needs to be a factory or callback similar to aggregation selection functionality which allows different reservoirs to be chosen by the aggregation.`. I'm interpreting "allows different reservoirs to be chosen by the aggregation" as meaning "allows different reservoirs to be chosen **based on the** aggregation", rather than meaning that the aggregation is somehow choosing the reservoir. ### Future work There is some refactoring I plan to do after this to simplify the interaction between the internal/aggregate and exemplar package. I've omitted that from this PR to keep the diff smaller. --------- Co-authored-by: Tyler Yahn Co-authored-by: Robert Pająk --- CHANGELOG.md | 3 ++ sdk/metric/example_test.go | 25 +++++++++++ sdk/metric/exemplar.go | 41 ++++++++++++++----- sdk/metric/exemplar/fixed_size_reservoir.go | 7 ++++ .../exemplar/fixed_size_reservoir_test.go | 8 ++-- sdk/metric/exemplar/histogram_reservoir.go | 12 +++++- .../exemplar/histogram_reservoir_test.go | 8 ++-- sdk/metric/exemplar/reservoir.go | 8 ++++ sdk/metric/exemplar/reservoir_test.go | 17 +++++--- sdk/metric/instrument.go | 6 +++ sdk/metric/internal/aggregate/aggregate.go | 4 +- .../internal/aggregate/aggregate_test.go | 4 +- sdk/metric/internal/aggregate/drop.go | 4 +- sdk/metric/internal/aggregate/drop_test.go | 3 +- .../aggregate/exponential_histogram.go | 6 +-- sdk/metric/internal/aggregate/histogram.go | 8 ++-- sdk/metric/internal/aggregate/lastvalue.go | 8 ++-- sdk/metric/internal/aggregate/sum.go | 10 ++--- sdk/metric/pipeline.go | 5 ++- sdk/metric/pipeline_test.go | 26 ++++++++++++ sdk/metric/view.go | 11 ++--- 21 files changed, 170 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bdb30ce4f8..5ae9192b754 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850) - Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850) +- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861) +- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861) +- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861) diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 3fa7e969e4a..bf316ca94eb 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -242,6 +242,31 @@ func ExampleNewView_exponentialHistogram() { ) } +func ExampleNewView_exemplarreservoirproviderselector() { + // Create a view that makes all metrics use a different exemplar reservoir. + view := metric.NewView( + metric.Instrument{Name: "*"}, + metric.Stream{ + ExemplarReservoirProviderSelector: func(agg metric.Aggregation) exemplar.ReservoirProvider { + // This example uses a fixed-size reservoir with a size of 10 + // for explicit bucket histograms instead of the default + // bucket-aligned reservoir. + if _, ok := agg.(metric.AggregationExplicitBucketHistogram); ok { + return exemplar.FixedSizeReservoirProvider(10) + } + // Fall back to the default reservoir otherwise. + return metric.DefaultExemplarReservoirProviderSelector(agg) + }, + }, + ) + + // The created view can then be registered with the OpenTelemetry metric + // SDK using the WithView option. + _ = metric.NewMeterProvider( + metric.WithView(view), + ) +} + func ExampleWithExemplarFilter_disabled() { // Use exemplar.AlwaysOffFilter to disable exemplar collection. _ = metric.NewMeterProvider( diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 1f8652d6eca..0335b8ae48e 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -5,25 +5,48 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "runtime" - "slices" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" ) +// ExemplarReservoirProviderSelector selects the +// [exemplar.ReservoirProvider] to use +// based on the [Aggregation] of the metric. +type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider + // reservoirFunc returns the appropriately configured exemplar reservoir // creation func based on the passed InstrumentKind and filter configuration. -func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] { +func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] { + return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] { + return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs)) + } +} + +// DefaultExemplarReservoirProviderSelector returns the default +// [exemplar.ReservoirProvider] for the +// provided [Aggregation]. +// +// For explicit bucket histograms with more than 1 bucket, it uses the +// [exemplar.HistogramReservoirProvider]. +// For exponential histograms, it uses the +// [exemplar.FixedSizeReservoirProvider] +// with a size of min(20, max_buckets). +// For all other aggregations, it uses the +// [exemplar.FixedSizeReservoirProvider] +// with a size equal to the number of CPUs. +// +// Exemplar default reservoirs MAY change in a minor version bump. No +// guarantees are made on the shape or statistical properties of returned +// exemplars. +func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider { // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults // Explicit bucket histogram aggregation with more than 1 bucket will // use AlignedHistogramBucketExemplarReservoir. a, ok := agg.(AggregationExplicitBucketHistogram) if ok && len(a.Boundaries) > 0 { - cp := slices.Clone(a.Boundaries) - return func() aggregate.FilteredExemplarReservoir[N] { - bounds := cp - return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds)) - } + return exemplar.HistogramReservoirProvider(a.Boundaries) } var n int @@ -50,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f } } - return func() aggregate.FilteredExemplarReservoir[N] { - return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n)) - } + return exemplar.FixedSizeReservoirProvider(n) } diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 34160ca608b..d4aab0aad4f 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -12,6 +12,13 @@ import ( "go.opentelemetry.io/otel/attribute" ) +// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir]. +func FixedSizeReservoirProvider(k int) ReservoirProvider { + return func(_ attribute.Set) Reservoir { + return NewFixedSizeReservoir(k) + } +} + // NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most // k exemplars. If there are k or less measurements made, the Reservoir will // sample each one. If there are more than k, the Reservoir will then randomly diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 1840abbd58d..0a901fd9cae 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -15,12 +15,12 @@ import ( ) func TestNewFixedSizeReservoir(t *testing.T) { - t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) { - return NewFixedSizeReservoir(n), n + t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n })) - t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) { - return NewFixedSizeReservoir(n), n + t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n })) } diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index c27545a409a..3b76cf305a4 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -12,13 +12,21 @@ import ( "go.opentelemetry.io/otel/attribute" ) +// HistogramReservoirProvider is a provider of [HistogramReservoir]. +func HistogramReservoirProvider(bounds []float64) ReservoirProvider { + cp := slices.Clone(bounds) + slices.Sort(cp) + return func(_ attribute.Set) Reservoir { + return NewHistogramReservoir(cp) + } +} + // NewHistogramReservoir returns a [HistogramReservoir] that samples the last // measurement that falls within a histogram bucket. The histogram bucket // upper-boundaries are define by bounds. // -// The passed bounds will be sorted by this function. +// The passed bounds must be sorted before calling this function. func NewHistogramReservoir(bounds []float64) *HistogramReservoir { - slices.Sort(bounds) return &HistogramReservoir{ bounds: bounds, storage: newStorage(len(bounds) + 1), diff --git a/sdk/metric/exemplar/histogram_reservoir_test.go b/sdk/metric/exemplar/histogram_reservoir_test.go index 64c101cb057..3f43e801e8f 100644 --- a/sdk/metric/exemplar/histogram_reservoir_test.go +++ b/sdk/metric/exemplar/histogram_reservoir_test.go @@ -7,11 +7,11 @@ import "testing" func TestHist(t *testing.T) { bounds := []float64{0, 100} - t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) { - return NewHistogramReservoir(bounds), len(bounds) + t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) })) - t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) { - return NewHistogramReservoir(bounds), len(bounds) + t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) })) } diff --git a/sdk/metric/exemplar/reservoir.go b/sdk/metric/exemplar/reservoir.go index 055ce5bc8ec..ba5cd1a6b3d 100644 --- a/sdk/metric/exemplar/reservoir.go +++ b/sdk/metric/exemplar/reservoir.go @@ -30,3 +30,11 @@ type Reservoir interface { // The Reservoir state is preserved after this call. Collect(dest *[]Exemplar) } + +// ReservoirProvider creates new [Reservoir]s. +// +// The attributes provided are attributes which are kept by the aggregation, and +// are exclusive with attributes passed to Offer. The combination of these +// attributes and the attributes passed to Offer is the complete set of +// attributes a measurement was made with. +type ReservoirProvider func(attr attribute.Set) Reservoir diff --git a/sdk/metric/exemplar/reservoir_test.go b/sdk/metric/exemplar/reservoir_test.go index 88bb53757e9..6d9336b44a1 100644 --- a/sdk/metric/exemplar/reservoir_test.go +++ b/sdk/metric/exemplar/reservoir_test.go @@ -18,7 +18,7 @@ import ( // Sat Jan 01 2000 00:00:00 GMT+0000. var staticTime = time.Unix(946684800, 0) -type factory func(requestedCap int) (r Reservoir, actualCap int) +type factory func(requestedCap int) (r ReservoirProvider, actualCap int) func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { return func(t *testing.T) { @@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("CaptureSpanContext", func(t *testing.T) { t.Helper() - r, n := f(1) + rp, n := f(1) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01} sc := trace.NewSpanContext(trace.SpanContextConfig{ @@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("FilterAttributes", func(t *testing.T) { t.Helper() - r, n := f(1) + rp, n := f(1) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) adminTrue := attribute.Bool("admin", true) r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue}) @@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("CollectLessThanN", func(t *testing.T) { t.Helper() - r, n := f(2) + rp, n := f(2) if n < 2 { t.Skip("skipping, reservoir capacity less than 2:", n) } + r := rp(*attribute.EmptySet()) r.Offer(ctx, staticTime, NewValue(N(10)), nil) @@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("MultipleOffers", func(t *testing.T) { t.Helper() - r, n := f(3) + rp, n := f(3) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) for i := 0; i < n+1; i++ { v := NewValue(N(i)) @@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("DropAll", func(t *testing.T) { t.Helper() - r, n := f(0) + rp, n := f(0) if n > 0 { t.Skip("skipping, reservoir capacity greater than 0:", n) } + r := rp(*attribute.EmptySet()) r.Offer(context.Background(), staticTime, NewValue(N(10)), nil) diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 2e6ac543401..48b723a7b3b 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -144,6 +144,12 @@ type Stream struct { // Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to // provide an allow-list of attribute keys here. AttributeFilter attribute.Filter + // ExemplarReservoirProvider selects the + // [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based + // on the [Aggregation]. + // + // If unspecified, [DefaultExemplarReservoirProviderSelector] is used. + ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector } // instID are the identifying properties of a instrument. diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 25de3b05086..fde21933389 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct { // // If this is not provided a default factory function that returns an // dropReservoir reservoir will be used. - ReservoirFunc func() FilteredExemplarReservoir[N] + ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] { +func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index c7e242c041f..b0034010861 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) { return func() { now = orig } } -func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] { - return dropReservoir[N]() +func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] { + return dropReservoir[N](attr) } func TestBuilderFilter(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/drop.go b/sdk/metric/internal/aggregate/drop.go index dfc5a033395..76d52839b60 100644 --- a/sdk/metric/internal/aggregate/drop.go +++ b/sdk/metric/internal/aggregate/drop.go @@ -11,7 +11,9 @@ import ( ) // dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. -func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} } +func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] { + return &dropRes[N]{} +} type dropRes[N int64 | float64] struct{} diff --git a/sdk/metric/internal/aggregate/drop_test.go b/sdk/metric/internal/aggregate/drop_test.go index fee90adc777..38781e4c6ad 100644 --- a/sdk/metric/internal/aggregate/drop_test.go +++ b/sdk/metric/internal/aggregate/drop_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" ) @@ -17,7 +18,7 @@ func TestDrop(t *testing.T) { } func testDropFiltered[N int64 | float64](t *testing.T) { - r := dropReservoir[N]() + r := dropReservoir[N](*attribute.EmptySet()) var dest []exemplar.Exemplar r.Collect(&dest) diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index a4de5674ba1..b7aa721651e 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int32 - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib v, ok := e.values[attr.Equivalent()] if !ok { v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes() + v.res = e.newRes(attr) e.values[attr.Equivalent()] = v } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 35d020378bd..d577ae2c198 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) b = newBuckets[N](attr, len(s.bounds)+1) - b.res = s.newRes() + b.res = s.newRes(attr) // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index a7b5fe572be..d3a93f085c9 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -19,7 +19,7 @@ type datapoint[N int64 | float64] struct { res FilteredExemplarReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservo type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time @@ -45,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. attr := s.limit.Attributes(fltrAttr, s.values) d, ok := s.values[attr.Equivalent()] if !ok { - d.res = s.newRes() + d.res = s.newRes(attr) } d.attrs = attr @@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. -func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] { +func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index c3b591c37c0..8e132ad6181 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -21,12 +21,12 @@ type sumValue[N int64 | float64] struct { // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -41,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S attr := s.limit.Attributes(fltrAttr, s.values) v, ok := s.values[attr.Equivalent()] if !ok { - v.res = s.newRes() + v.res = s.newRes(attr) } v.attrs = attr @@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fbc9b8649e6..960b0e0ef01 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -332,6 +332,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // The view explicitly requested the default aggregation. stream.Aggregation = DefaultAggregationSelector(kind) } + if stream.ExemplarReservoirProviderSelector == nil { + stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector + } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { return nil, 0, fmt.Errorf( @@ -352,7 +355,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter), + ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 3df43827e11..75ccf30c5d7 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -491,4 +491,30 @@ func TestExemplars(t *testing.T) { measure(sampled, m) check(t, r, nCPU, 1, 20) }) + + t.Run("Custom reservoir", func(t *testing.T) { + r := NewManualReader() + reservoirProviderSelector := func(agg Aggregation) exemplar.ReservoirProvider { + return exemplar.FixedSizeReservoirProvider(2) + } + v1 := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{ + Aggregation: AggregationBase2ExponentialHistogram{ + MaxSize: 160, // > 20, reservoir size should default to 20. + MaxScale: 20, + }, + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + v2 := NewView(Instrument{Name: "int64-counter"}, Stream{ + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + v3 := NewView(Instrument{Name: "int64-histogram"}, Stream{ + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + m := NewMeterProvider(WithReader(r), WithView(v1, v2, v3)).Meter("custom-reservoir") + measure(ctx, m) + check(t, r, 0, 0, 0) + + measure(sampled, m) + check(t, r, 2, 2, 2) + }) } diff --git a/sdk/metric/view.go b/sdk/metric/view.go index cd08c673248..630890f4263 100644 --- a/sdk/metric/view.go +++ b/sdk/metric/view.go @@ -96,11 +96,12 @@ func NewView(criteria Instrument, mask Stream) View { return func(i Instrument) (Stream, bool) { if matchFunc(i) { return Stream{ - Name: nonZero(mask.Name, i.Name), - Description: nonZero(mask.Description, i.Description), - Unit: nonZero(mask.Unit, i.Unit), - Aggregation: agg, - AttributeFilter: mask.AttributeFilter, + Name: nonZero(mask.Name, i.Name), + Description: nonZero(mask.Description, i.Description), + Unit: nonZero(mask.Unit, i.Unit), + Aggregation: agg, + AttributeFilter: mask.AttributeFilter, + ExemplarReservoirProviderSelector: mask.ExemplarReservoirProviderSelector, }, true } return Stream{}, false