From a88a52e329924be18f4fadea1930c73d214d6986 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 7 Dec 2022 21:09:38 +0000 Subject: [PATCH] add Metric Producer as a new interface, and scope to only scope metrics --- sdk/metric/manual_reader.go | 50 +++++++++++++++++++++++++++-------- sdk/metric/periodic_reader.go | 44 ++++++++++++++++++++++++------ sdk/metric/reader.go | 19 ++++++++++--- 3 files changed, 91 insertions(+), 22 deletions(-) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 0ebfadf33a32..e352a6338784 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -28,9 +28,12 @@ import ( // manualReader is a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - producer atomic.Value + sdkProducer atomic.Value shutdownOnce sync.Once + mu sync.Mutex + externalProducers atomic.Value + temporalitySelector TemporalitySelector aggregationSelector AggregationSelector } @@ -41,22 +44,36 @@ var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { cfg := newManualReaderConfig(opts) - return &manualReader{ + r := &manualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, } + r.externalProducers.Store([]Producer{}) + return r } -// register stores the Producer which enables the caller to read -// metrics on demand. -func (mr *manualReader) register(p producer) { +// register stores the meterProviderProducer which enables the caller +// to read metrics from the SDK on demand. +func (mr *manualReader) register(p meterProviderProducer) { // Only register once. If producer is already set, do nothing. - if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register manual reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer stores the external Producer which enables the caller +// to read metrics on demand. +func (mr *manualReader) RegisterProducer(p Producer) { + mr.mu.Lock() + defer mr.mu.Unlock() + currentProducers := mr.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + mr.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -77,7 +94,7 @@ func (mr *manualReader) Shutdown(context.Context) error { err := ErrReaderShutdown mr.shutdownOnce.Do(func() { // Any future call to Collect will now return ErrReaderShutdown. - mr.producer.Store(produceHolder{ + mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) err = nil @@ -85,10 +102,10 @@ func (mr *manualReader) Shutdown(context.Context) error { return err } -// Collect gathers all metrics from the SDK, calling any callbacks necessary. -// Collect will return an error if called after shutdown. +// Collect gathers all metrics from the SDK and other Producers, calling any +// callbacks necessary. Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := mr.producer.Load() + p := mr.sdkProducer.Load() if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -103,7 +120,18 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range mr.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, nil } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 00ba13055951..95cd52d46cfc 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), } + r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade // periodicReader is a Reader that continuously collects and exports metric // data at a set interval. type periodicReader struct { - producer atomic.Value + sdkProducer atomic.Value + + mu sync.Mutex + externalProducers atomic.Value timeout time.Duration exporter Exporter @@ -166,14 +170,25 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { } // register registers p as the producer of this reader. -func (r *periodicReader) register(p producer) { +func (r *periodicReader) register(p meterProviderProducer) { // Only register once. If producer is already set, do nothing. - if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register periodic reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer registers p as an external Producer of this reader. +func (r *periodicReader) RegisterProducer(p Producer) { + r.mu.Lock() + defer r.mu.Unlock() + currentProducers := r.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + r.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -195,12 +210,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error { } // Collect gathers and returns all metric data related to the Reader from -// the SDK. The returned metric data is not exported to the configured -// exporter, it is left to the caller to handle that if desired. +// the SDK and other Producers. The returned metric data is not exported +// to the configured exporter, it is left to the caller to handle that if +// desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.producer.Load()) + return r.collect(ctx, r.sdkProducer.Load()) } // collect unwraps p as a produceHolder and returns its produce results. @@ -218,7 +234,19 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata err := fmt.Errorf("periodic reader: invalid producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range r.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, nil } // export exports metric data m using r's exporter. @@ -259,7 +287,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { <-r.done // Any future call to Collect will now return ErrReaderShutdown. - ph := r.producer.Swap(produceHolder{ + ph := r.sdkProducer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index aa9d50ef666c..cfe6fb0b59bd 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -51,7 +51,12 @@ type Reader interface { // register registers a Reader with a MeterProvider. // The producer argument allows the Reader to signal the sdk to collect // and send aggregated metric measurements. - register(producer) + register(meterProviderProducer) + + // RegisterProducer registers a Reader with an external Producer. + // The Producer argument allows the Reader to signal the Producer to + // collect and send aggregated metric measurements. + RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -84,14 +89,22 @@ type Reader interface { Shutdown(context.Context) error } -// producer produces metrics for a Reader. -type producer interface { +// meterProviderProducer produces metrics for a Reader. +type meterProviderProducer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. produce(context.Context) (metricdata.ResourceMetrics, error) } +// Producer produces metrics for a Reader from an external source. +type Producer interface { + // Produce returns aggregated metrics from an external source. + // + // This method should be safe to call concurrently. + Produce(context.Context) ([]metricdata.ScopeMetrics, error) +} + // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct {