Skip to content

Commit

Permalink
ddtrace/tracer: add support for agent discovery endpoint, feature fla…
Browse files Browse the repository at this point in the history
…gs, stats & drops (#859)

This change adds a multitude of new features: support for feature flags, agent discovery, client-side stats computation, and dropping of P0 traces in the client.

### Feature flags

Support for feature flags by means of:
* `WithFeatureFlags` tracer start option.
* `DD_TRACE_FEATURES` environment variable which takes a list of comma or space separated flag tokens.

### Agent discovery

When the feature flag `discovery` is set, the tracer detects the agent's supported features by means of the new agent (7.27.0) discovery endpoint added in DataDog/datadog-agent#7344 and DataDog/datadog-agent#7495.

### Client-computed stats and dropping p0's

When `discovery` is enabled, if the agent supports client-computed stats and dropping p0's in the client, the tracer can now do this by means of the newly added concentrator and drop logic.

### TODO

* Obfuscation is not yet supported in the client but will be added in a subsequent PR by factoring out the obfuscator code from the agent into a separate package to be shared with this module.
  • Loading branch information
gbbr authored Apr 28, 2021
1 parent 2f86ec3 commit b74a5df
Show file tree
Hide file tree
Showing 17 changed files with 1,411 additions and 59 deletions.
27 changes: 24 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,34 @@ jobs:
command: dockerize -wait http://localhost:8500 -timeout 1m

- run:
name: Testing
name: Testing integrations
command: |
INTEGRATION=1 go test -v -race -coverprofile=coverage.txt -covermode=atomic `go list ./contrib/... | grep -v -e grpc.v12 -e google.golang.org/api`
- run:
name: Testing outlier google.golang.org/api
command: |
go get google.golang.org/grpc@v1.29.0 # https://github.com/grpc/grpc-go/issues/3726
go test -v ./contrib/google.golang.org/api/...
go get google.golang.org/grpc@v1.2.0
go test -v ./contrib/google.golang.org/grpc.v12/...
- run:
name: Testing outlier gRPC v1.2
command: |
# This hacky approach is necessary because running the tests regularly
# does not allow using grpc-go@v1.2.0 alongside sketches-go@v1.0.0
go mod vendor
# Checkout grpc-go@v1.2.0
cd vendor/google.golang.org && rm -rf grpc
git clone git@github.com:grpc/grpc-go grpc && cd grpc
git fetch origin && git checkout v1.2.0 && cd ../..
# Checkout sketches-go@v1.0.0
cd vendor/github.com/DataDog && rm -rf sketches-go
git clone git@github.com:DataDog/sketches-go && cd sketches-go
git fetch origin && git checkout v1.0.0 && cd ../..
go test -mod=vendor -v ./contrib/google.golang.org/grpc.v12/...
- run:
name: Upload coverage report to Codecov
Expand Down
9 changes: 6 additions & 3 deletions ddtrace/internal/globaltracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ var (
// SetGlobalTracer sets the global tracer to t.
func SetGlobalTracer(t ddtrace.Tracer) {
mu.Lock()
defer mu.Unlock()
old := globalTracer
globalTracer = t
// Unlock before potentially calling Stop, to allow any shutdown mechanism
// to retrieve the active tracer without causing a deadlock on mutex mu.
mu.Unlock()
if !Testing {
// avoid infinite loop when calling (*mocktracer.Tracer).Stop
globalTracer.Stop()
old.Stop()
}
globalTracer = t
}

// GetGlobalTracer returns the currently active tracer.
Expand Down
10 changes: 6 additions & 4 deletions ddtrace/tracer/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type startupInfo struct {
Architecture string `json:"architecture"` // Architecture of host machine
GlobalService string `json:"global_service"` // Global service string. If not-nil should be same as Service. (#614)
LambdaMode string `json:"lambda_mode"` // Whether or not the client has enabled lambda mode
AgentFeatures agentFeatures `json:"agent_features"` // Lists the capabilities of the agent.
}

// checkEndpoint tries to connect to the URL specified by endpoint.
Expand All @@ -70,7 +71,7 @@ func checkEndpoint(endpoint string) error {
// JSON format.
func logStartup(t *tracer) {
tags := make(map[string]string)
for k, v := range t.globalTags {
for k, v := range t.config.globalTags {
tags[k] = fmt.Sprintf("%v", v)
}

Expand All @@ -83,7 +84,7 @@ func logStartup(t *tracer) {
LangVersion: runtime.Version(),
Env: t.config.env,
Service: t.config.serviceName,
AgentURL: t.transport.endpoint(),
AgentURL: t.config.transport.endpoint(),
Debug: t.config.debug,
AnalyticsEnabled: !math.IsNaN(globalconfig.AnalyticsRate()),
SampleRate: fmt.Sprintf("%f", t.rulesSampling.globalRate),
Expand All @@ -95,14 +96,15 @@ func logStartup(t *tracer) {
Architecture: runtime.GOARCH,
GlobalService: globalconfig.ServiceName(),
LambdaMode: fmt.Sprintf("%t", t.config.logToStdout),
AgentFeatures: t.features.Load(),
}
if _, err := samplingRulesFromEnv(); err != nil {
info.SamplingRulesError = fmt.Sprintf("%s", err)
}
if !t.config.logToStdout {
if err := checkEndpoint(t.transport.endpoint()); err != nil {
if err := checkEndpoint(t.config.transport.endpoint()); err != nil {
info.AgentError = fmt.Sprintf("%s", err)
log.Warn("DIAGNOSTICS Unable to reach agent: %s", err)
log.Warn("DIAGNOSTICS Unable to reach agent intake: %s", err)
}
}
bs, err := json.Marshal(info)
Expand Down
10 changes: 5 additions & 5 deletions ddtrace/tracer/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestStartupLog(t *testing.T) {
tp.Reset()
logStartup(tracer)
assert.Len(tp.Lines(), 2)
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":null,"sampling_rules_error":"","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false"}`, tp.Lines()[1])
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":null,"sampling_rules_error":"","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","agent_features":{"DropP0s":false,"V05":false,"Stats":false}}`, tp.Lines()[1])
})

t.Run("configured", func(t *testing.T) {
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestStartupLog(t *testing.T) {
tp.Reset()
logStartup(tracer)
assert.Len(tp.Lines(), 2)
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75}\],"sampling_rules_error":"","tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false"}`, tp.Lines()[1])
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75}\],"sampling_rules_error":"","tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","agent_features":{"DropP0s":false,"V05":false,"Stats":false}}`, tp.Lines()[1])
})

t.Run("errors", func(t *testing.T) {
Expand All @@ -67,7 +67,7 @@ func TestStartupLog(t *testing.T) {
tp.Reset()
logStartup(tracer)
assert.Len(tp.Lines(), 2)
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":\[{"service":"some.service","name":"","sample_rate":0\.234}\],"sampling_rules_error":"found errors:\\n\\tat index 1: rate not provided","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false"}`, tp.Lines()[1])
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":\[{"service":"some.service","name":"","sample_rate":0\.234}\],"sampling_rules_error":"found errors:\\n\\tat index 1: rate not provided","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","agent_features":{"DropP0s":false,"V05":false,"Stats":false}}`, tp.Lines()[1])
})

t.Run("lambda", func(t *testing.T) {
Expand All @@ -79,7 +79,7 @@ func TestStartupLog(t *testing.T) {
tp.Reset()
logStartup(tracer)
assert.Len(tp.Lines(), 1)
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":null,"sampling_rules_error":"","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"true"}`, tp.Lines()[0])
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test","agent_url":"http://localhost:9/v0.4/traces","agent_error":"","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sampling_rules":null,"sampling_rules_error":"","tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"true","agent_features":{"DropP0s":false,"V05":false,"Stats":false}}`, tp.Lines()[0])
})
}

Expand All @@ -104,7 +104,7 @@ func TestLogAgentReachable(t *testing.T) {
tp.Reset()
logStartup(tracer)
assert.Len(tp.Lines(), 2)
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ WARN: DIAGNOSTICS Unable to reach agent: Post`, tp.Lines()[0])
assert.Regexp(`Datadog Tracer v[0-9]+\.[0-9]+\.[0-9]+ WARN: DIAGNOSTICS Unable to reach agent intake: Post`, tp.Lines()[0])
}

func TestLogFormat(t *testing.T) {
Expand Down
34 changes: 32 additions & 2 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ type config struct {
// debug, when true, writes details to logs.
debug bool

// lambda, when true, enables the lambda trace writer
// featureFlags specifies any enabled feature flags.
featureFlags map[string]struct{}

// logToStdout reports whether we should log all traces to the standard
// output instead of using the agent. This is used in Lambda environments.
logToStdout bool

// logStartup, when true, causes various startup info to be written
Expand Down Expand Up @@ -99,6 +103,12 @@ type config struct {
noDebugStack bool
}

// HasFeature reports whether the feature flag f is present in the set of
// enabled feature flags. Surrounding white space in f is ignored.
func (c *config) HasFeature(f string) bool {
	name := strings.TrimSpace(f)
	if _, enabled := c.featureFlags[name]; enabled {
		return true
	}
	return false
}

// StartOption represents a function that can be provided as a parameter to Start.
type StartOption func(*config)

Expand Down Expand Up @@ -133,6 +143,11 @@ func newConfig(opts ...StartOption) *config {
if v := os.Getenv("DD_ENV"); v != "" {
c.env = v
}
if v := os.Getenv("DD_TRACE_FEATURES"); v != "" {
WithFeatureFlags(strings.FieldsFunc(v, func(r rune) bool {
return r == ',' || r == ' '
})...)(c)
}
if v := os.Getenv("DD_SERVICE"); v != "" {
c.serviceName = v
globalconfig.SetServiceName(v)
Expand Down Expand Up @@ -200,7 +215,7 @@ func newConfig(opts ...StartOption) *config {
}
}
if c.transport == nil {
c.transport = newTransport(c.agentAddr, c.httpClient)
c.transport = newHTTPTransport(c.agentAddr, c.httpClient)
}
if c.propagator == nil {
c.propagator = NewPropagator(nil)
Expand Down Expand Up @@ -246,6 +261,21 @@ func statsTags(c *config) []string {
return tags
}

// WithFeatureFlags specifies a set of feature flags to enable. Please take
// into account that most, if not all, feature flags are considered to be
// experimental and may result in unexpected bugs.
func WithFeatureFlags(feats ...string) StartOption {
	return func(c *config) {
		// Lazily create the set on first use; later calls add to it.
		if c.featureFlags == nil {
			c.featureFlags = make(map[string]struct{}, len(feats))
		}
		for i := range feats {
			// Flags are stored with surrounding white space removed.
			name := strings.TrimSpace(feats[i])
			c.featureFlags[name] = struct{}{}
		}
		log.Info("FEATURES enabled: %v", feats)
	}
}

// WithLogger sets logger as the tracer's error printer.
func WithLogger(logger ddtrace.Logger) StartOption {
return func(c *config) {
Expand Down
107 changes: 99 additions & 8 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/tinylib/msgp/msgp"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -130,21 +132,32 @@ func (s *span) SetTag(key string, value interface{}) {
// setTagError sets the error tag. It accounts for various valid scenarios.
// This method is not safe for concurrent use.
func (s *span) setTagError(value interface{}, cfg errorConfig) {
setError := func(yes bool) {
if yes {
if s.Error == 0 {
// new error
atomic.AddInt64(&s.context.errors, 1)
}
s.Error = 1
} else {
if s.Error > 0 {
// flip from active to inactive
atomic.AddInt64(&s.context.errors, -1)
}
s.Error = 0
}
}
if s.finished {
return
}
switch v := value.(type) {
case bool:
// bool value as per Opentracing spec.
if !v {
s.Error = 0
} else {
s.Error = 1
}
setError(v)
case error:
// if anyone sets an error value as the tag, be nice here
// and provide all the benefits.
s.Error = 1
setError(true)
s.setMeta(ext.ErrorMsg, v.Error())
s.setMeta(ext.ErrorType, reflect.TypeOf(v).String())
if !cfg.noDebugStack {
Expand All @@ -159,11 +172,11 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) {
}
case nil:
// no error
s.Error = 0
setError(false)
default:
// in all other cases, let's assume that setting this tag
// is the result of an error.
s.Error = 1
setError(true)
}
}

Expand Down Expand Up @@ -319,13 +332,91 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true

if t, ok := internal.GetGlobalTracer().(*tracer); ok {
// we have an active tracer
feats := t.features.Load()
if feats.Stats && shouldComputeStats(s) {
// the agent supports computed stats
select {
case t.stats.In <- newAggregableSpan(s, t.config):
// ok
default:
log.Error("Stats channel full, disregarding span.")
}
}
if feats.DropP0s {
// the agent supports dropping p0's in the client
if shouldDrop(s) {
// ...and this span can be dropped
atomic.AddUint64(&t.droppedP0Spans, 1)
if s == s.context.trace.root {
atomic.AddUint64(&t.droppedP0Traces, 1)
}
return
}
}
}
if s.context.drop {
// not sampled by local sampler
return
}
s.context.finish()
}

// newAggregableSpan creates a new summary for the span s. The summary holds
// the aggregation key (name, resource, service, type, synthetics origin and
// HTTP status code) together with the span's start, duration, top-level marker
// and error value. The cfg argument is not read by this function.
func newAggregableSpan(s *span, cfg *config) *aggregableSpan {
	var statusCode uint32
	// ParseUint with a 32-bit size rejects empty, negative and out-of-range
	// values, so a malformed "http.status_code" tag leaves the status code at
	// zero instead of wrapping around into a bogus value.
	if c, err := strconv.ParseUint(s.Meta["http.status_code"], 10, 32); err == nil {
		statusCode = uint32(c)
	}
	key := aggregation{
		Name:       s.Name,
		Resource:   s.Resource,
		Service:    s.Service,
		Type:       s.Type,
		Synthetics: strings.HasPrefix(s.Meta[keyOrigin], "synthetics"),
		StatusCode: statusCode,
	}
	return &aggregableSpan{
		key:      key,
		Start:    s.Start,
		Duration: s.Duration,
		// A missing metric reads as 0, so absent means not top-level.
		TopLevel: s.Metrics[keyTopLevel] == 1,
		Error:    s.Error,
	}
}

// shouldDrop reports whether the span s may be dropped. A span is kept when
// its context has a positive sampling priority, when the context's error
// counter is above zero, or when it carries an event sample rate metric and
// is sampled by that rate. In every other case it may be dropped.
func shouldDrop(s *span) bool {
	prio, hasPrio := s.context.samplingPriority()
	if hasPrio && prio > 0 {
		// positive sampling priorities stay
		return false
	}
	if errCount := atomic.LoadInt64(&s.context.errors); errCount > 0 {
		// traces with any span containing an error get kept
		return false
	}
	rate, hasRate := s.Metrics[ext.EventSampleRate]
	if !hasRate {
		return true
	}
	return !sampledByRate(s.TraceID, rate)
}

// shouldComputeStats reports whether stats need to be computed for the span s,
// which is the case when either its measured or its top-level metric equals 1.
// Warning: s.Metrics is read without any synchronization here; callers must guard!
func shouldComputeStats(s *span) bool {
	// A missing key reads as 0, so no explicit presence check is needed.
	return s.Metrics[keyMeasured] == 1 || s.Metrics[keyTopLevel] == 1
}

// String returns a human readable representation of the span. Not for
// production, just debugging.
func (s *span) String() string {
Expand Down
Loading

0 comments on commit b74a5df

Please sign in to comment.