Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: Don't drop trace if some spans must be kept #963

Merged
merged 12 commits into from
Jul 22, 2021
Merged
1 change: 1 addition & 0 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (rs *rulesSampler) apply(span *span) bool {
var matched bool
rate := rs.globalRate
for _, rule := range rs.rules {
fmt.Println("try match", rule.exactService, rule.exactName, span.Service, span.Name)
gbbr marked this conversation as resolved.
Show resolved Hide resolved
if rule.match(span) {
matched = true
rate = rule.Rate
Expand Down
26 changes: 16 additions & 10 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true

keep := true
if t, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have an active tracer
feats := t.features.Load()
Expand All @@ -358,19 +359,23 @@ func (s *span) finish(finishTime int64) {
}
if feats.DropP0s {
// the agent supports dropping p0's in the client
if shouldDrop(s) {
keep = shouldKeep(s)
if keep {
// ...and this span can be dropped
atomic.AddUint64(&t.droppedP0Spans, 1)
if s == s.context.trace.root {
atomic.AddUint64(&t.droppedP0Traces, 1)
}
return
}
}
}
if s.context.drop {
// not sampled by local sampler
return
// client sampling enabled. This will lead to inexact APM metrics but reduces performance impact.
keep = false
gbbr marked this conversation as resolved.
Show resolved Hide resolved
}
if keep {
// a single kept span keeps the whole trace.
s.context.trace.keep()
gbbr marked this conversation as resolved.
Show resolved Hide resolved
}
s.context.finish()
}
Expand Down Expand Up @@ -401,20 +406,21 @@ func newAggregableSpan(s *span, cfg *config) *aggregableSpan {
}
}

// shouldDrop reports whether it's fine to drop the span s.
func shouldDrop(s *span) bool {
// shouldKeep reports whether the trace should be kept.
// a single span being kept implies the whole trace being kept.
func shouldKeep(s *span) bool {
if p, ok := s.context.samplingPriority(); ok && p > 0 {
// positive sampling priorities stay
return false
return true
}
if atomic.LoadInt64(&s.context.errors) > 0 {
// traces with any span containing an error get kept
return false
return true
}
if v, ok := s.Metrics[ext.EventSampleRate]; ok {
return !sampledByRate(s.TraceID, v)
return sampledByRate(s.TraceID, v)
}
return true
return false
}

// shouldComputeStats mentions whether this span needs to have stats computed for.
Expand Down
18 changes: 9 additions & 9 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,26 @@ func TestShouldDrop(t *testing.T) {
rate float64
want bool
}{
{1, 0, 0, false},
{2, 1, 0, false},
{0, 1, 0, false},
{0, 0, 1, false},
{0, 0, 0.5, false},
{0, 0, 0.00001, true},
{0, 0, 0, true},
{1, 0, 0, true},
{2, 1, 0, true},
{0, 1, 0, true},
{0, 0, 1, true},
{0, 0, 0.5, true},
{0, 0, 0.00001, false},
{0, 0, 0, false},
} {
t.Run("", func(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
s.SetTag(ext.SamplingPriority, tt.prio)
s.SetTag(ext.EventSampleRate, tt.rate)
atomic.StoreInt64(&s.context.errors, tt.errors)
assert.Equal(t, shouldDrop(s), tt.want)
assert.Equal(t, shouldKeep(s), tt.want)
})
}

t.Run("none", func(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
assert.Equal(t, shouldDrop(s), true)
assert.Equal(t, shouldKeep(s), false)
})
}

Expand Down
18 changes: 15 additions & 3 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type spanContext struct {

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // when true, the span will not be sent to the agent
drop bool // reports whether this span was dropped by the rate sampler
errors int64 // number of spans with errors in this trace

// the below group should propagate cross-process
Expand Down Expand Up @@ -138,6 +138,7 @@ type trace struct {
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
kept bool // kept indicates whether to send the trace to the agent or no.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
Expand Down Expand Up @@ -179,6 +180,12 @@ func (t *trace) setSamplingPriority(p float64) {
t.setSamplingPriorityLocked(p)
}

func (t *trace) keep() {
t.mu.Lock()
defer t.mu.Unlock()
t.kept = true
}

func (t *trace) setSamplingPriorityLocked(p float64) {
if t.locked {
return
Expand Down Expand Up @@ -246,11 +253,16 @@ func (t *trace) finishedOne(s *span) {
if len(t.spans) != t.finished {
return
}
defer func() {
t.spans = nil
t.finished = 0 // important, because a buffer can be used for several flushes
}()
if !t.kept {
return
}
if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have a tracer that can receive completed traces.
tr.pushTrace(t.spans)
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
}
t.spans = nil
t.finished = 0 // important, because a buffer can be used for several flushes
}
1 change: 1 addition & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ const sampleRateMetricKey = "_sample_rate"

// Sample samples a span with the internal sampler.
func (t *tracer) sample(span *span) {
// this condition is true for all spans except the local root span.
if _, ok := span.context.samplingPriority(); ok {
// sampling decision was already made
return
Expand Down
44 changes: 44 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,50 @@ func TestTracerStartSpan(t *testing.T) {
})
}

func TestP0Dropping(t *testing.T) {
t.Run("p0 are kept by default", func(t *testing.T) {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.True(t, span.context.trace.kept)
})

t.Run("p0 are dropped when DropP0s flag is on", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.False(t, span.context.trace.kept)
})

t.Run("p0 are kept if at least one span should be kept", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.SetTag(ext.EventSampleRate, 1)
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.True(t, span.context.trace.kept)
})
}

func TestTracerRuntimeMetrics(t *testing.T) {
t.Run("on", func(t *testing.T) {
tp := new(testLogger)
Expand Down