Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: simplify channels #498

Merged
merged 2 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 25 additions & 31 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ type tracer struct {
*config
*payload

flushAllReq chan chan<- struct{}
flushTracesReq chan struct{}
exitReq chan struct{}
// chanFlush triggers a flush of the buffered payload. If the sent channel is
// not nil (only in tests), it will receive confirmation of a finished flush.
chanFlush chan chan<- struct{}
gbbr marked this conversation as resolved.
Show resolved Hide resolved

payloadQueue chan []*span
// chanExit requests that the tracer stops.
chanExit chan struct{}

// chanPayload receives traces to be added to the payload.
chanPayload chan []*span

// stopped is a channel that will be closed when the worker has exited.
stopped chan struct{}
Expand All @@ -46,6 +50,7 @@ type tracer struct {

// prioritySampling holds an instance of the priority sampler.
prioritySampling *prioritySampler

// pid of the process
pid string
}
Expand Down Expand Up @@ -129,10 +134,9 @@ func newTracer(opts ...StartOption) *tracer {
t := &tracer{
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
chanFlush: make(chan chan<- struct{}),
chanExit: make(chan struct{}),
chanPayload: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
prioritySampling: newPrioritySampler(),
pid: strconv.Itoa(os.Getpid()),
Expand All @@ -152,21 +156,20 @@ func (t *tracer) worker() {

for {
select {
case trace := <-t.payloadQueue:
case trace := <-t.chanPayload:
t.pushPayload(trace)

case <-ticker.C:
t.flush()

case done := <-t.flushAllReq:
t.flush()
done <- struct{}{}
t.flushPayload()

case <-t.flushTracesReq:
t.flush()
case confirm := <-t.chanFlush:
t.flushPayload()
if confirm != nil {
confirm <- struct{}{}
}

case <-t.exitReq:
t.flush()
case <-t.chanExit:
t.flushPayload()
return
}
}
Expand All @@ -179,7 +182,7 @@ func (t *tracer) pushTrace(trace []*span) {
default:
}
select {
case t.payloadQueue <- trace:
case t.chanPayload <- trace:
default:
log.Error("payload queue full, dropping %d traces", len(trace))
}
Expand Down Expand Up @@ -273,7 +276,7 @@ func (t *tracer) Stop() {
case <-t.stopped:
return
default:
t.exitReq <- struct{}{}
t.chanExit <- struct{}{}
<-t.stopped
}
}
Expand All @@ -289,7 +292,7 @@ func (t *tracer) Extract(carrier interface{}) (ddtrace.SpanContext, error) {
}

// flush will push any currently buffered traces to the server.
func (t *tracer) flush() {
func (t *tracer) flushPayload() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment should be updated to flushPayload.

if t.payload.itemCount() == 0 {
return
}
Expand All @@ -305,15 +308,6 @@ func (t *tracer) flush() {
t.payload.reset()
}

// forceFlush forces a flush of data (traces and services) to the agent.
// Flushes are done by a background task on a regular basis, so you never
// need to call this manually, mostly useful for testing and debugging.
func (t *tracer) forceFlush() {
done := make(chan struct{})
t.flushAllReq <- done
<-done
}

// pushPayload pushes the trace onto the payload. If the payload becomes
// larger than the threshold as a result, it sends a flush request.
func (t *tracer) pushPayload(trace []*span) {
Expand All @@ -323,7 +317,7 @@ func (t *tracer) pushPayload(trace []*span) {
if t.payload.size() > payloadSizeLimit {
// getting large
select {
case t.flushTracesReq <- struct{}{}:
case t.chanFlush <- nil:
default:
// flush already queued
}
Expand Down
26 changes: 17 additions & 9 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ import (
"github.com/tinylib/msgp/msgp"
)

// forceFlush forces a flush of data (traces and services) to the agent
// synchronously.
func (t *tracer) forceFlush() {
confirm := make(chan struct{})
t.chanFlush <- confirm
<-confirm
}

func (t *tracer) newEnvSpan(service, env string) *span {
return t.StartSpan("test.op", SpanType("test"), ServiceName(service), ResourceName("/"), Tag(ext.Environment, env)).(*span)
}
Expand Down Expand Up @@ -830,9 +838,9 @@ func TestWorker(t *testing.T) {

func newTracerChannels() *tracer {
return &tracer{
payload: newPayload(),
payloadQueue: make(chan []*span, payloadQueueSize),
flushTracesReq: make(chan struct{}, 1),
payload: newPayload(),
chanPayload: make(chan []*span, payloadQueueSize),
chanFlush: make(chan chan<- struct{}, 1),
}
}

Expand All @@ -844,12 +852,12 @@ func TestPushPayload(t *testing.T) {
// half payload size reached, we have 1 item, no flush request
tracer.pushPayload([]*span{s})
assert.Equal(t, tracer.payload.itemCount(), 1)
assert.Len(t, tracer.flushTracesReq, 0)
assert.Len(t, tracer.chanFlush, 0)

// payload size exceeded, we have 2 items and a flush request
tracer.pushPayload([]*span{s})
assert.Equal(t, tracer.payload.itemCount(), 2)
assert.Len(t, tracer.flushTracesReq, 1)
assert.Len(t, tracer.chanFlush, 1)
}

func TestPushTrace(t *testing.T) {
Expand All @@ -872,17 +880,17 @@ func TestPushTrace(t *testing.T) {
}
tracer.pushTrace(trace)

assert.Len(tracer.payloadQueue, 1)
assert.Len(tracer.flushTracesReq, 0, "no flush requested yet")
assert.Len(tracer.chanPayload, 1)
assert.Len(tracer.chanFlush, 0, "no flush requested yet")

t0 := <-tracer.payloadQueue
t0 := <-tracer.chanPayload
assert.Equal(trace, t0)

many := payloadQueueSize + 2
for i := 0; i < many; i++ {
tracer.pushTrace(make([]*span, i))
}
assert.Len(tracer.payloadQueue, payloadQueueSize)
assert.Len(tracer.chanPayload, payloadQueueSize)
log.Flush()
assert.True(len(tp.Lines()) >= 2)
}
Expand Down