remove unnecessary FilteredExemplarReservoir interface
dashpole committed Oct 18, 2024
1 parent faebc0d commit c9cb3b4
Showing 9 changed files with 29 additions and 95 deletions.
11 changes: 5 additions & 6 deletions sdk/metric/internal/aggregate/aggregate.go
@@ -50,14 +50,13 @@ type Builder[N int64 | float64] struct {
 	AggregationLimit int
 }
 
-func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
-	if b.ExemplarFilter != nil {
-		return func(attrs attribute.Set) FilteredExemplarReservoir[N] {
-			return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
+func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] {
+	return func(attrs attribute.Set) *filteredExemplarReservoir[N] {
+		if b.ExemplarReservoirProvider == nil {
+			return newFilteredExemplarReservoir[N](b.ExemplarFilter, exemplar.NewFixedSizeReservoir(0))
 		}
+		return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
 	}
-
-	return dropReservoir
 }
 
 type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
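The rewritten resFunc above always hands back a concrete reservoir: when no ExemplarReservoirProvider is configured, it falls back to a zero-capacity fixed-size reservoir, which accepts offers but can never retain an exemplar. A minimal standalone sketch of that fallback behavior, using only the public exemplar package (the example program itself is illustrative, not part of the commit):

package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

func main() {
	ctx := context.Background()

	// Zero-capacity fixed-size reservoir: the fallback used by resFunc when
	// no ExemplarReservoirProvider is configured.
	res := exemplar.NewFixedSizeReservoir(0)

	// The offer is accepted, but there is no storage to hold it.
	res.Offer(ctx, time.Now(), exemplar.NewValue(int64(42)), nil)

	var out []exemplar.Exemplar
	res.Collect(&out)
	fmt.Println(len(out)) // 0: no exemplars are ever retained
}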
5 changes: 3 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 )

@@ -72,8 +73,8 @@ func (c *clock) Register() (unregister func()) {
 	return func() { now = orig }
 }
 
-func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
-	return dropReservoir[N](attr)
+func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
+	return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, exemplar.NewFixedSizeReservoir(0))
 }
 
 func TestBuilderFilter(t *testing.T) {
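With drop.go removed, the dropExemplars test helper now assembles an always-dropping reservoir from public pieces: exemplar.AlwaysOffFilter rejects every measurement before it can reach the zero-capacity reservoir. A sketch of how that could be exercised in a test inside the aggregate package (the test name and assertion are hypothetical, and the snippet assumes the file's existing testify and context imports):

func TestDropExemplarsNeverCollects(t *testing.T) {
	res := dropExemplars[int64](*attribute.EmptySet())

	// AlwaysOffFilter returns false for every context, so the offer never
	// reaches the underlying reservoir.
	res.Offer(context.Background(), 42, nil)

	var dest []exemplar.Exemplar
	res.Collect(&dest)
	assert.Empty(t, dest)
}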
26 changes: 0 additions & 26 deletions sdk/metric/internal/aggregate/drop.go

This file was deleted.

27 changes: 0 additions & 27 deletions sdk/metric/internal/aggregate/drop_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
@@ -30,7 +30,7 @@ const (
 // expoHistogramDataPoint is a single data point in an exponential histogram.
 type expoHistogramDataPoint[N int64 | float64] struct {
 	attrs attribute.Set
-	res   FilteredExemplarReservoir[N]
+	res   *filteredExemplarReservoir[N]
 
 	count uint64
 	min   N

@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
 // newExponentialHistogram returns an Aggregator that summarizes a set of
 // measurements as an exponential histogram. Each histogram is scoped by attributes
 // and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] {
 	return &expoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,

@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int32
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
+	newRes   func(attribute.Set) *filteredExemplarReservoir[N]
 	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Distinct]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex
23 changes: 5 additions & 18 deletions sdk/metric/internal/aggregate/filtered_reservoir.go
@@ -11,37 +11,24 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/exemplar"
 )
 
-// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
-type FilteredExemplarReservoir[N int64 | float64] interface {
-	// Offer accepts the parameters associated with a measurement. The
-	// parameters will be stored as an exemplar if the filter decides to
-	// sample the measurement.
-	//
-	// The passed ctx needs to contain any baggage or span that were active
-	// when the measurement was made. This information may be used by the
-	// Reservoir in making a sampling decision.
-	Offer(ctx context.Context, val N, attr []attribute.KeyValue)
-	// Collect returns all the held exemplars in the reservoir.
-	Collect(dest *[]exemplar.Exemplar)
-}
-
 // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
 type filteredExemplarReservoir[N int64 | float64] struct {
 	filter    exemplar.Filter
 	reservoir exemplar.Reservoir
 }
 
-// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
-// that are allowed by the filter.
-func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
+// newFilteredExemplarReservoir creates a [filteredExemplarReservoir] which
+// only offers values that are allowed by the filter. If the provided filter is
+// nil, all measurements are dropped.
+func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
 	return &filteredExemplarReservoir[N]{
 		filter:    f,
 		reservoir: r,
 	}
 }
 
 func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
-	if f.filter(ctx) {
+	if f.filter != nil && f.filter(ctx) {
 		// only record the current time if we are sampling this measurement.
 		f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
 	}
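The nil-filter guard added to Offer means a reservoir constructed without a filter silently drops every measurement, while a configured filter decides per call. A short in-package sketch contrasting the two paths (it assumes exemplar.AlwaysOnFilter from the public exemplar package; the surrounding function is illustrative, not repository code):

func filteredReservoirSketch() {
	ctx := context.Background()

	// Nil filter: the new guard short-circuits, so nothing is ever offered
	// to the wrapped reservoir.
	silent := newFilteredExemplarReservoir[float64](nil, exemplar.NewFixedSizeReservoir(4))
	silent.Offer(ctx, 3.14, nil)

	// AlwaysOnFilter returns true for every context, so the measurement is
	// timestamped and handed to the reservoir.
	sampled := newFilteredExemplarReservoir[float64](exemplar.AlwaysOnFilter, exemplar.NewFixedSizeReservoir(4))
	sampled.Offer(ctx, 3.14, nil)

	var dest []exemplar.Exemplar
	silent.Collect(&dest)  // dest stays empty
	sampled.Collect(&dest) // dest holds the single sampled exemplar
}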
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
@@ -16,7 +16,7 @@ import (
 
 type buckets[N int64 | float64] struct {
 	attrs attribute.Set
-	res   FilteredExemplarReservoir[N]
+	res   *filteredExemplarReservoir[N]
 
 	counts []uint64
 	count  uint64

@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
 	noSum  bool
 	bounds []float64
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
+	newRes   func(attribute.Set) *filteredExemplarReservoir[N]
 	limit    limiter[*buckets[N]]
 	values   map[attribute.Distinct]*buckets[N]
 	valuesMu sync.Mutex
 }
 
-func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
+func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histValues[N] {
 	// The responsibility of keeping all buckets correctly associated with the
 	// passed boundaries is ultimately this type's responsibility. Make a copy
 	// here so we can always guarantee this. Or, in the case of failure, have

@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
 
 // newHistogram returns an Aggregator that summarizes a set of measurements as
 // an histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histogram[N] {
 	return &histogram[N]{
 		histValues: newHistValues[N](boundaries, noSum, limit, r),
 		noMinMax:   noMinMax,
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/lastvalue.go
@@ -16,10 +16,10 @@ import (
 type datapoint[N int64 | float64] struct {
 	attrs attribute.Set
 	value N
-	res   FilteredExemplarReservoir[N]
+	res   *filteredExemplarReservoir[N]
 }
 
-func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
+func newLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *lastValue[N] {
 	return &lastValue[N]{
 		newRes: r,
 		limit:  newLimiter[datapoint[N]](limit),

@@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx
 type lastValue[N int64 | float64] struct {
 	sync.Mutex
 
-	newRes func(attribute.Set) FilteredExemplarReservoir[N]
+	newRes func(attribute.Set) *filteredExemplarReservoir[N]
 	limit  limiter[datapoint[N]]
 	values map[attribute.Distinct]datapoint[N]
 	start  time.Time

@@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
 
 // newPrecomputedLastValue returns an aggregator that summarizes a set of
 // observations as the last one made.
-func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
+func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedLastValue[N] {
 	return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
 }
 
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/sum.go
@@ -14,19 +14,19 @@ import (
 
 type sumValue[N int64 | float64] struct {
 	n     N
-	res   FilteredExemplarReservoir[N]
+	res   *filteredExemplarReservoir[N]
 	attrs attribute.Set
 }
 
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
-	newRes func(attribute.Set) FilteredExemplarReservoir[N]
+	newRes func(attribute.Set) *filteredExemplarReservoir[N]
 	limit  limiter[sumValue[N]]
 	values map[attribute.Distinct]sumValue[N]
 }
 
-func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
+func newValueMap[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *valueMap[N] {
 	return &valueMap[N]{
 		newRes: r,
 		limit:  newLimiter[sumValue[N]](limit),

@@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
 // newSum returns an aggregator that summarizes a set of measurements as their
 // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
 // the measurements were made in.
-func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *sum[N] {
 	return &sum[N]{
 		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,

@@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 // newPrecomputedSum returns an aggregator that summarizes a set of
 // observations as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedSum[N] {
 	return &precomputedSum[N]{
 		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,
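Across the sum, histogram, last-value, and exponential-histogram aggregators the pattern is now uniform: the factory returned by resFunc is stored as newRes, called once per attribute set, and the resulting *filteredExemplarReservoir is offered every subsequent measurement. A condensed sketch of that lifecycle, modeled on valueMap's measure path (simplified, renamed, and relying on the limiter API as assumed here; not the exact repository code):

// measureSketch illustrates the per-attribute-set reservoir lifecycle shared
// by the aggregators touched in this commit.
func (s *valueMap[N]) measureSketch(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
	s.Lock()
	defer s.Unlock()

	attr := s.limit.Attributes(fltrAttr, s.values)
	v, ok := s.values[attr.Equivalent()]
	if !ok {
		// First measurement for this attribute set: build its reservoir
		// via the factory produced by Builder.resFunc.
		v = sumValue[N]{attrs: attr, res: s.newRes(attr)}
	}

	v.n += value
	v.res.Offer(ctx, value, droppedAttr)
	s.values[attr.Equivalent()] = v
}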
