diff --git a/ddtrace/mocktracer/mocktracer_test.go b/ddtrace/mocktracer/mocktracer_test.go index 23a16d3311..38f98b10f8 100644 --- a/ddtrace/mocktracer/mocktracer_test.go +++ b/ddtrace/mocktracer/mocktracer_test.go @@ -195,6 +195,7 @@ func TestTracerInject(t *testing.T) { t.Run("ok", func(t *testing.T) { mt := newMockTracer() + defer mt.Stop() assert := assert.New(t) sp := mt.StartSpan("op", tracer.WithSpanID(2)) @@ -300,6 +301,7 @@ func TestTracerExtract(t *testing.T) { assert := assert.New(t) mt := newMockTracer() + defer mt.Stop() sp := mt.StartSpan("op", tracer.WithSpanID(2)) sp.SetTag(ext.ManualDrop, true) sp.SetBaggageItem("a", "B") diff --git a/internal/datastreams/processor.go b/internal/datastreams/processor.go index 0f0bc38faa..c2f8c811a8 100644 --- a/internal/datastreams/processor.go +++ b/internal/datastreams/processor.go @@ -311,16 +311,16 @@ func (p *Processor) flushInput() { func (p *Processor) run(tick <-chan time.Time) { for { select { + case <-p.stop: + // drop in flight payloads on the input channel + p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) + return case now := <-tick: p.sendToAgent(p.flush(now)) case done := <-p.flushRequest: p.flushInput() p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) close(done) - case <-p.stop: - // drop in flight payloads on the input channel - p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) - return default: s := p.in.pop() if s == nil {