Skip to content

Commit

Permalink
add Metric Producer as a new interface, and scope to only scope metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Dec 7, 2022
1 parent 9c61547 commit a88a52e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 22 deletions.
50 changes: 39 additions & 11 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once

mu sync.Mutex
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
Expand All @@ -41,22 +44,36 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}

// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// register stores the meterProviderProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p meterProviderProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand All @@ -77,18 +94,18 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -103,7 +120,18 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
return metricdata.ResourceMetrics{}, err
}

return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, nil
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
44 changes: 36 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})

go func() {
defer func() { close(r.done) }()
Expand All @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value

mu sync.Mutex
externalProducers atomic.Value

timeout time.Duration
exporter Exporter
Expand Down Expand Up @@ -166,14 +170,25 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
func (r *periodicReader) register(p meterProviderProducer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand All @@ -195,12 +210,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand All @@ -218,7 +234,19 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)

rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, nil
}

// export exports metric data m using r's exporter.
Expand Down Expand Up @@ -259,7 +287,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

Expand Down
19 changes: 16 additions & 3 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ type Reader interface {
// register registers a Reader with a MeterProvider.
// The producer argument allows the Reader to signal the sdk to collect
// and send aggregated metric measurements.
register(producer)
register(meterProviderProducer)

// RegisterProducer registers a Reader with an external Producer.
// The Producer argument allows the Reader to signal the Producer to
// collect and send aggregated metric measurements.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality
Expand Down Expand Up @@ -84,14 +89,22 @@ type Reader interface {
Shutdown(context.Context) error
}

// producer produces metrics for a Reader.
type producer interface {
// meterProviderProducer produces metrics for a Reader.
type meterProviderProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
}

// Producer produces metrics for a Reader from an external source.
type Producer interface {
// Produce returns aggregated metrics from an external source.
//
// This method should be safe to call concurrently.
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
Expand Down

0 comments on commit a88a52e

Please sign in to comment.