From e4bdfe7b566d7d599988825ae7a67eaf5713db2e Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 12 Oct 2022 13:44:18 -0700 Subject: [PATCH] Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register (#3268) * Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register Signed-off-by: Bogdan Drutu * Update CHANGELOG.md Signed-off-by: Bogdan Drutu Co-authored-by: Tyler Yahn --- CHANGELOG.md | 4 +++ sdk/trace/provider.go | 41 +++++++++++++---------------- sdk/trace/provider_test.go | 51 +++++++++++++++++++------------------ sdk/trace/span.go | 15 +++++------ sdk/trace/span_processor.go | 5 ++++ sdk/trace/tracer.go | 2 +- 6 files changed, 60 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93b3a7d745d..d13497c6ae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Changed + +- `sdktrace.TraceProvider.Shutdown` and `sdktrace.TraceProvider.ForceFlush` to not return error when no processor register. (#3268) + ## [1.11.0/0.32.3] 2022-10-12 ### Added diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index 292ea5481bc..327b8b41638 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -116,12 +116,13 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider { spanLimits: o.spanLimits, resource: o.resource, } - global.Info("TracerProvider created", "config", o) + spss := spanProcessorStates{} for _, sp := range o.processors { - tp.RegisterSpanProcessor(sp) + spss = append(spss, newSpanProcessorState(sp)) } + tp.spanProcessors.Store(spss) return tp } @@ -159,44 +160,38 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T } // RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors. -func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { +func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() newSPS := spanProcessorStates{} - if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok { - newSPS = append(newSPS, old...) - } - newSpanSync := &spanProcessorState{ - sp: s, - state: &sync.Once{}, - } - newSPS = append(newSPS, newSpanSync) + newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...) + newSPS = append(newSPS, newSpanProcessorState(sp)) p.spanProcessors.Store(newSPS) } // UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors. -func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { +func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() - spss := spanProcessorStates{} - old, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok || len(old) == 0 { + old := p.spanProcessors.Load().(spanProcessorStates) + if len(old) == 0 { return } + spss := spanProcessorStates{} spss = append(spss, old...) // stop the span processor if it is started and remove it from the list var stopOnce *spanProcessorState var idx int for i, sps := range spss { - if sps.sp == s { + if sps.sp == sp { stopOnce = sps idx = i } } if stopOnce != nil { stopOnce.state.Do(func() { - if err := s.Shutdown(context.Background()); err != nil { + if err := sp.Shutdown(context.Background()); err != nil { otel.Handle(err) } }) @@ -213,10 +208,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { // ForceFlush immediately exports all spans that have not yet been exported for // all the registered span processors. func (p *TracerProvider) ForceFlush(ctx context.Context) error { - spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok { - return fmt.Errorf("failed to load span processors") - } + spss := p.spanProcessors.Load().(spanProcessorStates) if len(spss) == 0 { return nil } @@ -237,10 +229,11 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error { // Shutdown shuts down the span processors in the order they were registered. func (p *TracerProvider) Shutdown(ctx context.Context) error { - spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok { - return fmt.Errorf("failed to load span processors") + spss := p.spanProcessors.Load().(spanProcessorStates) + if len(spss) == 0 { + return nil } + var retErr error for _, sps := range spss { select { diff --git a/sdk/trace/provider_test.go b/sdk/trace/provider_test.go index 6ec3df6794d..2cf19aaa029 100644 --- a/sdk/trace/provider_test.go +++ b/sdk/trace/provider_test.go @@ -28,41 +28,45 @@ import ( "go.opentelemetry.io/otel/trace" ) -type basicSpanProcesor struct { - running bool +type basicSpanProcessor struct { + flushed bool + closed bool injectShutdownError error } -func (t *basicSpanProcesor) Shutdown(context.Context) error { - t.running = false +func (t *basicSpanProcessor) Shutdown(context.Context) error { + t.closed = true return t.injectShutdownError } -func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {} -func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {} -func (t *basicSpanProcesor) ForceFlush(context.Context) error { +func (t *basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} +func (t *basicSpanProcessor) OnEnd(ReadOnlySpan) {} +func (t *basicSpanProcessor) ForceFlush(context.Context) error { + t.flushed = true return nil } +func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) { + stp := NewTracerProvider() + assert.NoError(t, stp.ForceFlush(context.Background())) + assert.NoError(t, stp.Shutdown(context.Background())) +} + func TestShutdownTraceProvider(t *testing.T) { stp := NewTracerProvider() - sp := &basicSpanProcesor{} + sp := &basicSpanProcessor{} stp.RegisterSpanProcessor(sp) - sp.running = true - - _ = stp.Shutdown(context.Background()) - - if sp.running { - t.Errorf("Error shutdown basicSpanProcesor\n") - } + assert.NoError(t, stp.ForceFlush(context.Background())) + assert.True(t, sp.flushed, "error ForceFlush basicSpanProcessor") + assert.NoError(t, stp.Shutdown(context.Background())) + assert.True(t, sp.closed, "error Shutdown basicSpanProcessor") } func TestFailedProcessorShutdown(t *testing.T) { stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") - sp := &basicSpanProcesor{ - running: true, + sp := &basicSpanProcessor{ injectShutdownError: spErr, } stp.RegisterSpanProcessor(sp) @@ -76,12 +80,10 @@ func TestFailedProcessorsShutdown(t *testing.T) { stp := NewTracerProvider() spErr1 := errors.New("basic span processor shutdown failure1") spErr2 := errors.New("basic span processor shutdown failure2") - sp1 := &basicSpanProcesor{ - running: true, + sp1 := &basicSpanProcessor{ injectShutdownError: spErr1, } - sp2 := &basicSpanProcesor{ - running: true, + sp2 := &basicSpanProcessor{ injectShutdownError: spErr2, } stp.RegisterSpanProcessor(sp1) @@ -90,16 +92,15 @@ func TestFailedProcessorsShutdown(t *testing.T) { err := stp.Shutdown(context.Background()) assert.Error(t, err) assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2") - assert.False(t, sp1.running) - assert.False(t, sp2.running) + assert.True(t, sp1.closed) + assert.True(t, sp2.closed) } func TestFailedProcessorShutdownInUnregister(t *testing.T) { handler.Reset() stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") - sp := &basicSpanProcesor{ - running: true, + sp := &basicSpanProcessor{ injectShutdownError: spErr, } stp.RegisterSpanProcessor(sp) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index 9760923f702..c7cf8e94ec9 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -423,14 +423,13 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { } s.mu.Unlock() - if sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates); ok { - if len(sps) == 0 { - return - } - snap := s.snapshot() - for _, sp := range sps { - sp.sp.OnEnd(snap) - } + sps := s.tracer.provider.spanProcessors.Load().(spanProcessorStates) + if len(sps) == 0 { + return + } + snap := s.snapshot() + for _, sp := range sps { + sp.sp.OnEnd(snap) } } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index b649a2ff049..e6ae1935219 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -64,4 +64,9 @@ type spanProcessorState struct { sp SpanProcessor state *sync.Once } + +func newSpanProcessorState(sp SpanProcessor) *spanProcessorState { + return &spanProcessorState{sp: sp, state: &sync.Once{}} +} + type spanProcessorStates []*spanProcessorState diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index 7b11fc465c6..f17d924b89e 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS s := tr.newSpan(ctx, name, &config) if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() { - sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates) + sps := tr.provider.spanProcessors.Load().(spanProcessorStates) for _, sp := range sps { sp.sp.OnStart(ctx, rw) }