Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: Don't drop trace if some spans must be kept #963

Merged
merged 12 commits into from
Jul 22, 2021
Merged
29 changes: 12 additions & 17 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true

keep := true
if t, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have an active tracer
feats := t.features.Load()
Expand All @@ -358,19 +359,12 @@ func (s *span) finish(finishTime int64) {
}
if feats.DropP0s {
// the agent supports dropping p0's in the client
if shouldDrop(s) {
// ...and this span can be dropped
atomic.AddUint64(&t.droppedP0Spans, 1)
if s == s.context.trace.root {
atomic.AddUint64(&t.droppedP0Traces, 1)
}
return
}
keep = shouldKeep(s)
}
}
if s.context.drop {
// not sampled by local sampler
return
if keep {
// a single kept span keeps the whole trace.
s.context.trace.keep()
gbbr marked this conversation as resolved.
Show resolved Hide resolved
}
s.context.finish()
}
Expand Down Expand Up @@ -401,20 +395,21 @@ func newAggregableSpan(s *span, cfg *config) *aggregableSpan {
}
}

// shouldDrop reports whether it's fine to drop the span s.
func shouldDrop(s *span) bool {
// shouldKeep reports whether the trace should be kept.
// a single span being kept implies the whole trace being kept.
func shouldKeep(s *span) bool {
if p, ok := s.context.samplingPriority(); ok && p > 0 {
// positive sampling priorities stay
return false
return true
}
if atomic.LoadInt64(&s.context.errors) > 0 {
// traces with any span containing an error get kept
return false
return true
}
if v, ok := s.Metrics[ext.EventSampleRate]; ok {
return !sampledByRate(s.TraceID, v)
return sampledByRate(s.TraceID, v)
}
return true
return false
}

// shouldComputeStats mentions whether this span needs to have stats computed for.
Expand Down
18 changes: 9 additions & 9 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,26 @@ func TestShouldDrop(t *testing.T) {
rate float64
want bool
}{
{1, 0, 0, false},
{2, 1, 0, false},
{0, 1, 0, false},
{0, 0, 1, false},
{0, 0, 0.5, false},
{0, 0, 0.00001, true},
{0, 0, 0, true},
{1, 0, 0, true},
{2, 1, 0, true},
{0, 1, 0, true},
{0, 0, 1, true},
{0, 0, 0.5, true},
{0, 0, 0.00001, false},
{0, 0, 0, false},
} {
t.Run("", func(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
s.SetTag(ext.SamplingPriority, tt.prio)
s.SetTag(ext.EventSampleRate, tt.rate)
atomic.StoreInt64(&s.context.errors, tt.errors)
assert.Equal(t, shouldDrop(s), tt.want)
assert.Equal(t, shouldKeep(s), tt.want)
})
}

t.Run("none", func(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
assert.Equal(t, shouldDrop(s), true)
assert.Equal(t, shouldKeep(s), false)
})
}

Expand Down
61 changes: 47 additions & 14 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)
Expand All @@ -25,7 +26,6 @@ type spanContext struct {

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // when true, the span will not be sent to the agent
errors int64 // number of spans with errors in this trace

// the below group should propagate cross-process
Expand All @@ -52,7 +52,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
}
if parent != nil {
context.trace = parent.trace
context.drop = parent.drop
context.origin = parent.origin
context.errors = parent.errors
parent.ForeachBaggageItem(func(k, v string) bool {
Expand Down Expand Up @@ -128,16 +127,30 @@ func (c *spanContext) baggageItem(key string) string {
// finish marks this span as finished in the trace.
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// samplingDecision is the decision to send a trace to the agent or not.
type samplingDecision int64

const (
// decisionNone is the default state of a trace.
// If no decision is made about the trace, the trace won't be sent to the agent.
gbbr marked this conversation as resolved.
Show resolved Hide resolved
decisionNone samplingDecision = iota
// decisionDrop prevents the trace from being sent to the agent.
decisionDrop
// decisionKeep ensures the trace will be sent to the agent.
decisionKeep
)

// trace contains shared context information about a trace, such as sampling
// priority, the root reference and a buffer of the spans which are part of the
// trace, if these exist.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
Expand Down Expand Up @@ -179,6 +192,14 @@ func (t *trace) setSamplingPriority(p float64) {
t.setSamplingPriorityLocked(p)
}

func (t *trace) keep() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep))
}

func (t *trace) drop() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop))
}

func (t *trace) setSamplingPriorityLocked(p float64) {
if t.locked {
return
Expand Down Expand Up @@ -246,11 +267,23 @@ func (t *trace) finishedOne(s *span) {
if len(t.spans) != t.finished {
return
}
if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have a tracer that can receive completed traces.
tr.pushTrace(t.spans)
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
defer func() {
t.spans = nil
t.finished = 0 // important, because a buffer can be used for several flushes
}()
tr, ok := internal.GetGlobalTracer().(*tracer)
if !ok {
return
}
// we have a tracer that can receive completed traces.
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
sd := samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision)))
if sd != decisionKeep {
if t.priority != nil && *(t.priority) == ext.PriorityAutoReject {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
atomic.AddUint64(&tr.droppedP0Spans, uint64(len(t.spans)))
atomic.AddUint64(&tr.droppedP0Traces, 1)
}
return
}
t.spans = nil
t.finished = 0 // important, because a buffer can be used for several flushes
tr.pushTrace(t.spans)
}
15 changes: 10 additions & 5 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,8 @@ func TestSpanContextParent(t *testing.T) {
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
trace: newTrace(),
drop: true,
},
"nil-trace": &spanContext{
drop: true,
},
"nil-trace": &spanContext{},
"priority": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
Expand All @@ -317,6 +314,14 @@ func TestSpanContextParent(t *testing.T) {
priority: func() *float64 { v := new(float64); *v = 2; return v }(),
},
},
"sampling_decision": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
trace: &trace{
spans: []*span{newBasicSpan("abc")},
samplingDecision: decisionKeep,
},
},
"origin": &spanContext{
trace: &trace{spans: []*span{newBasicSpan("abc")}},
origin: "synthetics",
Expand All @@ -334,8 +339,8 @@ func TestSpanContextParent(t *testing.T) {
assert.Contains(ctx.trace.spans, s)
if parentCtx.trace != nil {
assert.Equal(ctx.trace.priority, parentCtx.trace.priority)
assert.Equal(ctx.trace.samplingDecision, parentCtx.trace.samplingDecision)
}
assert.Equal(parentCtx.drop, ctx.drop)
assert.Equal(parentCtx.baggage, ctx.baggage)
assert.Equal(parentCtx.origin, ctx.origin)
})
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (t *tracer) sample(span *span) {
}
sampler := t.config.sampler
if !sampler.Sample(span) {
span.context.drop = true
span.context.trace.drop()
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
Expand Down
60 changes: 60 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,66 @@ func TestTracerStartSpan(t *testing.T) {
})
}

func TestSamplingDecision(t *testing.T) {
t.Run("sampled", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.Equal(t, decisionKeep, span.context.trace.samplingDecision)
})

t.Run("dropped", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.Equal(t, decisionNone, span.context.trace.samplingDecision)
})

t.Run("events_sampled", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.SetTag(ext.EventSampleRate, 1)
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.Equal(t, decisionKeep, span.context.trace.samplingDecision)
})

t.Run("client_dropped", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.config.sampler = NewRateSampler(0)
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.SetTag(ext.EventSampleRate, 1)
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.Equal(t, decisionDrop, span.context.trace.samplingDecision)
})
}

func TestTracerRuntimeMetrics(t *testing.T) {
t.Run("on", func(t *testing.T) {
tp := new(testLogger)
Expand Down