From 988e0db46a4f1d5a963a560ce56727a38e142f2d Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 27 Sep 2024 20:36:36 +0000 Subject: [PATCH] plumb exemplarFilter configuration --- CHANGELOG.md | 2 ++ sdk/metric/config.go | 29 +++++++++++++++++++++++++--- sdk/metric/exemplar.go | 26 ++----------------------- sdk/metric/pipeline.go | 17 +++++++++------- sdk/metric/pipeline_registry_test.go | 15 +++++++------- sdk/metric/pipeline_test.go | 17 ++++++++-------- sdk/metric/provider.go | 2 +- 7 files changed, 58 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b201a5d6763..443fa41516e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747) +- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850) +- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850) ### Changed diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 76dc5871daa..6f38f05480f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -6,6 +6,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" "fmt" + "os" "sync" "go.opentelemetry.io/otel" @@ -78,10 +79,16 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er // newConfig returns a config configured with options. func newConfig(options []Option) config { - conf := config{res: resource.Default()} + conf := config{ + res: resource.Default(), + exemplarFilter: exemplar.SampledFilter, + } for _, o := range options { conf = o.apply(conf) } + for _, o := range optionsFromEnv() { + conf = o.apply(conf) + } return conf } @@ -150,11 +157,27 @@ func WithView(views ...View) Option { // whether to store an exemplar. // // By default, the [go.opentelemetry.io/otel/sdk/metric/exemplar.SampledFilter] -// is used. Exemplars can be disabled by providing the -// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter] +// is used. Exemplars can be entirely disabled by providing the +// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter]. func WithExemplarFilter(filter exemplar.Filter) Option { return optionFunc(func(cfg config) config { cfg.exemplarFilter = filter return cfg }) } + +func optionsFromEnv() []Option { + var opts []Option + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar + const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" + + switch os.Getenv(filterEnvKey) { + case "always_on": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter)) + case "always_off": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter)) + case "trace_based": + opts = append(opts, WithExemplarFilter(exemplar.SampledFilter)) + } + return opts +} diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 8279cfbd06a..1f8652d6eca 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -4,7 +4,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "os" "runtime" "slices" @@ -13,29 +12,8 @@ import ( ) // reservoirFunc returns the appropriately configured exemplar reservoir -// creation func based on the passed InstrumentKind and user defined -// environment variables. -// -// Note: This will only return non-nil values when the experimental exemplar -// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable -// is not set to always_off. -func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] { - // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar - const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" - - var filter exemplar.Filter - - switch os.Getenv(filterEnvKey) { - case "always_on": - filter = exemplar.AlwaysOnFilter - case "always_off": - filter = exemplar.AlwaysOffFilter - case "trace_based": - fallthrough - default: - filter = exemplar.SampledFilter - } - +// creation func based on the passed InstrumentKind and filter configuration. +func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] { // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults // Explicit bucket histogram aggregation with more than 1 bucket will // use AlignedHistogramBucketExemplarReservoir. diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 823bf2fe3d2..fbc9b8649e6 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/internal/x" @@ -38,14 +39,15 @@ type instrumentSync struct { compAgg aggregate.ComputeAggregation } -func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ - resource: res, - reader: reader, - views: views, + resource: res, + reader: reader, + views: views, + exemplarFilter: exemplarFilter, // aggregations is lazy allocated when needed. } } @@ -66,6 +68,7 @@ type pipeline struct { aggregations map[instrumentation.Scope][]instrumentSync callbacks []func(context.Context) error multiCallbacks list.List + exemplarFilter exemplar.Filter } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -349,7 +352,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation), + ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation @@ -552,10 +555,10 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error { // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines { +func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines { pipes := make([]*pipeline, 0, len(readers)) for _, r := range readers { - p := newPipeline(res, r, views) + p := newPipeline(res, r, views, exemplarFilter) r.register(p) pipes = append(pipes, p) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index fc632cbb101..d1fb71a3b13 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -357,7 +358,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { var c cache[string, instID] - p := newPipeline(nil, tt.reader, tt.views) + p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter) i := newInserter[N](p, &c) readerAggregation := i.readerDefaultAggregation(tt.inst.Kind) input, err := i.Instrument(tt.inst, readerAggregation) @@ -379,7 +380,7 @@ func TestCreateAggregators(t *testing.T) { func testInvalidInstrumentShouldPanic[N int64 | float64]() { var c cache[string, instID] - i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c) + i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c) inst := Instrument{ Name: "foo", Kind: InstrumentKind(255), @@ -395,7 +396,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) { func TestPipelinesAggregatorForEachReader(t *testing.T) { r0, r1 := NewManualReader(), NewManualReader() - pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil) + pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter) require.Len(t, pipes, 2, "created pipelines") inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} @@ -467,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - p := newPipelines(resource.Empty(), tt.readers, tt.views) + p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter) testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount) @@ -521,7 +522,7 @@ func TestPipelineRegistryResource(t *testing.T) { readers := []Reader{NewManualReader()} views := []View{defaultView, v} res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, readers, views) + pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -532,7 +533,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { readers := []Reader{testRdrHistogram} views := []View{defaultView} - p := newPipelines(resource.Empty(), readers, views) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} var vc cache[string, instID] @@ -592,7 +593,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter} barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter} - p := newPipelines(resource.Empty(), readers, views) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) var vc cache[string, instID] ri := newResolver[int64](p, &vc) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 43f9499a09e..3df43827e11 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" @@ -39,7 +40,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -65,7 +66,7 @@ func TestNewPipeline(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res, nil, nil) + pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -74,7 +75,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrentSafe(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) ctx := context.Background() var output metricdata.ResourceMetrics @@ -124,13 +125,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { }{ { name: "NoView", - pipe: newPipeline(nil, reader, nil), + pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter), }, { name: "NoMatchingView", pipe: newPipeline(nil, reader, []View{ NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}), - }), + }, exemplar.AlwaysOffFilter), }, } @@ -215,7 +216,7 @@ func TestLogConflictName(t *testing.T) { return instID{Name: tc.existing} }) - i := newInserter[int64](newPipeline(nil, nil, nil), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) i.logConflict(instID{Name: tc.name}) if tc.conflict { @@ -257,7 +258,7 @@ func TestLogConflictSuggestView(t *testing.T) { var vc cache[string, instID] name := strings.ToLower(orig.Name) _ = vc.Lookup(name, func() instID { return orig }) - i := newInserter[int64](newPipeline(nil, nil, nil), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) viewSuggestion := func(inst instID, stream string) string { return `"NewView(Instrument{` + @@ -362,7 +363,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { } var vc cache[string, instID] - pipe := newPipeline(nil, NewManualReader(), nil) + pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter) i := newInserter[int64](pipe, &vc) readerAggregation := i.readerDefaultAggregation(kind) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index a82af538e67..7b0c0dbf714 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() mp := &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers, conf.views), + pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter), forceFlush: flush, shutdown: sdown, }