Skip to content

Commit

Permalink
Use default view if inst matches no other
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Sep 27, 2022
1 parent d45bee9 commit 3ab3eed
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
// Instrument inserts instrument inst with instUnit returning the Aggregators
// that need to be updated with measurments for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var matched bool
seen := map[instrumentID]struct{}{}
var aggs []internal.Aggregator[N]
errs := &multierror{wrapped: errCreatingAggregators}
for _, v := range i.pipeline.views {
inst, match := v.TransformInstrument(inst)
if !match {
continue
}
matched = true

id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := seen[id]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
if _, ok := seen[id]; ok {
continue
}

Expand All @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
errs.append(err)
}
}
// TODO(#3224): handle when no views match. Default should be reader
// aggregation returned.

if !matched { // Apply implicit default view if no explicit matched.
a, err := i.aggregator(inst)
if err != nil {
errs.append(err)
}
if a != nil {
aggs = append(aggs, a)
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
if err != nil {
errs.append(err)
}
}
}

return aggs, errs.errorOrNil()
}

// aggregator returns the Aggregator for an instrument configuration. If the
// instrument defines an unknown aggregation, an error is returned.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
// TODO (#3011): If filtering is done by the Aggregator it should be passed
// here.
switch inst.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
return nil, fmt.Errorf(
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
inst.Kind, inst.Aggregation, err,
)
}

var (
temporality = i.pipeline.reader.temporality(inst.Kind)
monotonic bool
Expand Down

0 comments on commit 3ab3eed

Please sign in to comment.