Skip to content

Commit

Permalink
ddtrace/tracer: fix regression when flushing based on size (#570)
Browse files Browse the repository at this point in the history
During a previous change released as part of v1.20.0 in PR #498, the
flush channel was accidentally made blocking. This would cause a
deadlock on occasion when flushing after the payload size became large.

Tests failed to catch this problem because they were not using the
constructor and instead had their own incorrect constructor. This
change also addresses that problem so that tests would now correctly
catch the issue.
  • Loading branch information
gbbr authored Jan 15, 2020
1 parent fff4ab8 commit 5942c57
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 57 deletions.
24 changes: 2 additions & 22 deletions ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,7 @@ func (tg *testStatsdClient) Wait(n int, d time.Duration) error {

func TestReportRuntimeMetrics(t *testing.T) {
var tg testStatsdClient
trc := &tracer{
stopped: make(chan struct{}),
exitChan: make(chan struct{}),
config: &config{
statsd: &tg,
},
}
trc := newUnstartedTracer(withStatsdClient(&tg))

trc.wg.Add(1)
go func() {
Expand All @@ -274,21 +268,7 @@ func TestReportRuntimeMetrics(t *testing.T) {
func TestReportHealthMetrics(t *testing.T) {
assert := assert.New(t)
var tg testStatsdClient
trc := &tracer{
config: &config{
statsd: &tg,
sampler: NewAllSampler(),
transport: newDummyTransport(),
},
payload: newPayload(),
flushChan: make(chan struct{}),
exitChan: make(chan struct{}),
payloadChan: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
rulesSampling: newRulesSampler(nil),
climit: make(chan struct{}, concurrentConnectionLimit),
prioritySampling: newPrioritySampler(),
}
trc := newUnstartedTracer(withStatsdClient(&tg))
internal.SetGlobalTracer(trc)
defer internal.SetGlobalTracer(&internal.NoopTracer{})

Expand Down
24 changes: 4 additions & 20 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,22 +496,6 @@ func TestSamplingLimiter(t *testing.T) {

func BenchmarkRulesSampler(b *testing.B) {
const batchSize = 500
newTracer := func(opts ...StartOption) *tracer {
c := new(config)
defaults(c)
for _, fn := range opts {
fn(c)
}
return &tracer{
config: c,
payloadChan: make(chan []*span, batchSize),
flushChan: make(chan struct{}, 1),
stopped: make(chan struct{}),
exitChan: make(chan struct{}, 1),
rulesSampling: newRulesSampler(c.samplingRules),
prioritySampling: newPrioritySampler(),
}
}

benchmarkStartSpan := func(b *testing.B, t *tracer) {
internal.SetGlobalTracer(t)
Expand Down Expand Up @@ -553,7 +537,7 @@ func BenchmarkRulesSampler(b *testing.B) {
}

b.Run("no-rules", func(b *testing.B) {
tracer := newTracer()
tracer := newUnstartedTracer()
benchmarkStartSpan(b, tracer)
})

Expand All @@ -563,7 +547,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameServiceRule("db.query", "postgres.db", 1.0),
NameRule("notweb.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})

Expand All @@ -573,7 +557,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameServiceRule("db.query", "postgres.db", 1.0),
NameRule("web.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})

Expand Down Expand Up @@ -603,7 +587,7 @@ func BenchmarkRulesSampler(b *testing.B) {
NameRule("notweb.request", 1.0),
NameRule("web.request", 1.0),
}
tracer := newTracer(WithSamplingRules(rules))
tracer := newUnstartedTracer(WithSamplingRules(rules))
benchmarkStartSpan(b, tracer)
})
}
12 changes: 8 additions & 4 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Inject(ctx ddtrace.SpanContext, carrier interface{}) error {
// payloadQueueSize is the buffer size of the trace channel.
const payloadQueueSize = 1000

func newTracer(opts ...StartOption) *tracer {
func newUnstartedTracer(opts ...StartOption) *tracer {
c := new(config)
defaults(c)
for _, fn := range opts {
Expand All @@ -169,11 +169,10 @@ func newTracer(opts ...StartOption) *tracer {
c.statsd = client
}
}

t := &tracer{
return &tracer{
config: c,
payload: newPayload(),
flushChan: make(chan struct{}),
flushChan: make(chan struct{}, 1),
exitChan: make(chan struct{}),
payloadChan: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
Expand All @@ -182,6 +181,11 @@ func newTracer(opts ...StartOption) *tracer {
prioritySampling: newPrioritySampler(),
pid: strconv.Itoa(os.Getpid()),
}
}

func newTracer(opts ...StartOption) *tracer {
t := newUnstartedTracer(opts...)
c := t.config
t.config.statsd.Incr("datadog.tracer.started", nil, 1)
if c.runtimeMetrics {
log.Debug("Runtime metrics enabled.")
Expand Down
12 changes: 2 additions & 10 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,16 +870,8 @@ func TestWorker(t *testing.T) {
}
}

func newTracerChannels() *tracer {
return &tracer{
payload: newPayload(),
payloadChan: make(chan []*span, payloadQueueSize),
flushChan: make(chan struct{}, 1),
}
}

func TestPushPayload(t *testing.T) {
tracer := newTracerChannels()
tracer := newUnstartedTracer()
s := newBasicSpan("3MB")
s.Meta["key"] = strings.Repeat("X", payloadSizeLimit/2+10)

Expand All @@ -899,7 +891,7 @@ func TestPushTrace(t *testing.T) {

tp := new(testLogger)
log.UseLogger(tp)
tracer := newTracerChannels()
tracer := newUnstartedTracer()
trace := []*span{
&span{
Name: "pylons.request",
Expand Down
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ package version
// Tag specifies the current release tag. It needs to be manually
// updated. A test checks that the value of Tag never points to a
// git tag that is older than HEAD.
const Tag = "v1.21.0"
const Tag = "v1.20.1"

0 comments on commit 5942c57

Please sign in to comment.