From f0ef7b35140ef7baae4083cc46d565d9b1d16112 Mon Sep 17 00:00:00 2001 From: Gabriel Aszalos Date: Wed, 18 Sep 2019 10:42:52 +0300 Subject: [PATCH 1/2] ddtrace/tracer: simplify channels This change slightly simplifies concurrency, moves some test methods to the test files, improves naming to be more consistent and adds more documentation. --- ddtrace/tracer/tracer.go | 56 ++++++++++++++++------------------- ddtrace/tracer/tracer_test.go | 26 ++++++++++------ 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 28ea197832..b143ed6294 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -30,11 +30,15 @@ type tracer struct { *config *payload - flushAllReq chan chan<- struct{} - flushTracesReq chan struct{} - exitReq chan struct{} + // chanFlush triggers a flush of the buffered payload. If the sent channel is + // not nil (only in tests), it will receive confirmation of a finished flush. + chanFlush chan chan<- struct{} - payloadQueue chan []*span + // chanExit requests that the tracer stops. + chanExit chan struct{} + + // chanPayload receives traces to be added to the payload. + chanPayload chan []*span // stopped is a channel that will be closed when the worker has exited. stopped chan struct{} @@ -46,6 +50,7 @@ type tracer struct { // prioritySampling holds an instance of the priority sampler. prioritySampling *prioritySampler + // pid of the process pid string } @@ -129,10 +134,9 @@ func newTracer(opts ...StartOption) *tracer { t := &tracer{ config: c, payload: newPayload(), - flushAllReq: make(chan chan<- struct{}), - flushTracesReq: make(chan struct{}, 1), - exitReq: make(chan struct{}), - payloadQueue: make(chan []*span, payloadQueueSize), + chanFlush: make(chan chan<- struct{}), + chanExit: make(chan struct{}), + chanPayload: make(chan []*span, payloadQueueSize), stopped: make(chan struct{}), prioritySampling: newPrioritySampler(), pid: strconv.Itoa(os.Getpid()), @@ -152,21 +156,20 @@ func (t *tracer) worker() { for { select { - case trace := <-t.payloadQueue: + case trace := <-t.chanPayload: t.pushPayload(trace) case <-ticker.C: - t.flush() - - case done := <-t.flushAllReq: - t.flush() - done <- struct{}{} + t.flushPayload() - case <-t.flushTracesReq: - t.flush() + case confirm := <-t.chanFlush: + t.flushPayload() + if confirm != nil { + confirm <- struct{}{} + } - case <-t.exitReq: - t.flush() + case <-t.chanExit: + t.flushPayload() return } } @@ -179,7 +182,7 @@ func (t *tracer) pushTrace(trace []*span) { default: } select { - case t.payloadQueue <- trace: + case t.chanPayload <- trace: default: log.Error("payload queue full, dropping %d traces", len(trace)) } @@ -273,7 +276,7 @@ func (t *tracer) Stop() { case <-t.stopped: return default: - t.exitReq <- struct{}{} + t.chanExit <- struct{}{} <-t.stopped } } @@ -289,7 +292,7 @@ func (t *tracer) Extract(carrier interface{}) (ddtrace.SpanContext, error) { } // flush will push any currently buffered traces to the server. -func (t *tracer) flush() { +func (t *tracer) flushPayload() { if t.payload.itemCount() == 0 { return } @@ -305,15 +308,6 @@ func (t *tracer) flush() { t.payload.reset() } -// forceFlush forces a flush of data (traces and services) to the agent. -// Flushes are done by a background task on a regular basis, so you never -// need to call this manually, mostly useful for testing and debugging. -func (t *tracer) forceFlush() { - done := make(chan struct{}) - t.flushAllReq <- done - <-done -} - // pushPayload pushes the trace onto the payload. If the payload becomes // larger than the threshold as a result, it sends a flush request. func (t *tracer) pushPayload(trace []*span) { @@ -323,7 +317,7 @@ func (t *tracer) pushPayload(trace []*span) { if t.payload.size() > payloadSizeLimit { // getting large select { - case t.flushTracesReq <- struct{}{}: + case t.chanFlush <- nil: default: // flush already queued } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 1bf0268a7d..d4145f622e 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -27,6 +27,14 @@ import ( "github.com/tinylib/msgp/msgp" ) +// forceFlush forces a flush of data (traces and services) to the agent +// synchronously. +func (t *tracer) forceFlush() { + confirm := make(chan struct{}) + t.chanFlush <- confirm + <-confirm +} + func (t *tracer) newEnvSpan(service, env string) *span { return t.StartSpan("test.op", SpanType("test"), ServiceName(service), ResourceName("/"), Tag(ext.Environment, env)).(*span) } @@ -830,9 +838,9 @@ func TestWorker(t *testing.T) { func newTracerChannels() *tracer { return &tracer{ - payload: newPayload(), - payloadQueue: make(chan []*span, payloadQueueSize), - flushTracesReq: make(chan struct{}, 1), + payload: newPayload(), + chanPayload: make(chan []*span, payloadQueueSize), + chanFlush: make(chan chan<- struct{}, 1), } } @@ -844,12 +852,12 @@ func TestPushPayload(t *testing.T) { // half payload size reached, we have 1 item, no flush request tracer.pushPayload([]*span{s}) assert.Equal(t, tracer.payload.itemCount(), 1) - assert.Len(t, tracer.flushTracesReq, 0) + assert.Len(t, tracer.chanFlush, 0) // payload size exceeded, we have 2 items and a flush request tracer.pushPayload([]*span{s}) assert.Equal(t, tracer.payload.itemCount(), 2) - assert.Len(t, tracer.flushTracesReq, 1) + assert.Len(t, tracer.chanFlush, 1) } func TestPushTrace(t *testing.T) { @@ -872,17 +880,17 @@ func TestPushTrace(t *testing.T) { } tracer.pushTrace(trace) - assert.Len(tracer.payloadQueue, 1) - assert.Len(tracer.flushTracesReq, 0, "no flush requested yet") + assert.Len(tracer.chanPayload, 1) + assert.Len(tracer.chanFlush, 0, "no flush requested yet") - t0 := <-tracer.payloadQueue + t0 := <-tracer.chanPayload assert.Equal(trace, t0) many := payloadQueueSize + 2 for i := 0; i < many; i++ { tracer.pushTrace(make([]*span, i)) } - assert.Len(tracer.payloadQueue, payloadQueueSize) + assert.Len(tracer.chanPayload, payloadQueueSize) log.Flush() assert.True(len(tp.Lines()) >= 2) } From 142eddfa4f256a0731c767fb57889f26f05245a0 Mon Sep 17 00:00:00 2001 From: Gabriel Aszalos Date: Wed, 18 Sep 2019 17:49:34 +0300 Subject: [PATCH 2/2] Go from "chan*" to "*Chan" --- ddtrace/tracer/tracer.go | 30 +++++++++++++++--------------- ddtrace/tracer/tracer_test.go | 18 +++++++++--------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index b143ed6294..3f2370899a 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -30,15 +30,15 @@ type tracer struct { *config *payload - // chanFlush triggers a flush of the buffered payload. If the sent channel is + // flushChan triggers a flush of the buffered payload. If the sent channel is // not nil (only in tests), it will receive confirmation of a finished flush. - chanFlush chan chan<- struct{} + flushChan chan chan<- struct{} - // chanExit requests that the tracer stops. - chanExit chan struct{} + // exitChan requests that the tracer stops. + exitChan chan struct{} - // chanPayload receives traces to be added to the payload. - chanPayload chan []*span + // payloadChan receives traces to be added to the payload. + payloadChan chan []*span // stopped is a channel that will be closed when the worker has exited. stopped chan struct{} @@ -134,9 +134,9 @@ func newTracer(opts ...StartOption) *tracer { t := &tracer{ config: c, payload: newPayload(), - chanFlush: make(chan chan<- struct{}), - chanExit: make(chan struct{}), - chanPayload: make(chan []*span, payloadQueueSize), + flushChan: make(chan chan<- struct{}), + exitChan: make(chan struct{}), + payloadChan: make(chan []*span, payloadQueueSize), stopped: make(chan struct{}), prioritySampling: newPrioritySampler(), pid: strconv.Itoa(os.Getpid()), @@ -156,19 +156,19 @@ func (t *tracer) worker() { for { select { - case trace := <-t.chanPayload: + case trace := <-t.payloadChan: t.pushPayload(trace) case <-ticker.C: t.flushPayload() - case confirm := <-t.chanFlush: + case confirm := <-t.flushChan: t.flushPayload() if confirm != nil { confirm <- struct{}{} } - case <-t.chanExit: + case <-t.exitChan: t.flushPayload() return } @@ -182,7 +182,7 @@ func (t *tracer) pushTrace(trace []*span) { default: } select { - case t.chanPayload <- trace: + case t.payloadChan <- trace: default: log.Error("payload queue full, dropping %d traces", len(trace)) } @@ -276,7 +276,7 @@ func (t *tracer) Stop() { case <-t.stopped: return default: - t.chanExit <- struct{}{} + t.exitChan <- struct{}{} <-t.stopped } } @@ -317,7 +317,7 @@ func (t *tracer) pushPayload(trace []*span) { if t.payload.size() > payloadSizeLimit { // getting large select { - case t.chanFlush <- nil: + case t.flushChan <- nil: default: // flush already queued } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index d4145f622e..dfd881d8e4 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -31,7 +31,7 @@ import ( // synchronously. func (t *tracer) forceFlush() { confirm := make(chan struct{}) - t.chanFlush <- confirm + t.flushChan <- confirm <-confirm } @@ -839,8 +839,8 @@ func TestWorker(t *testing.T) { func newTracerChannels() *tracer { return &tracer{ payload: newPayload(), - chanPayload: make(chan []*span, payloadQueueSize), - chanFlush: make(chan chan<- struct{}, 1), + payloadChan: make(chan []*span, payloadQueueSize), + flushChan: make(chan chan<- struct{}, 1), } } @@ -852,12 +852,12 @@ func TestPushPayload(t *testing.T) { // half payload size reached, we have 1 item, no flush request tracer.pushPayload([]*span{s}) assert.Equal(t, tracer.payload.itemCount(), 1) - assert.Len(t, tracer.chanFlush, 0) + assert.Len(t, tracer.flushChan, 0) // payload size exceeded, we have 2 items and a flush request tracer.pushPayload([]*span{s}) assert.Equal(t, tracer.payload.itemCount(), 2) - assert.Len(t, tracer.chanFlush, 1) + assert.Len(t, tracer.flushChan, 1) } func TestPushTrace(t *testing.T) { @@ -880,17 +880,17 @@ func TestPushTrace(t *testing.T) { } tracer.pushTrace(trace) - assert.Len(tracer.chanPayload, 1) - assert.Len(tracer.chanFlush, 0, "no flush requested yet") + assert.Len(tracer.payloadChan, 1) + assert.Len(tracer.flushChan, 0, "no flush requested yet") - t0 := <-tracer.chanPayload + t0 := <-tracer.payloadChan assert.Equal(trace, t0) many := payloadQueueSize + 2 for i := 0; i < many; i++ { tracer.pushTrace(make([]*span, i)) } - assert.Len(tracer.chanPayload, payloadQueueSize) + assert.Len(tracer.payloadChan, payloadQueueSize) log.Flush() assert.True(len(tp.Lines()) >= 2) }