Skip to content

Commit

Permalink
pkg/tracer: added single span sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
dianashevchenko committed Jun 29, 2022
1 parent d8fa045 commit 1f9b759
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 33 deletions.
16 changes: 11 additions & 5 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,13 @@ type config struct {
// statsd is used for tracking metrics associated with the runtime and the tracer.
statsd statsdClient

// samplingRules contains user-defined rules determine the sampling rate to apply
// to spans.
samplingRules []SamplingRule
// traceSamplingRules contains user-defined rules determine the sampling rate to apply
// to trace spans.
traceSamplingRules []SamplingRule

// spanSamplingRules contains user-defined rules determine the sampling rate to apply
// to single spans, regardless of the trace sampling decision.
spanSamplingRules []SamplingRule

// tickChan specifies a channel which will receive the time every time the tracer must flush.
// It defaults to time.Ticker; replaced in tests.
Expand Down Expand Up @@ -654,11 +658,13 @@ func WithDogstatsdAddress(addr string) StartOption {
}
}

// WithSamplingRules specifies the sampling rates to apply to spans based on the
// WithSamplingRules specifies the sampling rates to apply to trace spans based on the
// provided rules.
// todo(shevchenko): there might be a need to add an analogous option for
// single span sampling rules
func WithSamplingRules(rules []SamplingRule) StartOption {
return func(cfg *config) {
cfg.samplingRules = rules
cfg.traceSamplingRules = rules
}
}

Expand Down
39 changes: 26 additions & 13 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,32 @@ func samplingRulesFromEnv() ([]SamplingRule, error) {
if err != nil {
errs = append(errs, err.Error())
}
traceRules, err := processSamplingRulesFromEnv([]byte(os.Getenv("DD_TRACE_SAMPLING_RULES")), TraceSamplingRuleType)
traceRules, err := traceSamplingRulesFromEnv()
if err != nil {
errs = append(errs, err.Error())
}

rules := append(spanRules, traceRules...)
rules := []SamplingRule{}
if traceRules != nil {
rules = traceRules
}
if spanRules != nil {
rules = append(rules, spanRules...)
}
if len(errs) != 0 {
return rules, fmt.Errorf("%s", strings.Join(errs, "\n\t"))
}
return rules, nil
}

// traceSamplingRulesFromEnv parses sampling rules from the DD_TRACE_SAMPLING_RULES environment variable.
func traceSamplingRulesFromEnv() ([]SamplingRule, error) {
rulesFromEnv := os.Getenv("DD_TRACE_SAMPLING_RULES")
if rulesFromEnv == "" {
return nil, nil
}
return processSamplingRulesFromEnv([]byte(rulesFromEnv), TraceSamplingRuleType)
}

// spanSamplingRulesFromEnv parses sampling rules from the DD_SPAN_SAMPLING_RULES and
// DD_SPAN_SAMPLING_RULES_FILE environment variables.
func spanSamplingRulesFromEnv() ([]SamplingRule, error) {
Expand Down Expand Up @@ -286,6 +300,7 @@ func processSamplingRulesFromEnv(b []byte, ruleType SamplingRuleType) ([]Samplin
Rate: rate,
MaxPerSecond: v.MaxPerSecond,
Type: SingleSpanSamplingType,
limiter: newSingleSpanRateLimiter(v.MaxPerSecond),
})
continue
}
Expand Down Expand Up @@ -369,13 +384,6 @@ func (rs *rulesSampler) apply(span *span) bool {
if rule.match(span) {
matched = true
rate = rule.Rate
if rule.Type == SingleSpanSamplingType {
span.setMetric(ext.SpanSamplingMechanism, ext.SingleSpanSamplingMechanism)
span.setMetric(ext.SingleSpanSamplingRuleRate, rule.Rate)
if rule.MaxPerSecond != 0 {
span.setMetric(ext.SingleSpanSamplingMPS, rule.MaxPerSecond)
}
}
break
}
}
Expand Down Expand Up @@ -433,6 +441,7 @@ type SamplingRule struct {

exactService string
exactName string
limiter *rateLimiter
}

// ServiceRule returns a SamplingRule that applies the provided sampling rate
Expand Down Expand Up @@ -488,9 +497,10 @@ func (sr *SamplingRule) match(s *span) bool {
// MarshalJSON implements the json.Marshaler interface.
func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
s := struct {
Service string `json:"service"`
Name string `json:"name"`
Rate float64 `json:"sample_rate"`
Service string `json:"service"`
Name string `json:"name"`
Rate float64 `json:"sample_rate"`
MaxPerSecond *float64 `json:"max_per_second,omitempty"`
}{}
if sr.exactService != "" {
s.Service = sr.exactService
Expand All @@ -503,6 +513,9 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
s.Name = fmt.Sprintf("%s", sr.Name)
}
s.Rate = sr.Rate
if sr.MaxPerSecond != 0 {
s.MaxPerSecond = &sr.MaxPerSecond
}
return json.Marshal(&s)
}

Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestRulesSampler(t *testing.T) {
rules, _ := samplingRulesFromEnv()

assert := assert.New(t)
rs := newRulesSampler(rules)
rs := newSingleSpanRulesSampler(rules)

span := makeSpan(tt.spanName, tt.spanSrv)
result := rs.apply(span)
Expand Down
88 changes: 88 additions & 0 deletions ddtrace/tracer/single_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracer

import (
"golang.org/x/time/rate"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
"math"
"time"
)

// singleSpanRulesSampler allows a user-defined list of rules to apply to spans
// to sample single spans.
// These rules match based on the span's Service and Name. If empty value is supplied
// to either Service or Name field, it will default to "*", allow all
// When making a sampling decision, the rules are checked in order until
// a match is found.
// If a match is found, the rate from that rule is used.
// If no match is found, no changes or further sampling is applied to the spans.
// The rate is used to determine if the span should be sampled, but an upper
// limit can be defined using the max_per_second field when supplying the rule.
// If such value is absent in the rule, the default is allow all.
// Its value is the max number of spans to sample per second.
// Spans that matched the rules but exceeded the rate limit are not sampled.
type singleSpanRulesSampler struct {
rules []SamplingRule // the rules to match spans with
}

// newRulesSampler configures a *rulesSampler instance using the given set of rules.
// Invalid rules or environment variable values are tolerated, by logging warnings and then ignoring them.
func newSingleSpanRulesSampler(rules []SamplingRule) *singleSpanRulesSampler {
return &singleSpanRulesSampler{
rules: rules,
}
}

// apply uses the sampling rules to determine the sampling rate for the
// provided span. If the rules don't match, then it returns false and the span is not
// modified.
func (rs *singleSpanRulesSampler) apply(span *span) bool {
var matched bool
for _, rule := range rs.rules {
if rule.match(span) {
matched = true
rs.applyRate(span, rule, rule.Rate, time.Now())
break
}
}
return matched
}

func (rs *singleSpanRulesSampler) applyRate(span *span, rule SamplingRule, rate float64, now time.Time) {
span.SetTag(keyRulesSamplerAppliedRate, rate)
if !sampledByRate(span.SpanID, rate) {
span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate)
return
}

sampled := true
if rule.limiter != nil {
sampled, rate = rule.limiter.allowOne(now)
if !sampled {
return
}
}
span.setMetric(ext.SpanSamplingMechanism, ext.SingleSpanSamplingMechanism)
span.setMetric(ext.SingleSpanSamplingRuleRate, rate)
if rule.MaxPerSecond != 0 {
span.setMetric(ext.SingleSpanSamplingMPS, rule.MaxPerSecond)
}
}

// newRateLimiter returns a rate limiter which restricts the number of single spans sampled per second.
// This defaults to infinite, allow all behaviour. The MaxPerSecond value of the rule may override the default.
func newSingleSpanRateLimiter(mps float64) *rateLimiter {
limit := math.MaxFloat64
if mps > 0 {
limit = mps
}
return &rateLimiter{
limiter: rate.NewLimiter(rate.Limit(limit), int(math.Ceil(limit))),
prevTime: time.Now(),
}
}
2 changes: 1 addition & 1 deletion ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (t *trace) push(sp *span) {
}
}

// finishedOne aknowledges that another span in the trace has finished, and checks
// finishedOne acknowledges that another span in the trace has finished, and checks
// if the trace is complete, in which case it calls the onFinish function. It uses
// the given priority, if non-nil, to mark the root span.
func (t *trace) finishedOne(s *span) {
Expand Down
40 changes: 27 additions & 13 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ type tracer struct {
// or operation name.
rulesSampling *rulesSampler

// singleSpanRulesSampling holds an instance of the single span rules sampler. These are user-defined
// rules for applying a sampling rate to spans that match the designated service
// or operation name. Such spans are sampled if trace sampling decision is 'drop' and
// may be sent separately
singleSpanRulesSampling *singleSpanRulesSampler

// obfuscator holds the obfuscator used to obfuscate resources in aggregated stats.
// obfuscator may be nil if disabled.
obfuscator *obfuscate.Obfuscator
Expand Down Expand Up @@ -176,12 +182,19 @@ const payloadQueueSize = 1000

func newUnstartedTracer(opts ...StartOption) *tracer {
c := newConfig(opts...)
envRules, err := samplingRulesFromEnv()
traceRules, err := traceSamplingRulesFromEnv()
if err != nil {
log.Warn("DIAGNOSTICS Error(s) parsing DD_TRACE_SAMPLING_RULES: found errors: %s", err)
}
if traceRules != nil {
c.traceSamplingRules = traceRules
}
spanRules, err := spanSamplingRulesFromEnv()
if err != nil {
log.Warn("DIAGNOSTICS Error(s) parsing sampling rules: found errors: %s", err)
log.Warn("DIAGNOSTICS Error(s) parsing DD_SPAN_SAMPLING_RULES: found errors: %s", err)
}
if envRules != nil {
c.samplingRules = envRules
if spanRules != nil {
c.spanSamplingRules = spanRules
}
sampler := newPrioritySampler()
var writer traceWriter
Expand All @@ -191,15 +204,16 @@ func newUnstartedTracer(opts ...StartOption) *tracer {
writer = newAgentTraceWriter(c, sampler)
}
t := &tracer{
config: c,
traceWriter: writer,
out: make(chan []*span, payloadQueueSize),
stop: make(chan struct{}),
flush: make(chan chan<- struct{}),
rulesSampling: newRulesSampler(c.samplingRules),
prioritySampling: sampler,
pid: strconv.Itoa(os.Getpid()),
stats: newConcentrator(c, defaultStatsBucketSize),
config: c,
traceWriter: writer,
out: make(chan []*span, payloadQueueSize),
stop: make(chan struct{}),
flush: make(chan chan<- struct{}),
rulesSampling: newRulesSampler(c.traceSamplingRules),
singleSpanRulesSampling: newSingleSpanRulesSampler(c.spanSamplingRules),
prioritySampling: sampler,
pid: strconv.Itoa(os.Getpid()),
stats: newConcentrator(c, defaultStatsBucketSize),
obfuscator: obfuscate.NewObfuscator(obfuscate.Config{
SQL: obfuscate.SQLConfig{
TableNames: c.agent.HasFlag("table_names"),
Expand Down

0 comments on commit 1f9b759

Please sign in to comment.