diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index d4a1f7110f1..3ab837c00d0 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -34,7 +34,7 @@ type ManualReader struct { mu sync.Mutex isShutdown bool - externalProducers []Producer + externalProducers atomic.Value temporalitySelector TemporalitySelector aggregationSelector AggregationSelector @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader { r := &ManualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, - externalProducers: cfg.producers, } + r.externalProducers.Store(cfg.producers) return r } @@ -88,7 +88,7 @@ func (mr *ManualReader) Shutdown(context.Context) error { defer mr.mu.Unlock() mr.isShutdown = true // release references to Producer(s) - mr.externalProducers = nil + mr.externalProducers.Store([]Producer{}) err = nil }) return err @@ -126,7 +126,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr return err } var errs []error - for _, producer := range mr.externalProducers { + for _, producer := range mr.externalProducers.Load().([]Producer) { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3f7fcb41b50..1d18f961197 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -119,18 +119,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri conf := newPeriodicReaderConfig(options) ctx, cancel := context.WithCancel(context.Background()) r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), - externalProducers: conf.producers, + interval: conf.interval, + timeout: conf.timeout, + exporter: exporter, + flushCh: make(chan chan error), + cancel: cancel, + done: make(chan struct{}), rmPool: sync.Pool{ New: func() interface{} { return &metricdata.ResourceMetrics{} }}, } + r.externalProducers.Store(conf.producers) go func() { defer func() { close(r.done) }() @@ -147,7 +147,7 @@ type PeriodicReader struct { mu sync.Mutex isShutdown bool - externalProducers []Producer + externalProducers atomic.Value interval time.Duration timeout time.Duration @@ -263,7 +263,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd return err } var errs []error - for _, producer := range r.externalProducers { + for _, producer := range r.externalProducers.Load().([]Producer) { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -353,7 +353,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { defer r.mu.Unlock() r.isShutdown = true // release references to Producer(s) - r.externalProducers = nil + r.externalProducers.Store([]Producer{}) }) return err }