Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default view if instrument does not match any pipeline view #3237

Merged
merged 10 commits into from
Oct 7, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235)
- Update histogram default bounds to match the requirements of the latest specification. (#3222)

### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

### Changed
Expand Down
4 changes: 0 additions & 4 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ func WithReader(r Reader, views ...view.View) Option {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
}
if len(views) == 0 {
views = []view.View{{}}
}

cfg.readers[r] = views
return cfg
})
Expand Down
52 changes: 34 additions & 18 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
// Instrument inserts instrument inst with instUnit returning the Aggregators
// that need to be updated with measurments for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var matched bool
seen := map[instrumentID]struct{}{}
var aggs []internal.Aggregator[N]
errs := &multierror{wrapped: errCreatingAggregators}
for _, v := range i.pipeline.views {
inst, match := v.TransformInstrument(inst)
if !match {
continue
}
matched = true

id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := seen[id]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
if _, ok := seen[id]; ok {
continue
}

Expand All @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
errs.append(err)
}
}
// TODO(#3224): handle when no views match. Default should be reader
// aggregation returned.

if !matched { // Apply implicit default view if no explicit matched.
a, err := i.aggregator(inst)
if err != nil {
errs.append(err)
}
if a != nil {
aggs = append(aggs, a)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
if err != nil {
errs.append(err)
}
}
}

return aggs, errs.errorOrNil()
}

// aggregator returns the Aggregator for an instrument configuration. If the
// instrument defines an unknown aggregation, an error is returned.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
// TODO (#3011): If filtering is done by the Aggregator it should be passed
// here.
switch inst.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
return nil, fmt.Errorf(
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
inst.Kind, inst.Aggregation, err,
)
}

var (
temporality = i.pipeline.reader.temporality(inst.Kind)
monotonic bool
Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -211,3 +214,61 @@ func TestPipelineConcurrency(t *testing.T) {
}
wg.Wait()
}

func TestDefaultViewImplicit(t *testing.T) {
t.Run("Int64", testDefaultViewImplicit[int64]())
t.Run("Float64", testDefaultViewImplicit[float64]())
}

func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
inst := view.Instrument{
Scope: instrumentation.Scope{Name: "testing/lib"},
Name: "requests",
Description: "count of requests received",
Kind: view.SyncCounter,
Aggregation: aggregation.Sum{},
}
return func(t *testing.T) {
reader := NewManualReader()
v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar"))
require.NoError(t, err)

tests := []struct {
name string
pipe *pipeline
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []view.View{v}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
i := newInserter[N](test.pipe)
got, err := i.Instrument(inst, unit.Dimensionless)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")

out, err := test.pipe.produce(context.Background())
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: inst.Name,
Description: inst.Description,
Unit: unit.Dimensionless,
Data: metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}