Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the experimental exemplar feature #4871

Merged
merged 13 commits into from
Jan 31, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
- Experimental exemplar exporting is added to the metric SDK.
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)

### Fixed

Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/EXPERIMENTAL.md
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,67 @@ Disable the cardinality limit.
unset OTEL_GO_X_CARDINALITY_LIMIT
```

### Exemplars
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

A sample of measurements made may be exported directly as a set of exemplars.

This experimental feature can be enabled by setting the `OTEL_GO_X_EXEMPLAR` environment variable.
The value must be the case-insensitive string of `"true"` to enable the feature.
All other values are ignored.

Exemplar filters are supported.
The exemplar filter applies to all measurements made.
It filters these measurements, only allowing certain measurements to be passed to the underlying exemplar reservoir.

To change the exemplar filter from the default `"trace_based"` filter, set the `OTEL_METRICS_EXEMPLAR_FILTER` environment variable.
The value must be the case-sensitive string defined by the [OpenTelemetry specification].

- `"always_on"`: allows all measurements
- `"always_off"`: denies all measurements
- `"trace_based"`: allows only sampled measurements

All values other than these will result in the default, `"trace_based"`, exemplar filter being used.

[OpenTelemetry specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/a6ca2fd484c9e76fe1d8e1c79c99f08f4745b5ee/specification/configuration/sdk-environment-variables.md#exemplar

#### Examples

Enable exemplars to be exported.

```console
export OTEL_GO_X_EXEMPLAR=true
```

Disable exemplars from being exported.

```console
unset OTEL_GO_X_EXEMPLAR
```

Set the exemplar filter to allow all measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=always_on
```

Set the exemplar filter to deny all measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=always_off
```

Set the exemplar filter to only allow sampled measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=trace_based
```

Revert to the default exemplar filter (`"trace_based"`)

```console
unset OTEL_METRICS_EXEMPLAR_FILTER
```

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
Expand Down
89 changes: 89 additions & 0 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"runtime"
"strconv"
"testing"

Expand All @@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

var viewBenchmarks = []struct {
Expand Down Expand Up @@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
}
}

func BenchmarkExemplars(b *testing.B) {
sc := trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{0o1},
TraceID: trace.TraceID{0o1},
pellared marked this conversation as resolved.
Show resolved Hide resolved
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(context.Background(), sc)

attr := attribute.NewSet(
attribute.String("user", "Alice"),
attribute.Bool("admin", true),
)

setup := func(name string) (metric.Meter, Reader) {
r := NewManualReader()
v := NewView(Instrument{Name: "*"}, Stream{
AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("user")
},
})
mp := NewMeterProvider(WithReader(r), WithView(v))
return mp.Meter(name), r
}
nCPU := runtime.NumCPU() // Size of the fixed reservoir used.

b.Setenv("OTEL_GO_X_EXEMPLAR", "true")

name := fmt.Sprintf("Int64Counter/%d", nCPU)
pellared marked this conversation as resolved.
Show resolved Hide resolved
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Counter")
i, err := m.Int64Counter("int64-counter")
assert.NoError(b, err)

rm := newRM(metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Add(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, nCPU)
}
})

name = fmt.Sprintf("Int64Histogram/%d", nCPU)
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Counter")
i, err := m.Int64Histogram("int64-histogram")
assert.NoError(b, err)

rm := newRM(metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Record(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, 1)
}
})
}

// newRM returns a ResourceMetrics containing exactly one ScopeMetrics, which
// in turn contains exactly one Metrics whose Data is a.
func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
	m := metricdata.Metrics{Data: a}
	sm := metricdata.ScopeMetrics{Metrics: []metricdata.Metrics{m}}
	return &metricdata.ResourceMetrics{
		ScopeMetrics: []metricdata.ScopeMetrics{sm},
	}
}
96 changes: 96 additions & 0 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := make([]float64, len(a.Boundaries))
copy(cp, a.Boundaries)
return func() exemplar.Reservoir[N] {
bounds := cp
return exemplar.Histogram[N](bounds)
}
}

var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. numer of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}

Check warning on line 72 in sdk/metric/exemplar.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/exemplar.go#L70-L72

Added lines #L70 - L72 were not covered by tests
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
}
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

switch os.Getenv(filterEnvKey) {
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return exemplar.SampledFilter(newR())
}
}
}
37 changes: 28 additions & 9 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -54,15 +61,27 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
// resFunc returns the exemplar reservoir factory function to use: the
// configured ReservoirFunc when one is set, otherwise exemplar.Drop.
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
	if b.ReservoirFunc == nil {
		return exemplar.Drop[N]
	}
	return b.ReservoirFunc
}

// fltrMeasure is the measure function signature accepted by Builder.filter.
// In addition to the context and measured value it receives the attribute
// set to aggregate with (fltrAttr) and the attributes that were removed by
// the attribute filter (droppedAttr), which is nil when no filter is applied.
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

// filter adapts f into a Measure.
//
// When the Builder has a Filter, the attributes of every measurement are
// passed through it and f receives both the filtered attribute set and the
// attributes that were dropped. Without a Filter, f receives the attributes
// unmodified and nil for the dropped attributes.
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
	if b.Filter != nil {
		fltr := b.Filter // Copy to make it immutable after assignment.
		return func(ctx context.Context, n N, a attribute.Set) {
			fAttr, dropped := a.Filter(fltr)
			f(ctx, n, fAttr, dropped)
		}
	}
	// f takes an extra argument so it cannot be returned directly as a
	// Measure; wrap it and report that nothing was dropped.
	return func(ctx context.Context, n N, a attribute.Set) {
		f(ctx, n, a, nil)
	}
}

// LastValue returns a last-value aggregate function input and output.
Expand All @@ -71,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit)
lv := newLastValue[N](b.AggregationLimit, b.resFunc())

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit)
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
Loading