From b6ed43ac10e778a946076f93f5357869af443124 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 11 Oct 2023 14:07:45 +0000 Subject: [PATCH 1/3] implement WithExplicitBucketBoundaries option in the metric SDK --- CHANGELOG.md | 1 + sdk/metric/meter.go | 54 ++++++++++++++++-- sdk/metric/meter_test.go | 83 ++++++++++++++++++++++++++++ sdk/metric/pipeline.go | 81 ++++++++++++++++++--------- sdk/metric/pipeline_registry_test.go | 36 +++++++++++- sdk/metric/pipeline_test.go | 8 ++- 6 files changed, 229 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a91d8f99d1b..533caacf176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4660) - Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660) - Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622) +- Add support for WithExplicitBucketBoundaries in `go.opentelemetry.io/otel/sdk/metric` (#4605) ### Deprecated diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 0c17dfb20a5..e60fdc70c53 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -95,9 +95,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou // distribution of int64 measurements during a computational operation. func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { cfg := metric.NewInt64HistogramConfig(options...) - const kind = InstrumentKindHistogram p := int64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookupHistogram(name, cfg) if err != nil { return i, err } @@ -188,9 +187,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow // distribution of float64 measurements during a computational operation. func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { cfg := metric.NewFloat64HistogramConfig(options...) - const kind = InstrumentKindHistogram p := float64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookupHistogram(name, cfg) if err != nil { return i, err } @@ -456,12 +454,36 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag return p.int64Resolver.Aggregators(inst) } +func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) { + boundaries := cfg.ExplicitBucketBoundaries() + aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err() + if aggError != nil { + // if boundaries are invalid, ignore them + boundaries = nil + } + inst := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindHistogram, + Scope: p.scope, + } + measures, err := p.int64Resolver.HistogramAggregators(inst, boundaries) + return measures, errors.Join(aggError, err) +} + // lookup returns the resolved instrumentImpl. func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &int64Inst{measures: aggs}, err } +// lookupHistogram returns the resolved instrumentImpl. +func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) { + aggs, err := p.histogramAggs(name, cfg) + return &int64Inst{measures: aggs}, err +} + // float64InstProvider provides float64 OpenTelemetry instruments. type float64InstProvider struct{ *meter } @@ -476,12 +498,36 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([] return p.float64Resolver.Aggregators(inst) } +func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) { + boundaries := cfg.ExplicitBucketBoundaries() + aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err() + if aggError != nil { + // if boundaries are invalid, ignore them + boundaries = nil + } + inst := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindHistogram, + Scope: p.scope, + } + measures, err := p.float64Resolver.HistogramAggregators(inst, boundaries) + return measures, errors.Join(aggError, err) +} + // lookup returns the resolved instrumentImpl. func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &float64Inst{measures: aggs}, err } +// lookupHistogram returns the resolved instrumentImpl. +func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) { + aggs, err := p.histogramAggs(name, cfg) + return &float64Inst{measures: aggs}, err +} + type int64ObservProvider struct{ *meter } func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index e2b2a5938fd..0e082a7be06 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -16,6 +16,7 @@ package metric import ( "context" + "errors" "fmt" "strings" "sync" @@ -550,6 +551,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) { wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName), }, + { + name: "Int64Histogram with invalid buckets", + + fn: func(t *testing.T, m metric.Meter) error { + i, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5)) + assert.NotNil(t, i) + return err + }, + + wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})), + }, { name: "Int64ObservableCounter with no validation issues", @@ -670,6 +682,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) { wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName), }, + { + name: "Float64Histogram with invalid buckets", + + fn: func(t *testing.T, m metric.Meter) error { + i, err := m.Float64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5)) + assert.NotNil(t, i) + return err + }, + + wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})), + }, { name: "Float64ObservableCounter with no validation issues", @@ -1970,3 +1993,63 @@ func TestMalformedSelectors(t *testing.T) { }) } } + +func TestHistogramBucketPrecedenceOrdering(t *testing.T) { + defaultBuckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000} + aggregationSelector := func(InstrumentKind) Aggregation { + return AggregationExplicitBucketHistogram{Boundaries: []float64{0, 1, 2, 3, 4, 5}} + } + for _, tt := range []struct { + desc string + reader Reader + views []View + histogramOpts []metric.Float64HistogramOption + expectedBucketBoundaries []float64 + }{ + { + desc: "default", + reader: NewManualReader(), + expectedBucketBoundaries: defaultBuckets, + }, + { + desc: "custom reader aggregation overrides default", + reader: NewManualReader(WithAggregationSelector(aggregationSelector)), + expectedBucketBoundaries: []float64{0, 1, 2, 3, 4, 5}, + }, + { + desc: "overridden by histogram option", + reader: NewManualReader(WithAggregationSelector(aggregationSelector)), + histogramOpts: []metric.Float64HistogramOption{ + metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10), + }, + expectedBucketBoundaries: []float64{0, 2, 4, 6, 8, 10}, + }, + { + desc: "overridden by view", + reader: NewManualReader(WithAggregationSelector(aggregationSelector)), + histogramOpts: []metric.Float64HistogramOption{ + metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10), + }, + views: []View{NewView(Instrument{Name: "*"}, Stream{ + Aggregation: AggregationExplicitBucketHistogram{Boundaries: []float64{0, 3, 6, 9, 12, 15}}, + })}, + expectedBucketBoundaries: []float64{0, 3, 6, 9, 12, 15}, + }, + } { + t.Run(tt.desc, func(t *testing.T) { + meter := NewMeterProvider(WithView(tt.views...), WithReader(tt.reader)).Meter("TestHistogramBucketPrecedenceOrdering") + sfHistogram, err := meter.Float64Histogram("sync.float64.histogram", tt.histogramOpts...) + require.NoError(t, err) + sfHistogram.Record(context.Background(), 1) + var rm metricdata.ResourceMetrics + err = tt.reader.Collect(context.Background(), &rm) + require.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + gotHist, ok := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64]) + require.True(t, ok) + require.Len(t, gotHist.DataPoints, 1) + assert.Equal(t, tt.expectedBucketBoundaries, gotHist.DataPoints[0].Bounds) + }) + } +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index c1597a75597..48abcc8a7f3 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -231,7 +231,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins // // If an instrument is determined to use a Drop aggregation, that instrument is // not inserted nor returned. -func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) { +func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) { var ( matched bool measures []aggregate.Measure[N] @@ -245,8 +245,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error continue } matched = true - - in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) + in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) if err != nil { errs.append(err) } @@ -271,7 +270,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error Description: inst.Description, Unit: inst.Unit, } - in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) + in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) if err != nil { errs.append(err) } @@ -291,6 +290,31 @@ type aggVal[N int64 | float64] struct { Err error } +// readerDefaultAggregation returns the default aggregation for the instrument +// kind based on the reader's aggregation preferences. This is used unless the +// aggregation is overridden with a view. +func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation { + aggregation := i.pipeline.reader.aggregation(kind) + switch aggregation.(type) { + case nil, AggregationDefault: + // If the reader returns default or nil use the default selector. + aggregation = DefaultAggregationSelector(kind) + default: + // Deep copy and validate before using. + aggregation = aggregation.copy() + if err := aggregation.err(); err != nil { + orig := aggregation + aggregation = DefaultAggregationSelector(kind) + global.Error( + err, "using default aggregation instead", + "aggregation", orig, + "replacement", aggregation, + ) + } + } + return aggregation +} + // cachedAggregator returns the appropriate aggregate input and output // functions for an instrument configuration. If the exact instrument has been // created within the inst.Scope, those aggregate function instances will be @@ -305,29 +329,14 @@ type aggVal[N int64 | float64] struct { // // If the instrument defines an unknown or incompatible aggregation, an error // is returned. -func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) { +func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) { switch stream.Aggregation.(type) { case nil: - // Undefined, nil, means to use the default from the reader. - stream.Aggregation = i.pipeline.reader.aggregation(kind) - switch stream.Aggregation.(type) { - case nil, AggregationDefault: - // If the reader returns default or nil use the default selector. - stream.Aggregation = DefaultAggregationSelector(kind) - default: - // Deep copy and validate before using. - stream.Aggregation = stream.Aggregation.copy() - if err := stream.Aggregation.err(); err != nil { - orig := stream.Aggregation - stream.Aggregation = DefaultAggregationSelector(kind) - global.Error( - err, "using default aggregation instead", - "aggregation", orig, - "replacement", stream.Aggregation, - ) - } - } + // The aggregation was not overridden with a view. Use the aggregation + // provided by the reader. + stream.Aggregation = readerAggregation case AggregationDefault: + // The view explicitly requested the default aggregation. stream.Aggregation = DefaultAggregationSelector(kind) } @@ -596,7 +605,29 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) errs := &multierror{} for _, i := range r.inserters { - in, err := i.Instrument(id) + in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind)) + if err != nil { + errs.append(err) + } + measures = append(measures, in...) + } + return measures, errs.errorOrNil() +} + +// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument +// defined by key. If boundaries were provided on instrument instantiation, those take precedence +// over boundaries provided by the reader. +func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) { + var measures []aggregate.Measure[N] + + errs := &multierror{} + for _, i := range r.inserters { + agg := i.readerDefaultAggregation(id.Kind) + if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 { + histAgg.Boundaries = boundaries + agg = histAgg + } + in, err := i.Instrument(id, agg) if err != nil { errs.append(err) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index b0832a87f52..fe01d6971b3 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -351,7 +351,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { var c cache[string, instID] p := newPipeline(nil, tt.reader, tt.views) i := newInserter[N](p, &c) - input, err := i.Instrument(tt.inst) + readerAggregation := i.readerDefaultAggregation(tt.inst.Kind) + input, err := i.Instrument(tt.inst, readerAggregation) var comps []aggregate.ComputeAggregation for _, instSyncs := range p.aggregations { for _, i := range instSyncs { @@ -375,7 +376,8 @@ func testInvalidInstrumentShouldPanic[N int64 | float64]() { Name: "foo", Kind: InstrumentKind(255), } - _, _ = i.Instrument(inst) + readerAggregation := i.readerDefaultAggregation(inst.Kind) + _, _ = i.Instrument(inst, readerAggregation) } func TestInvalidInstrumentShouldPanic(t *testing.T) { @@ -460,6 +462,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { p := newPipelines(resource.Empty(), tt.readers, tt.views) testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) + testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount) + testPipelineRegistryResolveFloatHistogramAggregators(t, p, tt.wantCount) }) } } @@ -484,6 +488,26 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want require.Len(t, aggs, wantCount) } +func testPipelineRegistryResolveIntHistogramAggregators(t *testing.T, p pipelines, wantCount int) { + inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} + var c cache[string, instID] + r := newResolver[int64](p, &c) + aggs, err := r.HistogramAggregators(inst, []float64{1, 2, 3}) + assert.NoError(t, err) + + require.Len(t, aggs, wantCount) +} + +func testPipelineRegistryResolveFloatHistogramAggregators(t *testing.T, p pipelines, wantCount int) { + inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} + var c cache[string, instID] + r := newResolver[float64](p, &c) + aggs, err := r.HistogramAggregators(inst, []float64{1, 2, 3}) + assert.NoError(t, err) + + require.Len(t, aggs, wantCount) +} + func TestPipelineRegistryResource(t *testing.T) { v := NewView(Instrument{Name: "bar"}, Stream{Name: "foo"}) readers := []Reader{NewManualReader()} @@ -513,6 +537,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { floatAggs, err := rf.Aggregators(inst) assert.Error(t, err) assert.Len(t, floatAggs, 0) + + intAggs, err = ri.HistogramAggregators(inst, []float64{1, 2, 3}) + assert.Error(t, err) + assert.Len(t, intAggs, 0) + + floatAggs, err = rf.HistogramAggregators(inst, []float64{1, 2, 3}) + assert.Error(t, err) + assert.Len(t, floatAggs, 0) } type logCounter struct { diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 1026fd268ff..f585c7a4743 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -146,7 +146,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { t.Run(test.name, func(t *testing.T) { var c cache[string, instID] i := newInserter[N](test.pipe, &c) - got, err := i.Instrument(inst) + readerAggregation := i.readerDefaultAggregation(inst.Kind) + got, err := i.Instrument(inst, readerAggregation) require.NoError(t, err) assert.Len(t, got, 1, "default view not applied") for _, in := range got { @@ -372,7 +373,8 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { pipe := newPipeline(nil, NewManualReader(), nil) i := newInserter[int64](pipe, &vc) - _, origID, err := i.cachedAggregator(scope, kind, stream) + readerAggregation := i.readerDefaultAggregation(kind) + _, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation) require.NoError(t, err) require.Len(t, pipe.aggregations, 1) @@ -382,7 +384,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { require.Equal(t, name, iSync[0].name) stream.Name = "RequestCount" - _, id, err := i.cachedAggregator(scope, kind, stream) + _, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation) require.NoError(t, err) assert.Equal(t, origID, id, "multiple aggregators for equivalent name") From a96e4474824eeead364a99725a63e55f076a3340 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 26 Oct 2023 13:37:03 -0400 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Tyler Yahn --- sdk/metric/meter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index e60fdc70c53..7f51ec512ad 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -458,7 +458,7 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC boundaries := cfg.ExplicitBucketBoundaries() aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err() if aggError != nil { - // if boundaries are invalid, ignore them + // If boundaries are invalid, ignore them. boundaries = nil } inst := Instrument{ @@ -502,7 +502,7 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog boundaries := cfg.ExplicitBucketBoundaries() aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err() if aggError != nil { - // if boundaries are invalid, ignore them + // If boundaries are invalid, ignore them. boundaries = nil } inst := Instrument{ From 5d6f90fe859f08b023d8ad661ea2532f8a4d318e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 27 Oct 2023 08:58:50 +0200 Subject: [PATCH 3/3] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 533caacf176..284d2ea87c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4660) - Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660) - Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622) -- Add support for WithExplicitBucketBoundaries in `go.opentelemetry.io/otel/sdk/metric` (#4605) +- Add support for `WithExplicitBucketBoundaries` in `go.opentelemetry.io/otel/sdk/metric`. (#4605) ### Deprecated