From fecb92e366d9f3703c099c9bfbb33335a9736bd5 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Wed, 31 Jan 2024 13:15:35 -0800
Subject: [PATCH] Add the experimental exemplar feature (#4871)

* Add the experimental exemplar feature

* Add exemplars to EXPERIMENTAL.md

* Add changelog entry

* Fix hist buckets > 1 detection

* Collect instead of Flush res about to be deleted

* Add e2e test

* Do not pre-alloc ResourceMetrics

This only has a single use.

* Fix grammatical error in comment

* Add test cases

Default and invalid OTEL_METRICS_EXEMPLAR_FILTER.

Test sampled and non-sampled context for trace_based.

* Comment nCPU

* Doc OTEL_METRICS_EXEMPLAR_FILTER
---
 CHANGELOG.md                                |   2 +
 sdk/metric/EXPERIMENTAL.md                  |  61 +++++++++
 sdk/metric/benchmark_test.go                |  89 +++++++++++++
 sdk/metric/exemplar.go                      |  96 ++++++++++++++
 sdk/metric/internal/aggregate/aggregate.go  |  37 ++++--
 .../internal/aggregate/aggregate_test.go    |  16 ++-
 .../aggregate/exponential_histogram.go      |  20 ++-
 .../aggregate/exponential_histogram_test.go |   8 +-
 sdk/metric/internal/aggregate/histogram.go  |  25 +++-
 .../internal/aggregate/histogram_test.go    |  14 +-
 sdk/metric/internal/aggregate/lastvalue.go  |  26 +++-
 sdk/metric/internal/aggregate/sum.go        |  63 ++++++---
 sdk/metric/pipeline.go                      |   3 +-
 sdk/metric/pipeline_test.go                 | 121 ++++++++++++++++++
 14 files changed, 522 insertions(+), 59 deletions(-)
 create mode 100644 sdk/metric/exemplar.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3e128b2e6eb..1437444568a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 ### Added

 - Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
+- Experimental exemplar exporting is added to the metric SDK.
+  See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)

 ### Fixed

diff --git a/sdk/metric/EXPERIMENTAL.md b/sdk/metric/EXPERIMENTAL.md
index d2a5a0470b6..658d2027687 100644
--- a/sdk/metric/EXPERIMENTAL.md
+++ b/sdk/metric/EXPERIMENTAL.md
@@ -40,6 +40,67 @@ Disable the cardinality limit.
 unset OTEL_GO_X_CARDINALITY_LIMIT
 ```

+### Exemplars
+
+A sample of the measurements made may be exported directly as a set of exemplars.
+
+This experimental feature can be enabled by setting the `OTEL_GO_X_EXEMPLAR` environment variable.
+The value must be the case-insensitive string `"true"` to enable the feature.
+All other values are ignored.
+
+Exemplar filters are also supported.
+The exemplar filter applies to all measurements made.
+It filters these measurements, only allowing certain measurements to be passed to the underlying exemplar reservoir.
+
+To change the exemplar filter from the default `"trace_based"` filter, set the `OTEL_METRICS_EXEMPLAR_FILTER` environment variable.
+The value must be the case-sensitive string defined by the [OpenTelemetry specification].
+
+- `"always_on"`: allows all measurements
+- `"always_off"`: denies all measurements
+- `"trace_based"`: allows only sampled measurements
+
+All values other than these will result in the default, `"trace_based"`, exemplar filter being used.
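+
+As a rough illustration (the meter and instrument names below are arbitrary), exported exemplars can be inspected from data collected by a `ManualReader`:
+
+```go
+// Sketch only: assumes OTEL_GO_X_EXEMPLAR=true and
+// OTEL_METRICS_EXEMPLAR_FILTER=always_on were set before the process started.
+package main
+
+import (
+	"context"
+	"fmt"
+
+	sdk "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+func main() {
+	ctx := context.Background()
+	reader := sdk.NewManualReader()
+	meter := sdk.NewMeterProvider(sdk.WithReader(reader)).Meter("example")
+
+	counter, _ := meter.Int64Counter("requests")
+	counter.Add(ctx, 1)
+
+	var rm metricdata.ResourceMetrics
+	_ = reader.Collect(ctx, &rm)
+	sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64])
+	fmt.Println(len(sum.DataPoints[0].Exemplars)) // 1 with the always_on filter.
+}
+```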
+
+[OpenTelemetry specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/a6ca2fd484c9e76fe1d8e1c79c99f08f4745b5ee/specification/configuration/sdk-environment-variables.md#exemplar
+
+#### Examples
+
+Enable exemplars to be exported.
+
+```console
+export OTEL_GO_X_EXEMPLAR=true
+```
+
+Disable exemplars from being exported.
+
+```console
+unset OTEL_GO_X_EXEMPLAR
+```
+
+Set the exemplar filter to allow all measurements.
+
+```console
+export OTEL_METRICS_EXEMPLAR_FILTER=always_on
+```
+
+Set the exemplar filter to deny all measurements.
+
+```console
+export OTEL_METRICS_EXEMPLAR_FILTER=always_off
+```
+
+Set the exemplar filter to only allow sampled measurements.
+
+```console
+export OTEL_METRICS_EXEMPLAR_FILTER=trace_based
+```
+
+Revert to the default exemplar filter (`"trace_based"`).
+
+```console
+unset OTEL_METRICS_EXEMPLAR_FILTER
+```
+
 ## Compatibility and Stability

 Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go
index 90f88088630..fc089687525 100644
--- a/sdk/metric/benchmark_test.go
+++ b/sdk/metric/benchmark_test.go
@@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

 import (
 	"context"
+	"fmt"
+	"runtime"
 	"strconv"
 	"testing"

@@ -24,6 +26,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/trace"
 )

 var viewBenchmarks = []struct {
@@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
 	b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
 }
+
+func BenchmarkExemplars(b *testing.B) {
+	sc := trace.NewSpanContext(trace.SpanContextConfig{
+		SpanID:     trace.SpanID{0o1},
+		TraceID:    trace.TraceID{0o1},
+		TraceFlags: trace.FlagsSampled,
+	})
+	ctx := trace.ContextWithSpanContext(context.Background(), sc)
+
+	attr := attribute.NewSet(
+		attribute.String("user", "Alice"),
+		attribute.Bool("admin", true),
+	)
+
+	setup := func(name string) (metric.Meter, Reader) {
+		r := NewManualReader()
+		v := NewView(Instrument{Name: "*"}, Stream{
+			AttributeFilter: func(kv attribute.KeyValue) bool {
+				return kv.Key == attribute.Key("user")
+			},
+		})
+		mp := NewMeterProvider(WithReader(r), WithView(v))
+		return mp.Meter(name), r
+	}
+	nCPU := runtime.NumCPU() // Size of the fixed reservoir used.
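+	// With the default aggregations used below, the counter's fixed-size
+	// reservoir holds up to nCPU exemplars, while the histogram's
+	// bucket-aligned reservoir reports one exemplar per populated bucket
+	// (a single bucket here, since every recorded value is 1).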
+
+	b.Setenv("OTEL_GO_X_EXEMPLAR", "true")
+
+	name := fmt.Sprintf("Int64Counter/%d", nCPU)
+	b.Run(name, func(b *testing.B) {
+		m, r := setup("Int64Counter")
+		i, err := m.Int64Counter("int64-counter")
+		assert.NoError(b, err)
+
+		rm := newRM(metricdata.Sum[int64]{
+			DataPoints: []metricdata.DataPoint[int64]{
+				{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
+			},
+		})
+		e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)
+
+		b.ReportAllocs()
+		b.ResetTimer()
+		for n := 0; n < b.N; n++ {
+			for j := 0; j < 2*nCPU; j++ {
+				i.Add(ctx, 1, metric.WithAttributeSet(attr))
+			}
+
+			_ = r.Collect(ctx, rm)
+			assert.Len(b, *e, nCPU)
+		}
+	})
+
+	name = fmt.Sprintf("Int64Histogram/%d", nCPU)
+	b.Run(name, func(b *testing.B) {
+		m, r := setup("Int64Histogram")
+		i, err := m.Int64Histogram("int64-histogram")
+		assert.NoError(b, err)
+
+		rm := newRM(metricdata.Histogram[int64]{
+			DataPoints: []metricdata.HistogramDataPoint[int64]{
+				{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
+			},
+		})
+		e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)
+
+		b.ReportAllocs()
+		b.ResetTimer()
+		for n := 0; n < b.N; n++ {
+			for j := 0; j < 2*nCPU; j++ {
+				i.Record(ctx, 1, metric.WithAttributeSet(attr))
+			}
+
+			_ = r.Collect(ctx, rm)
+			assert.Len(b, *e, 1)
+		}
+	})
+}
+
+func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
+	return &metricdata.ResourceMetrics{
+		ScopeMetrics: []metricdata.ScopeMetrics{
+			{Metrics: []metricdata.Metrics{{Data: a}}},
+		},
+	}
+}
diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go
new file mode 100644
index 00000000000..77579acdb40
--- /dev/null
+++ b/sdk/metric/exemplar.go
@@ -0,0 +1,96 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.

+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+	"os"
+	"runtime"
+
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+	"go.opentelemetry.io/otel/sdk/metric/internal/x"
+)
+
+// reservoirFunc returns the appropriately configured exemplar reservoir
+// creation func based on the passed Aggregation and user-defined environment
+// variables.
+//
+// Note: This will only return a non-nil value when the experimental exemplar
+// feature is enabled. If the OTEL_METRICS_EXEMPLAR_FILTER environment
+// variable is set to always_off, the returned factory produces reservoirs
+// that drop all measurements.
+func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
+	if !x.Exemplars.Enabled() {
+		return nil
+	}
+
+	// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
+	resF := func() func() exemplar.Reservoir[N] {
+		// Explicit bucket histogram aggregation with more than 1 bucket will
+		// use AlignedHistogramBucketExemplarReservoir.
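+		// One boundary defines two buckets, so a non-empty Boundaries slice
+		// means more than one bucket; len(Boundaries) == 0 is the single
+		// bucket case that falls through to the fixed-size reservoir below.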
+		a, ok := agg.(AggregationExplicitBucketHistogram)
+		if ok && len(a.Boundaries) > 0 {
+			cp := make([]float64, len(a.Boundaries))
+			copy(cp, a.Boundaries)
+			return func() exemplar.Reservoir[N] {
+				bounds := cp
+				return exemplar.Histogram[N](bounds)
+			}
+		}
+
+		var n int
+		if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
+			// Base2 Exponential Histogram Aggregation SHOULD use a
+			// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
+			// smaller of the maximum number of buckets configured on the
+			// aggregation or twenty (e.g. min(20, max_buckets)).
+			n = int(a.MaxSize)
+			if n > 20 {
+				n = 20
+			}
+		} else {
+			// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
+			// This Exemplar reservoir MAY take a configuration parameter for
+			// the size of the reservoir. If no size configuration is
+			// provided, the default size MAY be the number of possible
+			// concurrent threads (e.g. number of CPUs) to help reduce
+			// contention. Otherwise, a default size of 1 SHOULD be used.
+			n = runtime.NumCPU()
+			if n < 1 {
+				// Should never be the case, but be defensive.
+				n = 1
+			}
+		}
+
+		return func() exemplar.Reservoir[N] {
+			return exemplar.FixedSize[N](n)
+		}
+	}
+
+	// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
+	const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
+
+	switch os.Getenv(filterEnvKey) {
+	case "always_on":
+		return resF()
+	case "always_off":
+		return exemplar.Drop[N]
+	case "trace_based":
+		fallthrough
+	default:
+		newR := resF()
+		return func() exemplar.Reservoir[N] {
+			return exemplar.SampledFilter(newR())
+		}
+	}
+}
diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index c61f8513789..4060a2f76d3 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -19,6 +19,7 @@ import (
 	"time"

 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

@@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct {
 	// Filter is the attribute filter the aggregate function will use on the
 	// input of measurements.
 	Filter attribute.Filter
+	// ReservoirFunc is the factory function used by aggregate functions to
+	// create new exemplar reservoirs for a newly seen attribute set.
+	//
+	// If this is not provided, a default factory function that returns an
+	// exemplar.Drop reservoir will be used.
+	ReservoirFunc func() exemplar.Reservoir[N]
 	// AggregationLimit is the cardinality limit of measurement attributes. Any
 	// measurement for new attributes once the limit has been reached will be
 	// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -54,15 +61,27 @@
 	AggregationLimit int
 }

-func (b Builder[N]) filter(f Measure[N]) Measure[N] {
+func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
+	if b.ReservoirFunc != nil {
+		return b.ReservoirFunc
+	}
+
+	return exemplar.Drop[N]
+}
+
+type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
+
+func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
 	if b.Filter != nil {
 		fltr := b.Filter // Copy to make it immutable after assignment.
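+		// The dropped attributes are forwarded with each measurement so
+		// reservoirs can record them as an exemplar's filtered attributes.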
 		return func(ctx context.Context, n N, a attribute.Set) {
-			fAttr, _ := a.Filter(fltr)
-			f(ctx, n, fAttr)
+			fAttr, dropped := a.Filter(fltr)
+			f(ctx, n, fAttr, dropped)
 		}
 	}
-	return f
+	return func(ctx context.Context, n N, a attribute.Set) {
+		f(ctx, n, a, nil)
+	}
 }

 // LastValue returns a last-value aggregate function input and output.
@@ -71,7 +90,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 	// Delta temporality is the only temporality that makes semantic sense for
 	// a last-value aggregate.
-	lv := newLastValue[N](b.AggregationLimit)
+	lv := newLastValue[N](b.AggregationLimit, b.resFunc())

 	return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
 		// Ignore if dest is not a metricdata.Gauge. The chance for memory
@@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 // PrecomputedSum returns a sum aggregate function input and output. The
 // arguments passed to the input are expected to be the precomputed sum values.
 func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
+	s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta
@@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

 // Sum returns a sum aggregate function input and output.
 func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newSum[N](monotonic, b.AggregationLimit)
+	s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta
@@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {

 // ExplicitBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
+	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta
@@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu

 // ExponentialBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
+	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta
diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go
index 384ca51c8cf..be568f14c04 100644
--- a/sdk/metric/internal/aggregate/aggregate_test.go
+++ b/sdk/metric/internal/aggregate/aggregate_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/stretchr/testify/assert"

 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 )
@@ -59,6 +60,10 @@ var (
 	}
 )

+func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
+	return exemplar.Drop[N]()
+}
+
 func TestBuilderFilter(t *testing.T) {
 	t.Run("Int64", testBuilderFilter[int64]())
 	t.Run("Float64", testBuilderFilter[float64]())
@@ -69,20 +74,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
 		t.Helper()

 		value, attr := N(1), alice
-		run := func(b Builder[N], wantA attribute.Set) func(*testing.T) {
+		run := func(b Builder[N], wantF attribute.Set, wantD []attribute.KeyValue) func(*testing.T) {
 			return func(t *testing.T) {
 				t.Helper()

-				meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
+				meas := b.filter(func(_ context.Context, v N, f attribute.Set, d []attribute.KeyValue) {
 					assert.Equal(t, value, v, "measured incorrect value")
-					assert.Equal(t, wantA, a, "measured incorrect attributes")
+					assert.Equal(t, wantF, f, "measured incorrect filtered attributes")
+					assert.ElementsMatch(t, wantD, d, "measured incorrect dropped attributes")
 				})
 				meas(context.Background(), value, attr)
 			}
 		}

-		t.Run("NoFilter", run(Builder[N]{}, attr))
-		t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
+		t.Run("NoFilter", run(Builder[N]{}, attr, nil))
+		t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
 	}
 }
diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
index e9c25980aa2..660b86071ae 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -23,6 +23,7 @@ import (

 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

@@ -40,6 +41,8 @@ const (

 // expoHistogramDataPoint is a single data point in an exponential histogram.
 type expoHistogramDataPoint[N int64 | float64] struct {
+	res exemplar.Reservoir[N]
+
 	count uint64
 	min   N
 	max   N
@@ -288,13 +291,14 @@ func (b *expoBuckets) downscale(delta int) {
 // newExponentialHistogram returns an Aggregator that summarizes a set of
 // measurements as an exponential histogram. Each histogram is scoped by attributes
 // and the aggregation cycle the measurements were made in.
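+// The r factory is called once for each newly seen attribute set to create
+// that set's exemplar reservoir.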
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
 	return &expoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,
 		maxSize:  int(maxSize),
 		maxScale: int(maxScale),

+		newRes: r,
 		limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
 		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

@@ -310,6 +314,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int

+	newRes   func() exemplar.Reservoir[N]
 	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Set]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex
@@ -317,22 +322,27 @@ type expoHistogram[N int64 | float64] struct {
 	start time.Time
 }

-func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
+func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
 	// Ignore NaN and infinity.
 	if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
 		return
 	}

+	t := now()
+
 	e.valuesMu.Lock()
 	defer e.valuesMu.Unlock()

-	attr = e.limit.Attributes(attr, e.values)
+	attr := e.limit.Attributes(fltrAttr, e.values)
 	v, ok := e.values[attr]
 	if !ok {
 		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+		v.res = e.newRes()
+
 		e.values[attr] = v
 	}
 	v.record(value)
+	v.res.Offer(ctx, t, value, droppedAttr)
 }

 func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
@@ -374,6 +384,8 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
 			hDPts[i].Max = metricdata.NewExtrema(b.max)
 		}

+		b.res.Collect(&hDPts[i].Exemplars)
+
 		delete(e.values, a)
 		i++
 	}
@@ -422,6 +434,8 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
 			hDPts[i].Max = metricdata.NewExtrema(b.max)
 		}

+		b.res.Collect(&hDPts[i].Exemplars)
+
 		i++
 		// TODO (#3006): This will use an unbounded amount of memory if there
 		// are unbounded number of attribute sets being aggregated. Attribute
diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go
index 40af402636c..3fed4897ee2 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram_test.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go
@@ -183,9 +183,9 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
 			restore := withHandler(t)
 			defer restore()

-			h := newExponentialHistogram[int64](4, 20, false, false, 0)
+			h := newExponentialHistogram[int64](4, 20, false, false, 0, dropExemplars[int64])
 			for _, v := range tt.values {
-				h.measure(context.Background(), v, alice)
+				h.measure(context.Background(), v, alice, nil)
 			}
 			dp := h.values[alice]

@@ -225,9 +225,9 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
 			restore := withHandler(t)
 			defer restore()

-			h := newExponentialHistogram[float64](4, 20, false, false, 0)
+			h := newExponentialHistogram[float64](4, 20, false, false, 0, dropExemplars[float64])
 			for _, v := range tt.values {
-				h.measure(context.Background(), v, alice)
+				h.measure(context.Background(), v, alice, nil)
 			}
 			dp := h.values[alice]
diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go
index 5d886360bae..a9a4706bf00 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -21,10 +21,13 @@ import (
 	"time"

 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

 type buckets[N int64 | float64] struct {
+	res exemplar.Reservoir[N]
+
 	counts []uint64
 	count  uint64
 	total  N
@@ -54,12 +57,13 @@ type histValues[N int64 | float64] struct {
 	noSum  bool
 	bounds []float64

+	newRes   func() exemplar.Reservoir[N]
 	limit    limiter[*buckets[N]]
 	values   map[attribute.Set]*buckets[N]
 	valuesMu sync.Mutex
 }

-func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] {
+func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] {
 	// The responsibility of keeping all buckets correctly associated with the
 	// passed boundaries is ultimately this type's responsibility. Make a copy
 	// here so we can always guarantee this. Or, in the case of failure, have
@@ -70,6 +74,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *
 	return &histValues[N]{
 		noSum:  noSum,
 		bounds: b,
+		newRes: r,
 		limit:  newLimiter[*buckets[N]](limit),
 		values: make(map[attribute.Set]*buckets[N]),
 	}
@@ -77,7 +82,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *

 // Aggregate records the measurement value, scoped by attr, and aggregates it
 // into a histogram.
-func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
+func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
 	// This search will return an index in the range [0, len(s.bounds)], where
 	// it will return len(s.bounds) if value is greater than the last element
 	// of s.bounds. This aligns with the buckets in that the length of buckets
 	// is len(s.bounds)+1, with the last bucket representing:
 	// (s.bounds[len(s.bounds)-1], +∞).
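+	// For example, with bounds [0, 5, 10]: a value of -1 yields index 0, a
+	// value of 7 yields index 2, and a value of 11 yields index 3
+	// (len(s.bounds)).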
 	idx := sort.SearchFloat64s(s.bounds, float64(value))

+	t := now()
+
 	s.valuesMu.Lock()
 	defer s.valuesMu.Unlock()

-	attr = s.limit.Attributes(attr, s.values)
+	attr := s.limit.Attributes(fltrAttr, s.values)
 	b, ok := s.values[attr]
 	if !ok {
 		// N+1 buckets. For example:
 		//
 		//   bounds = [0, 5, 10]
 		//
 		// Then,
 		//
 		//   buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
 		b = newBuckets[N](len(s.bounds) + 1)
+		b.res = s.newRes()
+
 		// Ensure min and max are recorded values (not zero), for new buckets.
 		b.min, b.max = value, value
 		s.values[attr] = b
@@ -107,13 +116,14 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
 	if !s.noSum {
 		b.sum(value)
 	}
+	b.res.Offer(ctx, t, value, droppedAttr)
 }

 // newHistogram returns an Aggregator that summarizes a set of measurements as
 // a histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] {
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] {
 	return &histogram[N]{
-		histValues: newHistValues[N](boundaries, noSum, limit),
+		histValues: newHistValues[N](boundaries, noSum, limit, r),
 		noMinMax:   noMinMax,
 		start:      now(),
 	}
@@ -164,6 +174,8 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
 			hDPts[i].Max = metricdata.NewExtrema(b.max)
 		}

+		b.res.Collect(&hDPts[i].Exemplars)
+
 		// Unused attribute sets do not report.
 		delete(s.values, a)
 		i++
@@ -220,6 +232,9 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
 			hDPts[i].Min = metricdata.NewExtrema(b.min)
 			hDPts[i].Max = metricdata.NewExtrema(b.max)
 		}
+
+		b.res.Collect(&hDPts[i].Exemplars)
+
 		i++
 		// TODO (#3006): This will use an unbounded amount of memory if there
 		// are unbounded number of attribute sets being aggregated. Attribute
diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go
index e51e9fc8f29..bd88b083492 100644
--- a/sdk/metric/internal/aggregate/histogram_test.go
+++ b/sdk/metric/internal/aggregate/histogram_test.go
@@ -313,13 +313,13 @@ func TestHistogramImmutableBounds(t *testing.T) {
 	cpB := make([]float64, len(b))
 	copy(cpB, b)

-	h := newHistogram[int64](b, false, false, 0)
+	h := newHistogram[int64](b, false, false, 0, dropExemplars[int64])
 	require.Equal(t, cpB, h.bounds)

 	b[0] = 10
 	assert.Equal(t, cpB, h.bounds, "modifying the bounds argument should not change the bounds")

-	h.measure(context.Background(), 5, alice)
+	h.measure(context.Background(), 5, alice, nil)

 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
 	h.cumulative(&data)
@@ -329,8 +329,8 @@
 }

 func TestCumulativeHistogramImutableCounts(t *testing.T) {
-	h := newHistogram[int64](bounds, noMinMax, false, 0)
-	h.measure(context.Background(), 5, alice)
+	h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
+	h.measure(context.Background(), 5, alice, nil)

 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
 	h.cumulative(&data)
@@ -347,13 +347,13 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
 func TestDeltaHistogramReset(t *testing.T) {
 	t.Cleanup(mockTime(now))

-	h := newHistogram[int64](bounds, noMinMax, false, 0)
+	h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])

 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
 	require.Equal(t, 0, h.delta(&data))
 	require.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)

-	h.measure(context.Background(), 1, alice)
+	h.measure(context.Background(), 1, alice, nil)

 	expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
 	expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)}
@@ -366,7 +366,7 @@ func TestDeltaHistogramReset(t *testing.T) {
 	assert.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)

 	// Aggregating another set should not affect the original (alice).
-	h.measure(context.Background(), 1, bob)
+	h.measure(context.Background(), 1, bob, nil)
 	expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)}
 	h.delta(&data)
 	metricdatatest.AssertAggregationsEqual(t, expect, data)
diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go
index b79e80a0c8d..5699e728f1f 100644
--- a/sdk/metric/internal/aggregate/lastvalue.go
+++ b/sdk/metric/internal/aggregate/lastvalue.go
@@ -20,6 +20,7 @@ import (
 	"time"

 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

@@ -27,10 +28,12 @@ import (
 type datapoint[N int64 | float64] struct {
 	timestamp time.Time
 	value     N
+	res       exemplar.Reservoir[N]
 }

-func newLastValue[N int64 | float64](limit int) *lastValue[N] {
+func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] {
 	return &lastValue[N]{
+		newRes: r,
 		limit:  newLimiter[datapoint[N]](limit),
 		values: make(map[attribute.Set]datapoint[N]),
 	}
@@ -40,16 +43,28 @@ func newLastValue[N int64 | float64](limit int) *lastValue[N] {
 type lastValue[N int64 | float64] struct {
 	sync.Mutex

+	newRes func() exemplar.Reservoir[N]
 	limit  limiter[datapoint[N]]
 	values map[attribute.Set]datapoint[N]
 }

-func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
-	d := datapoint[N]{timestamp: now(), value: value}
+func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
+	t := now()
+
 	s.Lock()
-	attr = s.limit.Attributes(attr, s.values)
+	defer s.Unlock()
+
+	attr := s.limit.Attributes(fltrAttr, s.values)
+	d, ok := s.values[attr]
+	if !ok {
+		d.res = s.newRes()
+	}
+
+	d.timestamp = t
+	d.value = value
+	d.res.Offer(ctx, t, value, droppedAttr)
+
 	s.values[attr] = d
-	s.Unlock()
 }

 func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
@@ -66,6 +81,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
 		// ignored.
 		(*dest)[i].Time = v.timestamp
 		(*dest)[i].Value = v.value
+		v.res.Collect(&(*dest)[i].Exemplars)
 		// Do not report stale values.
 		delete(s.values, a)
 		i++
diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go
index a0d26e1ddb9..02de2483f3b 100644
--- a/sdk/metric/internal/aggregate/sum.go
+++ b/sdk/metric/internal/aggregate/sum.go
@@ -20,36 +20,55 @@ import (
 	"time"

 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

+type sumValue[N int64 | float64] struct {
+	n   N
+	res exemplar.Reservoir[N]
+}
+
 // valueMap is the storage for sums.
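+// Each stored sumValue pairs the running total for an attribute set with
+// that set's exemplar reservoir.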
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
-	limit  limiter[N]
-	values map[attribute.Set]N
+	newRes func() exemplar.Reservoir[N]
+	limit  limiter[sumValue[N]]
+	values map[attribute.Set]sumValue[N]
 }

-func newValueMap[N int64 | float64](limit int) *valueMap[N] {
+func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] {
 	return &valueMap[N]{
-		limit:  newLimiter[N](limit),
-		values: make(map[attribute.Set]N),
+		newRes: r,
+		limit:  newLimiter[sumValue[N]](limit),
+		values: make(map[attribute.Set]sumValue[N]),
 	}
 }

-func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
+func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
+	t := now()
+
 	s.Lock()
-	attr = s.limit.Attributes(attr, s.values)
-	s.values[attr] += value
-	s.Unlock()
+	defer s.Unlock()
+
+	attr := s.limit.Attributes(fltrAttr, s.values)
+	v, ok := s.values[attr]
+	if !ok {
+		v.res = s.newRes()
+	}
+
+	v.n += value
+	v.res.Offer(ctx, t, value, droppedAttr)
+
+	s.values[attr] = v
 }

 // newSum returns an aggregator that summarizes a set of measurements as their
 // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
 // the measurements were made in.
-func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] {
 	return &sum[N]{
-		valueMap:  newValueMap[N](limit),
+		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,
 		start:     now(),
 	}
@@ -79,11 +98,12 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)

 	var i int
-	for attr, value := range s.values {
+	for attr, val := range s.values {
 		dPts[i].Attributes = attr
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
-		dPts[i].Value = value
+		dPts[i].Value = val.n
+		val.res.Collect(&dPts[i].Exemplars)
 		// Do not report stale values.
 		delete(s.values, attr)
 		i++
@@ -117,7 +137,8 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 		dPts[i].Attributes = attr
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
-		dPts[i].Value = value
+		dPts[i].Value = value.n
+		value.res.Collect(&dPts[i].Exemplars)
 		// TODO (#3006): This will use an unbounded amount of memory if there
 		// are unbounded number of attribute sets being aggregated. Attribute
 		// sets that become "stale" need to be forgotten so this will not
@@ -134,9 +155,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 // newPrecomputedSum returns an aggregator that summarizes a set of
 // observations as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] {
 	return &precomputedSum[N]{
-		valueMap:  newValueMap[N](limit),
+		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,
 		start:     now(),
 	}
@@ -170,14 +191,15 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {

 	var i int
 	for attr, value := range s.values {
-		delta := value - s.reported[attr]
+		delta := value.n - s.reported[attr]

 		dPts[i].Attributes = attr
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
 		dPts[i].Value = delta
+		value.res.Collect(&dPts[i].Exemplars)

-		newReported[attr] = value
+		newReported[attr] = value.n
 		// Unused attribute sets do not report.
 		delete(s.values, attr)
 		i++
@@ -209,11 +231,12 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)

 	var i int
-	for attr, value := range s.values {
+	for attr, val := range s.values {
 		dPts[i].Attributes = attr
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
-		dPts[i].Value = value
+		dPts[i].Value = val.n
+		val.res.Collect(&dPts[i].Exemplars)

 		// Unused attribute sets do not report.
 		delete(s.values, attr)
diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go
index 47d9fe07ada..da39ab961c1 100644
--- a/sdk/metric/pipeline.go
+++ b/sdk/metric/pipeline.go
@@ -359,7 +359,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 	normID := id.normalize()
 	cv := i.aggregators.Lookup(normID, func() aggVal[N] {
 		b := aggregate.Builder[N]{
-			Temporality: i.pipeline.reader.temporality(kind),
+			Temporality:   i.pipeline.reader.temporality(kind),
+			ReservoirFunc: reservoirFunc[N](stream.Aggregation),
 		}
 		b.Filter = stream.AttributeFilter
 		// A value less than or equal to zero will disable the aggregation
diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go
index f585c7a4743..d52b9a7fa2c 100644
--- a/sdk/metric/pipeline_test.go
+++ b/sdk/metric/pipeline_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"runtime"
 	"strings"
 	"sync"
 	"testing"
@@ -31,10 +32,12 @@ import (

 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 	"go.opentelemetry.io/otel/sdk/resource"
+	"go.opentelemetry.io/otel/trace"
 )

 func testSumAggregateOutput(dest *metricdata.Aggregation) int {
@@ -394,3 +397,121 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
 	require.Len(t, iSync, 1, "registered instrumentSync changed")
 	assert.Equal(t, name, iSync[0].name, "stream name changed")
 }
+
+func TestExemplars(t *testing.T) {
+	nCPU := runtime.NumCPU()
+	setup := func(name string) (metric.Meter, Reader) {
+		r := NewManualReader()
+		v := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{
+			Aggregation: AggregationBase2ExponentialHistogram{
+				MaxSize:  160, // > 20, reservoir size should default to 20.
+				MaxScale: 20,
+			},
+		})
+		return NewMeterProvider(WithReader(r), WithView(v)).Meter(name), r
+	}
+
+	measure := func(ctx context.Context, m metric.Meter) {
+		i, err := m.Int64Counter("int64-counter")
+		require.NoError(t, err)
+
+		h, err := m.Int64Histogram("int64-histogram")
+		require.NoError(t, err)
+
+		e, err := m.Int64Histogram("int64-expo-histogram")
+		require.NoError(t, err)
+
+		for j := 0; j < 20*nCPU; j++ { // will be >= 20 and > nCPU
+			i.Add(ctx, 1)
+			h.Record(ctx, 1)
+			e.Record(ctx, 1)
+		}
+	}
+
+	check := func(t *testing.T, r Reader, nSum, nHist, nExpo int) {
+		t.Helper()
+
+		rm := new(metricdata.ResourceMetrics)
+		require.NoError(t, r.Collect(context.Background(), rm))
+
+		require.Len(t, rm.ScopeMetrics, 1, "ScopeMetrics")
+		sm := rm.ScopeMetrics[0]
+		require.Len(t, sm.Metrics, 3, "Metrics")
+
+		require.IsType(t, metricdata.Sum[int64]{}, sm.Metrics[0].Data, sm.Metrics[0].Name)
+		sum := sm.Metrics[0].Data.(metricdata.Sum[int64])
+		assert.Len(t, sum.DataPoints[0].Exemplars, nSum)
+
+		require.IsType(t, metricdata.Histogram[int64]{}, sm.Metrics[1].Data, sm.Metrics[1].Name)
+		hist := sm.Metrics[1].Data.(metricdata.Histogram[int64])
+		assert.Len(t, hist.DataPoints[0].Exemplars, nHist)
+
+		require.IsType(t, metricdata.ExponentialHistogram[int64]{}, sm.Metrics[2].Data, sm.Metrics[2].Name)
+		expo := sm.Metrics[2].Data.(metricdata.ExponentialHistogram[int64])
+		assert.Len(t, expo.DataPoints[0].Exemplars, nExpo)
+	}
+
+	ctx := context.Background()
+	sc := trace.NewSpanContext(trace.SpanContextConfig{
+		SpanID:     trace.SpanID{0o1},
+		TraceID:    trace.TraceID{0o1},
+		TraceFlags: trace.FlagsSampled,
+	})
+	sampled := trace.ContextWithSpanContext(context.Background(), sc)
+
+	t.Run("OTEL_GO_X_EXEMPLAR=true", func(t *testing.T) {
+		t.Setenv("OTEL_GO_X_EXEMPLAR", "true")
+
+		t.Run("Default", func(t *testing.T) {
+			m, r := setup("default")
+			measure(ctx, m)
+			check(t, r, 0, 0, 0)
+
+			measure(sampled, m)
+			check(t, r, nCPU, 1, 20)
+		})
+
+		t.Run("Invalid", func(t *testing.T) {
+			t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "unrecognized")
+			m, r := setup("default")
+			measure(ctx, m)
+			check(t, r, 0, 0, 0)
+
+			measure(sampled, m)
+			check(t, r, nCPU, 1, 20)
+		})
+
+		t.Run("always_on", func(t *testing.T) {
+			t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")
+			m, r := setup("always_on")
+			measure(ctx, m)
+			check(t, r, nCPU, 1, 20)
+		})
+
+		t.Run("always_off", func(t *testing.T) {
+			t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_off")
+			m, r := setup("always_off")
+			measure(ctx, m)
+			check(t, r, 0, 0, 0)
+		})
+
+		t.Run("trace_based", func(t *testing.T) {
+			t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "trace_based")
+			m, r := setup("trace_based")
+			measure(ctx, m)
+			check(t, r, 0, 0, 0)
+
+			measure(sampled, m)
+			check(t, r, nCPU, 1, 20)
+		})
+	})
+
+	t.Run("OTEL_GO_X_EXEMPLAR=false", func(t *testing.T) {
+		t.Setenv("OTEL_GO_X_EXEMPLAR", "false")
+
+		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")
+		m, r := setup("always_on")
+		measure(ctx, m)
+		check(t, r, 0, 0, 0)
+	})
+}