From 91218a6a5e58d613c3c92d3ff54fd8992061f1fd Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Fri, 25 Oct 2024 09:39:24 -0400 Subject: [PATCH] fix goroutine leak in mocktracer --- ddtrace/mocktracer/mocktracer_test.go | 2 ++ internal/datastreams/processor.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ddtrace/mocktracer/mocktracer_test.go b/ddtrace/mocktracer/mocktracer_test.go index 23a16d3311..38f98b10f8 100644 --- a/ddtrace/mocktracer/mocktracer_test.go +++ b/ddtrace/mocktracer/mocktracer_test.go @@ -195,6 +195,7 @@ func TestTracerInject(t *testing.T) { t.Run("ok", func(t *testing.T) { mt := newMockTracer() + defer mt.Stop() assert := assert.New(t) sp := mt.StartSpan("op", tracer.WithSpanID(2)) @@ -300,6 +301,7 @@ func TestTracerExtract(t *testing.T) { assert := assert.New(t) mt := newMockTracer() + defer mt.Stop() sp := mt.StartSpan("op", tracer.WithSpanID(2)) sp.SetTag(ext.ManualDrop, true) sp.SetBaggageItem("a", "B") diff --git a/internal/datastreams/processor.go b/internal/datastreams/processor.go index 0f0bc38faa..c2f8c811a8 100644 --- a/internal/datastreams/processor.go +++ b/internal/datastreams/processor.go @@ -311,16 +311,16 @@ func (p *Processor) flushInput() { func (p *Processor) run(tick <-chan time.Time) { for { select { + case <-p.stop: + // drop in flight payloads on the input channel + p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) + return case now := <-tick: p.sendToAgent(p.flush(now)) case done := <-p.flushRequest: p.flushInput() p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) close(done) - case <-p.stop: - // drop in flight payloads on the input channel - p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) - return default: s := p.in.pop() if s == nil {