Skip to content

Commit

Permalink
move addCallback to inserter
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Dec 8, 2023
1 parent 6c4286a commit a918b25
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
14 changes: 6 additions & 8 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,10 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for i, pipe := range m.pipes {
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
inserter := m.int64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
Expand All @@ -122,7 +121,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
Expand Down Expand Up @@ -224,11 +223,10 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for i, pipe := range m.pipes {
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
inserter := m.float64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
Expand All @@ -238,7 +236,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
Expand Down
16 changes: 8 additions & 8 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
p.aggregations[scope] = append(p.aggregations[scope], iSync)
}

// addCallback registers a single instrument callback to be run when
// `produce()` is called.
func (p *pipeline) addCallback(cback func(context.Context) error) {
p.Lock()
defer p.Unlock()
p.callbacks = append(p.callbacks, cback)
}

type multiCallback func(context.Context) error

// addMultiCallback registers a multi-instrument callback to be run when
Expand Down Expand Up @@ -281,6 +273,14 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
return measures, errs.errorOrNil()
}

// addCallback registers a single instrument callback to be run when
// `produce()` is called.
func (i *inserter[N]) addCallback(cback func(context.Context) error) {
i.pipeline.Lock()
defer i.pipeline.Unlock()
i.pipeline.callbacks = append(i.pipeline.callbacks, cback)
}

var aggIDCount uint64

// aggVal is the cached value in an aggregators cache.
Expand Down

0 comments on commit a918b25

Please sign in to comment.