Skip to content

Commit

Permalink
ddtrace/tracer: Add sampling decision in trace context
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Jul 16, 2021
1 parent 12f9ed5 commit 49851a7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 30 deletions.
4 changes: 0 additions & 4 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ func (s *span) finish(finishTime int64) {
keep = shouldKeep(s)
}
}
if s.context.drop {
// client sampling enabled. This will lead to inexact APM metrics but reduces performance impact.
keep = false
}
if keep {
// a single kept span keeps the whole trace.
s.context.trace.keep()
Expand Down
52 changes: 36 additions & 16 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type spanContext struct {

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // reports whether this span was dropped by the rate sampler
errors int64 // number of spans with errors in this trace

// the below group should propagate cross-process
Expand All @@ -52,7 +51,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
}
if parent != nil {
context.trace = parent.trace
context.drop = parent.drop
context.origin = parent.origin
context.errors = parent.errors
parent.ForeachBaggageItem(func(k, v string) bool {
Expand Down Expand Up @@ -128,17 +126,30 @@ func (c *spanContext) baggageItem(key string) string {
// finish marks this span as finished in the trace.
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// samplingDecision is the decision to send a trace to the agent or not.
type samplingDecision int

const (
// decisionDefaultDrop is the default state of a trace.
// If no other decision is made about the trace, the trace won't be sent to the agent.
decisionDefaultDrop samplingDecision = iota
// decisionForceDrop prevents the trace from being sent to the agent.
decisionForceDrop
// decisionForceKeep ensures the trace will be sent to the agent.
decisionForceKeep
)

// trace contains shared context information about a trace, such as sampling
// priority, the root reference and a buffer of the spans which are part of the
// trace, if these exist.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
kept bool // kept indicates whether to send the trace to the agent or no.
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
Expand Down Expand Up @@ -183,7 +194,17 @@ func (t *trace) setSamplingPriority(p float64) {
func (t *trace) keep() {
t.mu.Lock()
defer t.mu.Unlock()
t.kept = true
if t.samplingDecision == decisionDefaultDrop {
t.samplingDecision = decisionForceKeep
}
}

func (t *trace) drop() {
t.mu.Lock()
defer t.mu.Unlock()
if t.samplingDecision == decisionDefaultDrop {
t.samplingDecision = decisionForceDrop
}
}

func (t *trace) setSamplingPriorityLocked(p float64) {
Expand Down Expand Up @@ -263,13 +284,12 @@ func (t *trace) finishedOne(s *span) {
}
// we have a tracer that can receive completed traces.
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
if !t.kept {
if !s.context.drop {
atomic.AddUint64(&tr.droppedP0Spans, uint64(len(t.spans)))
atomic.AddUint64(&tr.droppedP0Traces, 1)
}
if t.samplingDecision == decisionDefaultDrop {
atomic.AddUint64(&tr.droppedP0Spans, uint64(len(t.spans)))
atomic.AddUint64(&tr.droppedP0Traces, 1)
}
if t.samplingDecision != decisionForceKeep {
return
}
// we assume that sampler.Sample returns the same for all spans of a trace.
tr.pushTrace(t.spans)
}
15 changes: 10 additions & 5 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,8 @@ func TestSpanContextParent(t *testing.T) {
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
trace: newTrace(),
drop: true,
},
"nil-trace": &spanContext{
drop: true,
},
"nil-trace": &spanContext{},
"priority": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
Expand All @@ -317,6 +314,14 @@ func TestSpanContextParent(t *testing.T) {
priority: func() *float64 { v := new(float64); *v = 2; return v }(),
},
},
"sampling_decision": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
trace: &trace{
spans: []*span{newBasicSpan("abc")},
samplingDecision: decisionForceKeep,
},
},
"origin": &spanContext{
trace: &trace{spans: []*span{newBasicSpan("abc")}},
origin: "synthetics",
Expand All @@ -334,8 +339,8 @@ func TestSpanContextParent(t *testing.T) {
assert.Contains(ctx.trace.spans, s)
if parentCtx.trace != nil {
assert.Equal(ctx.trace.priority, parentCtx.trace.priority)
assert.Equal(ctx.trace.samplingDecision, parentCtx.trace.samplingDecision)
}
assert.Equal(parentCtx.drop, ctx.drop)
assert.Equal(parentCtx.baggage, ctx.baggage)
assert.Equal(parentCtx.origin, ctx.origin)
})
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (t *tracer) sample(span *span) {
}
sampler := t.config.sampler
if !sampler.Sample(span) {
span.context.drop = true
span.context.trace.drop()
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
Expand Down
24 changes: 20 additions & 4 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestTracerStartSpan(t *testing.T) {
})
}

func TestP0Dropping(t *testing.T) {
func TestSamplingDecision(t *testing.T) {
t.Run("sampled", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
Expand All @@ -249,7 +249,7 @@ func TestP0Dropping(t *testing.T) {
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.True(t, span.context.trace.kept)
assert.Equal(t, decisionForceKeep, span.context.trace.samplingDecision)
})

t.Run("dropped", func(t *testing.T) {
Expand All @@ -263,7 +263,7 @@ func TestP0Dropping(t *testing.T) {
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.False(t, span.context.trace.kept)
assert.Equal(t, decisionDefaultDrop, span.context.trace.samplingDecision)
})

t.Run("events_sampled", func(t *testing.T) {
Expand All @@ -278,7 +278,23 @@ func TestP0Dropping(t *testing.T) {
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.True(t, span.context.trace.kept)
assert.Equal(t, decisionForceKeep, span.context.trace.samplingDecision)
})

t.Run("client_dropped", func(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
tracer.features.DropP0s = true
tracer.config.sampler = NewRateSampler(0)
tracer.prioritySampling.defaultRate = 0
tracer.config.serviceName = "test_service"
span := tracer.StartSpan("name_1").(*span)
child := tracer.StartSpan("name_2", ChildOf(span.context))
child.SetTag(ext.EventSampleRate, 1)
child.Finish()
span.Finish()
assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority])
assert.Equal(t, decisionForceDrop, span.context.trace.samplingDecision)
})
}

Expand Down

0 comments on commit 49851a7

Please sign in to comment.