Skip to content

Commit

Permalink
Add selector of exemplar reservoir providers to metric.Stream configu…
Browse files Browse the repository at this point in the history
…ration
  • Loading branch information
dashpole committed Oct 1, 2024
1 parent f4e2052 commit 36f4a47
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 11 deletions.
39 changes: 30 additions & 9 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"os"
"runtime"
"slices"

"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
Expand All @@ -36,16 +40,35 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.Filtered
filter = exemplar.SampledFilter
}

provider := DefaultExemplarReservoirProviderSelector(agg)
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider())
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] for the
// provided [Aggregation].
//
// For explicit bucket histograms with more than 1 bucket, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.HistogramReservoirProvider].
// For exponential histograms, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size of min(20, max_buckets).
// For all other aggregations, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size equal to the number of CPUs.
//
// Exemplar default reservoirs MAY change in a minor version bump. No
// guarantees are made on the shape or statistical properties of returned
// exemplars.
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
return exemplar.HistogramReservoirProvider(a.Boundaries)
}

var n int
Expand All @@ -72,7 +95,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.Filtered
}
}

return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
return exemplar.FixedSizeReservoirProvider(n)
}
7 changes: 7 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func() Reservoir {
return NewFixedSizeReservoir(k)
}
}

// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
Expand Down
12 changes: 10 additions & 2 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func() Reservoir {
return NewHistogramReservoir(cp)
}
}

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/exemplar/reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

// ReservoirProvider creates new [Reservoir]s.
type ReservoirProvider func() Reservoir
6 changes: 6 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
// ExemplarReservoirProvider selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
// on the [Aggregation].
//
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}

// instID are the identifying properties of a instrument.
Expand Down

0 comments on commit 36f4a47

Please sign in to comment.