From eefbce261b666540292dee9182f4612815f73436 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 15 Oct 2024 16:05:01 +0000 Subject: [PATCH] plumb selector to reservoir func --- sdk/metric/exemplar.go | 3 +-- sdk/metric/pipeline.go | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 066b54bf871a..42fd4a2d045f 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -18,8 +18,7 @@ type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvi // reservoirFunc returns the appropriately configured exemplar reservoir // creation func based on the passed InstrumentKind and filter configuration. -func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] { - provider := DefaultExemplarReservoirProviderSelector(agg) +func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] { return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] { return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs)) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fbc9b8649e64..960b0e0ef01b 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -332,6 +332,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // The view explicitly requested the default aggregation. stream.Aggregation = DefaultAggregationSelector(kind) } + if stream.ExemplarReservoirProviderSelector == nil { + stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector + } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { return nil, 0, fmt.Errorf( @@ -352,7 +355,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter), + ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation