Skip to content

Commit

Permalink
plumb exemplarFilter configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Sep 27, 2024
1 parent 6b46cf8 commit 988e0db
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 50 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747)
- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)

### Changed

Expand Down
29 changes: 26 additions & 3 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"os"
"sync"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -78,10 +79,16 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er

// newConfig returns a config configured with options.
func newConfig(options []Option) config {
conf := config{res: resource.Default()}
conf := config{
res: resource.Default(),
exemplarFilter: exemplar.SampledFilter,
}
for _, o := range options {
conf = o.apply(conf)
}
for _, o := range optionsFromEnv() {
conf = o.apply(conf)
}
return conf
}

Expand Down Expand Up @@ -150,11 +157,27 @@ func WithView(views ...View) Option {
// whether to store an exemplar.
//
// By default, the [go.opentelemetry.io/otel/sdk/metric/exemplar.SampledFilter]
// is used. Exemplars can be disabled by providing the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter]
// is used. Exemplars can be entirely disabled by providing the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter].
func WithExemplarFilter(filter exemplar.Filter) Option {
return optionFunc(func(cfg config) config {
cfg.exemplarFilter = filter
return cfg
})
}

func optionsFromEnv() []Option {
var opts []Option
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

switch os.Getenv(filterEnvKey) {
case "always_on":
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter))
case "always_off":
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter))
case "trace_based":
opts = append(opts, WithExemplarFilter(exemplar.SampledFilter))
}
return opts
}
26 changes: 2 additions & 24 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"
"slices"

Expand All @@ -13,29 +12,8 @@ import (
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

var filter exemplar.Filter

switch os.Getenv(filterEnvKey) {
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
filter = exemplar.AlwaysOffFilter
case "trace_based":
fallthrough
default:
filter = exemplar.SampledFilter
}

// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
Expand Down
17 changes: 10 additions & 7 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
Expand All @@ -38,14 +39,15 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}

func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
resource: res,
reader: reader,
views: views,
exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
Expand All @@ -66,6 +68,7 @@ type pipeline struct {
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
}

// addSync adds the instrumentSync to pipeline p with scope. This method is not
Expand Down Expand Up @@ -349,7 +352,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
ReservoirFunc: reservoirFunc[N](stream.Aggregation),
ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
Expand Down Expand Up @@ -552,10 +555,10 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline

func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
p := newPipeline(res, r, views)
p := newPipeline(res, r, views, exemplarFilter)
r.register(p)
pipes = append(pipes, p)
}
Expand Down
15 changes: 8 additions & 7 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
Expand Down Expand Up @@ -357,7 +358,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter)
i := newInserter[N](p, &c)
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
input, err := i.Instrument(tt.inst, readerAggregation)
Expand All @@ -379,7 +380,7 @@ func TestCreateAggregators(t *testing.T) {

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
Expand All @@ -395,7 +396,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {

func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil)
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter)
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
Expand Down Expand Up @@ -467,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
p := newPipelines(resource.Empty(), tt.readers, tt.views)
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
Expand Down Expand Up @@ -521,7 +522,7 @@ func TestPipelineRegistryResource(t *testing.T) {
readers := []Reader{NewManualReader()}
views := []View{defaultView, v}
res := resource.NewSchemaless(attribute.String("key", "val"))
pipes := newPipelines(res, readers, views)
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter)
for _, p := range pipes {
assert.True(t, res.Equal(p.resource), "resource not set")
}
Expand All @@ -532,7 +533,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {

readers := []Reader{testRdrHistogram}
views := []View{defaultView}
p := newPipelines(resource.Empty(), readers, views)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

var vc cache[string, instID]
Expand Down Expand Up @@ -592,7 +593,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter}

p := newPipelines(resource.Empty(), readers, views)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)

var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
Expand Down
17 changes: 9 additions & 8 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -39,7 +40,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int {
}

func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)

output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
Expand All @@ -65,7 +66,7 @@ func TestNewPipeline(t *testing.T) {

func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter)

output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
Expand All @@ -74,7 +75,7 @@ func TestPipelineUsesResource(t *testing.T) {
}

func TestPipelineConcurrentSafe(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
ctx := context.Background()
var output metricdata.ResourceMetrics

Expand Down Expand Up @@ -124,13 +125,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil),
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []View{
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
}),
}, exemplar.AlwaysOffFilter),
},
}

Expand Down Expand Up @@ -215,7 +216,7 @@ func TestLogConflictName(t *testing.T) {
return instID{Name: tc.existing}
})

i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
i.logConflict(instID{Name: tc.name})

if tc.conflict {
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestLogConflictSuggestView(t *testing.T) {
var vc cache[string, instID]
name := strings.ToLower(orig.Name)
_ = vc.Lookup(name, func() instID { return orig })
i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)

viewSuggestion := func(inst instID, stream string) string {
return `"NewView(Instrument{` +
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
}

var vc cache[string, instID]
pipe := newPipeline(nil, NewManualReader(), nil)
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter)
i := newInserter[int64](pipe, &vc)

readerAggregation := i.readerDefaultAggregation(kind)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
flush, sdown := conf.readerSignals()

mp := &MeterProvider{
pipes: newPipelines(conf.res, conf.readers, conf.views),
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter),
forceFlush: flush,
shutdown: sdown,
}
Expand Down

0 comments on commit 988e0db

Please sign in to comment.