Skip to content

Commit

Permalink
ddtrace/tracer: fix regression when flushing based on size
Browse files Browse the repository at this point in the history
During a previous change released as part of v1.20.0 in PR #498, the
flush channel was accidentally made blocking. This would cause a
deadlock on occasion when flushing after the payload was full.

Tests failed to catch this problem because they were not using the
constructor and instead had their own, which was made correctly. This
change also addresses that problem and removes the constructor from the
tests, now correctly reproducing the problem.
  • Loading branch information
gbbr committed Jan 15, 2020
1 parent fff4ab8 commit c89dae6
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 41 deletions.
8 changes: 1 addition & 7 deletions ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,7 @@ func (tg *testStatsdClient) Wait(n int, d time.Duration) error {

func TestReportRuntimeMetrics(t *testing.T) {
var tg testStatsdClient
trc := &tracer{
stopped: make(chan struct{}),
exitChan: make(chan struct{}),
config: &config{
statsd: &tg,
},
}
trc := newUnstartedTracer(withStatsdClient(&tg))

trc.wg.Add(1)
go func() {
Expand Down
24 changes: 4 additions & 20 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,22 +496,6 @@ func TestSamplingLimiter(t *testing.T) {

func BenchmarkRulesSampler(b *testing.B) {
const batchSize = 500
newTracer := func(opts ...StartOption) *tracer {
c := new(config)
defaults(c)
for _, fn := range opts {
fn(c)
}
return &tracer{
config: c,
payloadChan: make(chan []*span, batchSize),
flushChan: make(chan struct{}, 1),
stopped: make(chan struct{}),
exitChan: make(chan struct{}, 1),
rulesSampling: newRulesSampler(c.samplingRules),
prioritySampling: newPrioritySampler(),
}
}

benchmarkStartSpan := func(b *testing.B, t *tracer) {
internal.SetGlobalTracer(t)
Expand Down Expand Up @@ -553,7 +537,7 @@ func BenchmarkRulesSampler(b *testing.B) {
}

b.Run("no-rules", func(b *testing.B) {
tracer := newTracer()
tracer := newUnstartedTracer()
benchmarkStartSpan(b, tracer)
})

Expand All @@ -563,7 +547,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameServiceRule("db.query", "postgres.db", 1.0),
NameRule("notweb.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})

Expand All @@ -573,7 +557,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameServiceRule("db.query", "postgres.db", 1.0),
NameRule("web.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})

Expand Down Expand Up @@ -603,7 +587,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameRule("notweb.request", 1.0),
NameRule("web.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})
}
11 changes: 8 additions & 3 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
// payloadQueueSize is the buffer size of the trace channel.
const payloadQueueSize = 1000

func newTracer(opts ...StartOption) *tracer {
func newUnstartedTracer(opts ...StartOption) *tracer {
c := new(config)
defaults(c)
for _, fn := range opts {
Expand Down Expand Up @@ -170,10 +170,10 @@ func newTracer(opts ...StartOption) *tracer {
}
}

t := &tracer{
return &tracer{
config: c,
payload: newPayload(),
flushChan: make(chan struct{}),
flushChan: make(chan struct{}, 1),
exitChan: make(chan struct{}),
payloadChan: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
Expand All @@ -182,6 +182,11 @@ func newTracer(opts ...StartOption) *tracer {
prioritySampling: newPrioritySampler(),
pid: strconv.Itoa(os.Getpid()),
}
}

func newTracer(opts ...StartOption) *tracer {
t := newUnstartedTracer(opts...)
c := t.config
t.config.statsd.Incr("datadog.tracer.started", nil, 1)
if c.runtimeMetrics {
log.Debug("Runtime metrics enabled.")
Expand Down
12 changes: 2 additions & 10 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,16 +870,8 @@ func TestWorker(t *testing.T) {
}
}

func newTracerChannels() *tracer {
return &tracer{
payload: newPayload(),
payloadChan: make(chan []*span, payloadQueueSize),
flushChan: make(chan struct{}, 1),
}
}

func TestPushPayload(t *testing.T) {
tracer := newTracerChannels()
tracer := newUnstartedTracer()
s := newBasicSpan("3MB")
s.Meta["key"] = strings.Repeat("X", payloadSizeLimit/2+10)

Expand All @@ -899,7 +891,7 @@ func TestPushTrace(t *testing.T) {

tp := new(testLogger)
log.UseLogger(tp)
tracer := newTracerChannels()
tracer := newUnstartedTracer()
trace := []*span{
&span{
Name: "pylons.request",
Expand Down
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ package version
// Tag specifies the current release tag. It needs to be manually
// updated. A test checks that the value of Tag never points to a
// git tag that is older than HEAD.
const Tag = "v1.21.0"
const Tag = "v1.20.1"

0 comments on commit c89dae6

Please sign in to comment.