diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index f74232ad0d..c789fadbbb 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -118,9 +118,13 @@ type config struct { // statsd is used for tracking metrics associated with the runtime and the tracer. statsd statsdClient - // samplingRules contains user-defined rules determine the sampling rate to apply - // to spans. - samplingRules []SamplingRule + // traceSamplingRules contains user-defined rules determine the sampling rate to apply + // to trace spans. + traceSamplingRules []SamplingRule + + // spanSamplingRules contains user-defined rules determine the sampling rate to apply + // to single spans, regardless of the trace sampling decision. + spanSamplingRules []SamplingRule // tickChan specifies a channel which will receive the time every time the tracer must flush. // It defaults to time.Ticker; replaced in tests. @@ -654,11 +658,13 @@ func WithDogstatsdAddress(addr string) StartOption { } } -// WithSamplingRules specifies the sampling rates to apply to spans based on the +// WithSamplingRules specifies the sampling rates to apply to trace spans based on the // provided rules. +// todo(shevchenko): there might be a need to add an analogous option for +// single span sampling rules func WithSamplingRules(rules []SamplingRule) StartOption { return func(cfg *config) { - cfg.samplingRules = rules + cfg.traceSamplingRules = rules } } diff --git a/ddtrace/tracer/sampler.go b/ddtrace/tracer/sampler.go index b0ee284ea5..0e624a66a1 100644 --- a/ddtrace/tracer/sampler.go +++ b/ddtrace/tracer/sampler.go @@ -196,18 +196,32 @@ func samplingRulesFromEnv() ([]SamplingRule, error) { if err != nil { errs = append(errs, err.Error()) } - traceRules, err := processSamplingRulesFromEnv([]byte(os.Getenv("DD_TRACE_SAMPLING_RULES")), TraceSamplingRuleType) + traceRules, err := traceSamplingRulesFromEnv() if err != nil { errs = append(errs, err.Error()) } - - rules := append(spanRules, traceRules...) + rules := []SamplingRule{} + if traceRules != nil { + rules = traceRules + } + if spanRules != nil { + rules = append(rules, spanRules...) + } if len(errs) != 0 { return rules, fmt.Errorf("%s", strings.Join(errs, "\n\t")) } return rules, nil } +// traceSamplingRulesFromEnv parses sampling rules from the DD_TRACE_SAMPLING_RULES environment variable. +func traceSamplingRulesFromEnv() ([]SamplingRule, error) { + rulesFromEnv := os.Getenv("DD_TRACE_SAMPLING_RULES") + if rulesFromEnv == "" { + return nil, nil + } + return processSamplingRulesFromEnv([]byte(rulesFromEnv), TraceSamplingRuleType) +} + // spanSamplingRulesFromEnv parses sampling rules from the DD_SPAN_SAMPLING_RULES and // DD_SPAN_SAMPLING_RULES_FILE environment variables. func spanSamplingRulesFromEnv() ([]SamplingRule, error) { @@ -286,6 +300,7 @@ func processSamplingRulesFromEnv(b []byte, ruleType SamplingRuleType) ([]Samplin Rate: rate, MaxPerSecond: v.MaxPerSecond, Type: SingleSpanSamplingType, + limiter: newSingleSpanRateLimiter(v.MaxPerSecond), }) continue } @@ -369,13 +384,6 @@ func (rs *rulesSampler) apply(span *span) bool { if rule.match(span) { matched = true rate = rule.Rate - if rule.Type == SingleSpanSamplingType { - span.setMetric(ext.SpanSamplingMechanism, ext.SingleSpanSamplingMechanism) - span.setMetric(ext.SingleSpanSamplingRuleRate, rule.Rate) - if rule.MaxPerSecond != 0 { - span.setMetric(ext.SingleSpanSamplingMPS, rule.MaxPerSecond) - } - } break } } @@ -433,6 +441,7 @@ type SamplingRule struct { exactService string exactName string + limiter *rateLimiter } // ServiceRule returns a SamplingRule that applies the provided sampling rate @@ -488,9 +497,10 @@ func (sr *SamplingRule) match(s *span) bool { // MarshalJSON implements the json.Marshaler interface. func (sr *SamplingRule) MarshalJSON() ([]byte, error) { s := struct { - Service string `json:"service"` - Name string `json:"name"` - Rate float64 `json:"sample_rate"` + Service string `json:"service"` + Name string `json:"name"` + Rate float64 `json:"sample_rate"` + MaxPerSecond *float64 `json:"max_per_second,omitempty"` }{} if sr.exactService != "" { s.Service = sr.exactService @@ -503,6 +513,9 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) { s.Name = fmt.Sprintf("%s", sr.Name) } s.Rate = sr.Rate + if sr.MaxPerSecond != 0 { + s.MaxPerSecond = &sr.MaxPerSecond + } return json.Marshal(&s) } diff --git a/ddtrace/tracer/sampler_test.go b/ddtrace/tracer/sampler_test.go index 3b22080495..6a74d7d3b5 100644 --- a/ddtrace/tracer/sampler_test.go +++ b/ddtrace/tracer/sampler_test.go @@ -434,7 +434,7 @@ func TestRulesSampler(t *testing.T) { rules, _ := samplingRulesFromEnv() assert := assert.New(t) - rs := newRulesSampler(rules) + rs := newSingleSpanRulesSampler(rules) span := makeSpan(tt.spanName, tt.spanSrv) result := rs.apply(span) diff --git a/ddtrace/tracer/single_sampler.go b/ddtrace/tracer/single_sampler.go new file mode 100644 index 0000000000..f8528ad466 --- /dev/null +++ b/ddtrace/tracer/single_sampler.go @@ -0,0 +1,88 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package tracer + +import ( + "golang.org/x/time/rate" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" + "math" + "time" +) + +// singleSpanRulesSampler allows a user-defined list of rules to apply to spans +// to sample single spans. +// These rules match based on the span's Service and Name. If empty value is supplied +// to either Service or Name field, it will default to "*", allow all +// When making a sampling decision, the rules are checked in order until +// a match is found. +// If a match is found, the rate from that rule is used. +// If no match is found, no changes or further sampling is applied to the spans. +// The rate is used to determine if the span should be sampled, but an upper +// limit can be defined using the max_per_second field when supplying the rule. +// If such value is absent in the rule, the default is allow all. +// Its value is the max number of spans to sample per second. +// Spans that matched the rules but exceeded the rate limit are not sampled. +type singleSpanRulesSampler struct { + rules []SamplingRule // the rules to match spans with +} + +// newRulesSampler configures a *rulesSampler instance using the given set of rules. +// Invalid rules or environment variable values are tolerated, by logging warnings and then ignoring them. +func newSingleSpanRulesSampler(rules []SamplingRule) *singleSpanRulesSampler { + return &singleSpanRulesSampler{ + rules: rules, + } +} + +// apply uses the sampling rules to determine the sampling rate for the +// provided span. If the rules don't match, then it returns false and the span is not +// modified. +func (rs *singleSpanRulesSampler) apply(span *span) bool { + var matched bool + for _, rule := range rs.rules { + if rule.match(span) { + matched = true + rs.applyRate(span, rule, rule.Rate, time.Now()) + break + } + } + return matched +} + +func (rs *singleSpanRulesSampler) applyRate(span *span, rule SamplingRule, rate float64, now time.Time) { + span.SetTag(keyRulesSamplerAppliedRate, rate) + if !sampledByRate(span.SpanID, rate) { + span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate) + return + } + + sampled := true + if rule.limiter != nil { + sampled, rate = rule.limiter.allowOne(now) + if !sampled { + return + } + } + span.setMetric(ext.SpanSamplingMechanism, ext.SingleSpanSamplingMechanism) + span.setMetric(ext.SingleSpanSamplingRuleRate, rate) + if rule.MaxPerSecond != 0 { + span.setMetric(ext.SingleSpanSamplingMPS, rule.MaxPerSecond) + } +} + +// newRateLimiter returns a rate limiter which restricts the number of single spans sampled per second. +// This defaults to infinite, allow all behaviour. The MaxPerSecond value of the rule may override the default. +func newSingleSpanRateLimiter(mps float64) *rateLimiter { + limit := math.MaxFloat64 + if mps > 0 { + limit = mps + } + return &rateLimiter{ + limiter: rate.NewLimiter(rate.Limit(limit), int(math.Ceil(limit))), + prevTime: time.Now(), + } +} diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index 0349ef1618..ab32f76b23 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -262,7 +262,7 @@ func (t *trace) push(sp *span) { } } -// finishedOne aknowledges that another span in the trace has finished, and checks +// finishedOne acknowledges that another span in the trace has finished, and checks // if the trace is complete, in which case it calls the onFinish function. It uses // the given priority, if non-nil, to mark the root span. func (t *trace) finishedOne(s *span) { diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 7edeafd3e8..bfd90d8ab9 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -79,6 +79,12 @@ type tracer struct { // or operation name. rulesSampling *rulesSampler + // singleSpanRulesSampling holds an instance of the single span rules sampler. These are user-defined + // rules for applying a sampling rate to spans that match the designated service + // or operation name. Such spans are sampled if trace sampling decision is 'drop' and + // may be sent separately + singleSpanRulesSampling *singleSpanRulesSampler + // obfuscator holds the obfuscator used to obfuscate resources in aggregated stats. // obfuscator may be nil if disabled. obfuscator *obfuscate.Obfuscator @@ -176,12 +182,19 @@ const payloadQueueSize = 1000 func newUnstartedTracer(opts ...StartOption) *tracer { c := newConfig(opts...) - envRules, err := samplingRulesFromEnv() + traceRules, err := traceSamplingRulesFromEnv() + if err != nil { + log.Warn("DIAGNOSTICS Error(s) parsing DD_TRACE_SAMPLING_RULES: found errors: %s", err) + } + if traceRules != nil { + c.traceSamplingRules = traceRules + } + spanRules, err := spanSamplingRulesFromEnv() if err != nil { - log.Warn("DIAGNOSTICS Error(s) parsing sampling rules: found errors: %s", err) + log.Warn("DIAGNOSTICS Error(s) parsing DD_SPAN_SAMPLING_RULES: found errors: %s", err) } - if envRules != nil { - c.samplingRules = envRules + if spanRules != nil { + c.spanSamplingRules = spanRules } sampler := newPrioritySampler() var writer traceWriter @@ -191,15 +204,16 @@ func newUnstartedTracer(opts ...StartOption) *tracer { writer = newAgentTraceWriter(c, sampler) } t := &tracer{ - config: c, - traceWriter: writer, - out: make(chan []*span, payloadQueueSize), - stop: make(chan struct{}), - flush: make(chan chan<- struct{}), - rulesSampling: newRulesSampler(c.samplingRules), - prioritySampling: sampler, - pid: strconv.Itoa(os.Getpid()), - stats: newConcentrator(c, defaultStatsBucketSize), + config: c, + traceWriter: writer, + out: make(chan []*span, payloadQueueSize), + stop: make(chan struct{}), + flush: make(chan chan<- struct{}), + rulesSampling: newRulesSampler(c.traceSamplingRules), + singleSpanRulesSampling: newSingleSpanRulesSampler(c.spanSamplingRules), + prioritySampling: sampler, + pid: strconv.Itoa(os.Getpid()), + stats: newConcentrator(c, defaultStatsBucketSize), obfuscator: obfuscate.NewObfuscator(obfuscate.Config{ SQL: obfuscate.SQLConfig{ TableNames: c.agent.HasFlag("table_names"),