From cb93051f5fd3d4866cce8cc8b5c879aa8893fdf0 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Thu, 29 Dec 2022 11:26:11 -0600 Subject: [PATCH] ddtrace/tracer: clean up resources in tests (#1643) Tests have been failing to clean up after themselves and causing memory usage issues during unit test runs. One cause of this is failing to shut down a tracer after starting it, which is solved simply by deferring tracer.Stop() for those tests. Other tests use newConfig() to generate a tracer config, which they use for various purposes. Unfortunately newConfig() actually creates a statsd client for the new config, which needs to be Close()'d in order to release its resources. It doesn't make much sense to need to Close() a config, or have to Close() something *in* a config, so this problem requires a more complicated solution. Instead, the config still calculates the statsd address, but the actual instantiation of the statsd client is moved into newTracer, where the worker routines and other things that require cleanup are started. This unfortunately requires several other pieces of code to keep a new reference to the statsd client, rather than just to the config as they did previously. 
--- ddtrace/tracer/metrics.go | 8 +++--- ddtrace/tracer/metrics_test.go | 3 ++- ddtrace/tracer/option.go | 29 +++++++++++++-------- ddtrace/tracer/option_test.go | 38 ++++++++++++++++++++++----- ddtrace/tracer/sampler_test.go | 1 + ddtrace/tracer/span_test.go | 10 ++++++++ ddtrace/tracer/stats.go | 13 +++++----- ddtrace/tracer/textmap_test.go | 19 ++++++++++++++ ddtrace/tracer/tracer.go | 22 +++++++++++----- ddtrace/tracer/tracer_test.go | 1 + ddtrace/tracer/transport.go | 2 +- ddtrace/tracer/writer.go | 30 +++++++++++++--------- ddtrace/tracer/writer_test.go | 47 ++++++++++++++++++++++++++-------- 13 files changed, 163 insertions(+), 60 deletions(-) diff --git a/ddtrace/tracer/metrics.go b/ddtrace/tracer/metrics.go index 858b6ae1c6..c648e9b855 100644 --- a/ddtrace/tracer/metrics.go +++ b/ddtrace/tracer/metrics.go @@ -46,7 +46,7 @@ func (t *tracer) reportRuntimeMetrics(interval time.Duration) { runtime.ReadMemStats(&ms) debug.ReadGCStats(&gc) - statsd := t.config.statsd + statsd := t.statsd // CPU statistics statsd.Gauge("runtime.go.num_cpu", float64(runtime.NumCPU()), nil, 1) statsd.Gauge("runtime.go.num_goroutine", float64(runtime.NumGoroutine()), nil, 1) @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) { for { select { case <-ticker.C: - t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1) - t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1) - t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1) + t.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1) + t.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1) + t.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1) 
case <-t.stop: return } diff --git a/ddtrace/tracer/metrics_test.go b/ddtrace/tracer/metrics_test.go index 06c0ae7946..c4a60569a4 100644 --- a/ddtrace/tracer/metrics_test.go +++ b/ddtrace/tracer/metrics_test.go @@ -48,7 +48,7 @@ type testStatsdCall struct { func withStatsdClient(s statsdClient) StartOption { return func(c *config) { - c.statsd = s + c.statsdClient = s } } @@ -246,6 +246,7 @@ func (tg *testStatsdClient) Wait(n int, d time.Duration) error { func TestReportRuntimeMetrics(t *testing.T) { var tg testStatsdClient trc := newUnstartedTracer(withStatsdClient(&tg)) + defer trc.statsd.Close() trc.wg.Add(1) go func() { diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 77a732de71..164852f0a2 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -114,8 +114,9 @@ type config struct { // combination of the environment variables DD_AGENT_HOST and DD_DOGSTATSD_PORT. dogstatsdAddr string - // statsd is used for tracking metrics associated with the runtime and the tracer. - statsd statsdClient + // statsdClient is set when a user provides a custom statsd client for tracking metrics + // associated with the runtime and the tracer. + statsdClient statsdClient // spanRules contains user-defined rules to determine the sampling rate to apply // to trace spans. 
@@ -295,7 +296,7 @@ func newConfig(opts ...StartOption) *config { log.SetLevel(log.LevelDebug) } c.loadAgentFeatures() - if c.statsd == nil { + if c.statsdClient == nil { // configure statsd client addr := c.dogstatsdAddr if addr == "" { @@ -316,17 +317,23 @@ func newConfig(opts ...StartOption) *config { // not a valid TCP address, leave it as it is (could be a socket connection) } c.dogstatsdAddr = addr - client, err := statsd.New(addr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(statsTags(c))) - if err != nil { - log.Warn("Runtime and health metrics disabled: %v", err) - c.statsd = &statsd.NoOpClient{} - } else { - c.statsd = client - } } + return c } +func newStatsdClient(c *config) (statsdClient, error) { + if c.statsdClient != nil { + return c.statsdClient, nil + } + + client, err := statsd.New(c.dogstatsdAddr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(statsTags(c))) + if err != nil { + return &statsd.NoOpClient{}, err + } + return client, nil +} + // defaultHTTPClient returns the default http.Client to start the tracer with. func defaultHTTPClient() *http.Client { if _, err := os.Stat(defaultSocketAPM); err == nil { @@ -476,7 +483,7 @@ func statsTags(c *config) []string { // withNoopStats is used for testing to disable statsd client func withNoopStats() StartOption { return func(c *config) { - c.statsd = &statsd.NoOpClient{} + c.statsdClient = &statsd.NoOpClient{} } } diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index 4ae9aefd8a..29848b43ce 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -41,9 +41,11 @@ func withTickChan(ch <-chan time.Time) StartOption { // testStatsd asserts that the given statsd.Client can successfully send metrics // to a UDP listener located at addr. 
func testStatsd(t *testing.T, cfg *config, addr string) { - client := cfg.statsd + client, err := newStatsdClient(cfg) + require.NoError(t, err) + defer client.Close() require.Equal(t, addr, cfg.dogstatsdAddr) - _, err := net.ResolveUDPAddr("udp", addr) + _, err = net.ResolveUDPAddr("udp", addr) require.NoError(t, err) client.Count("name", 1, []string{"tag"}, 1) @@ -57,7 +59,9 @@ func TestStatsdUDPConnect(t *testing.T) { cfg := newConfig() addr := net.JoinHostPort(defaultHostname, "8111") - client := cfg.statsd + client, err := newStatsdClient(cfg) + require.NoError(t, err) + defer client.Close() require.Equal(t, addr, cfg.dogstatsdAddr) udpaddr, err := net.ResolveUDPAddr("udp", addr) require.NoError(t, err) @@ -118,8 +122,11 @@ func TestAutoDetectStatsd(t *testing.T) { conn.SetDeadline(time.Now().Add(5 * time.Second)) cfg := newConfig() + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() require.Equal(t, cfg.dogstatsdAddr, "unix://"+addr) - cfg.statsd.Count("name", 1, []string{"tag"}, 1) + statsd.Count("name", 1, []string{"tag"}, 1) buf := make([]byte, 17) n, err := conn.Read(buf) @@ -246,11 +253,14 @@ func TestTracerOptionsDefaults(t *testing.T) { defer globalconfig.SetAnalyticsRate(math.NaN()) assert := assert.New(t) assert.True(math.IsNaN(globalconfig.AnalyticsRate())) - newTracer(WithAnalyticsRate(0.5)) + tracer := newTracer(WithAnalyticsRate(0.5)) + defer tracer.Stop() assert.Equal(0.5, globalconfig.AnalyticsRate()) - newTracer(WithAnalytics(false)) + tracer = newTracer(WithAnalytics(false)) + defer tracer.Stop() assert.True(math.IsNaN(globalconfig.AnalyticsRate())) - newTracer(WithAnalytics(true)) + tracer = newTracer(WithAnalytics(true)) + defer tracer.Stop() assert.Equal(1., globalconfig.AnalyticsRate()) }) @@ -274,6 +284,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Run("dogstatsd", func(t *testing.T) { t.Run("default", func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() c := tracer.config 
assert.Equal(t, c.dogstatsdAddr, "localhost:8125") }) @@ -282,6 +293,7 @@ func TestTracerOptionsDefaults(t *testing.T) { os.Setenv("DD_AGENT_HOST", "my-host") defer os.Unsetenv("DD_AGENT_HOST") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, c.dogstatsdAddr, "my-host:8125") }) @@ -290,6 +302,7 @@ func TestTracerOptionsDefaults(t *testing.T) { os.Setenv("DD_DOGSTATSD_PORT", "123") defer os.Unsetenv("DD_DOGSTATSD_PORT") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, c.dogstatsdAddr, "localhost:123") }) @@ -300,6 +313,7 @@ func TestTracerOptionsDefaults(t *testing.T) { defer os.Unsetenv("DD_AGENT_HOST") defer os.Unsetenv("DD_DOGSTATSD_PORT") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, c.dogstatsdAddr, "my-host:123") }) @@ -308,12 +322,14 @@ func TestTracerOptionsDefaults(t *testing.T) { os.Setenv("DD_ENV", "testEnv") defer os.Unsetenv("DD_ENV") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, "testEnv", c.env) }) t.Run("option", func(t *testing.T) { tracer := newTracer(WithDogstatsdAddress("10.1.0.12:4002")) + defer tracer.Stop() c := tracer.config assert.Equal(t, c.dogstatsdAddr, "10.1.0.12:4002") }) @@ -323,6 +339,7 @@ func TestTracerOptionsDefaults(t *testing.T) { os.Setenv("DD_AGENT_HOST", "trace-agent") defer os.Unsetenv("DD_AGENT_HOST") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, "http://trace-agent:8126", c.agentURL) }) @@ -331,6 +348,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Run("env", func(t *testing.T) { t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.Equal(t, "https://custom:1234", c.agentURL) }) @@ -340,6 +358,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Setenv("DD_TRACE_AGENT_PORT", "3333") t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234") tracer := newTracer() + defer tracer.Stop() c := 
tracer.config assert.Equal(t, "https://custom:1234", c.agentURL) }) @@ -347,6 +366,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Run("code-override", func(t *testing.T) { t.Setenv("DD_TRACE_AGENT_URL", "https://custom:1234") tracer := newTracer(WithAgentAddr("testhost:3333")) + defer tracer.Stop() c := tracer.config assert.Equal(t, "http://testhost:3333", c.agentURL) }) @@ -358,6 +378,7 @@ func TestTracerOptionsDefaults(t *testing.T) { assert := assert.New(t) env := "production" tracer := newTracer(WithEnv(env)) + defer tracer.Stop() c := tracer.config assert.Equal(env, c.env) }) @@ -365,6 +386,7 @@ func TestTracerOptionsDefaults(t *testing.T) { t.Run("trace_enabled", func(t *testing.T) { t.Run("default", func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.True(t, c.enabled) }) @@ -373,6 +395,7 @@ func TestTracerOptionsDefaults(t *testing.T) { os.Setenv("DD_TRACE_ENABLED", "false") defer os.Unsetenv("DD_TRACE_ENABLED") tracer := newTracer() + defer tracer.Stop() c := tracer.config assert.False(t, c.enabled) }) @@ -387,6 +410,7 @@ func TestTracerOptionsDefaults(t *testing.T) { WithDebugMode(true), WithEnv("testEnv"), ) + defer tracer.Stop() c := tracer.config assert.Equal(float64(0.5), c.sampler.(RateSampler).Rate()) assert.Equal("http://ddagent.consul.local:58126", c.agentURL) diff --git a/ddtrace/tracer/sampler_test.go b/ddtrace/tracer/sampler_test.go index aa0e69fa29..9b71681b31 100644 --- a/ddtrace/tracer/sampler_test.go +++ b/ddtrace/tracer/sampler_test.go @@ -643,6 +643,7 @@ func TestRulesSamplerConcurrency(t *testing.T) { NameRule("notweb.request", 1.0), } tracer := newTracer(WithSamplingRules(rules)) + defer tracer.Stop() span := func(wg *sync.WaitGroup) { defer wg.Done() tracer.StartSpan("db.query", ServiceName("postgres.db")).Finish() diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index f4c0307d6c..01b2f9432a 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ 
-76,6 +76,7 @@ func TestSpanFinish(t *testing.T) { assert := assert.New(t) wait := time.Millisecond * 2 tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // the finish should set finished and the duration @@ -364,6 +365,7 @@ func TestTraceManualKeepAndManualDrop(t *testing.T) { } { t.Run(fmt.Sprintf("%s/local", scenario.tag), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() span := tracer.newRootSpan("root span", "my service", "my resource") span.SetTag(scenario.tag, true) assert.Equal(t, scenario.keep, shouldKeep(span)) @@ -371,6 +373,7 @@ func TestTraceManualKeepAndManualDrop(t *testing.T) { t.Run(fmt.Sprintf("%s/non-local", scenario.tag), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() spanCtx := &spanContext{traceID: 42, spanID: 42} spanCtx.setSamplingPriority(scenario.p, samplernames.RemoteRate) span := tracer.StartSpan("non-local root span", ChildOf(spanCtx)).(*span) @@ -396,6 +399,7 @@ func TestSpanSetDatadogTags(t *testing.T) { func TestSpanStart(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // a new span sets the Start after the initialization @@ -405,6 +409,7 @@ func TestSpanStart(t *testing.T) { func TestSpanString(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // don't bother checking the contents, just make sure it works. 
assert.NotEqual("", span.String()) @@ -463,6 +468,7 @@ func TestSpanSetMetric(t *testing.T) { t.Run(name, func(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("http.request", "mux.router", "/") tt(assert, span) }) @@ -472,6 +478,7 @@ func TestSpanSetMetric(t *testing.T) { func TestSpanError(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // check the error is set in the default meta @@ -499,6 +506,7 @@ func TestSpanError(t *testing.T) { func TestSpanError_Typed(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // check the error is set in the default meta @@ -513,6 +521,7 @@ func TestSpanError_Typed(t *testing.T) { func TestSpanErrorNil(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("pylons.request", "pylons", "/") // don't set the error if it's nil @@ -574,6 +583,7 @@ func TestSpanModifyWhileFlushing(t *testing.T) { func TestSpanSamplingPriority(t *testing.T) { assert := assert.New(t) tracer := newTracer(withTransport(newDefaultTransport())) + defer tracer.Stop() span := tracer.newRootSpan("my.name", "my.service", "my.resource") _, ok := span.Metrics[keySamplingPriority] diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 3c9ee344f0..95c5caee11 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -54,10 +54,11 @@ type concentrator struct { // stopped reports whether the concentrator is stopped (when non-zero) stopped uint32 - wg sync.WaitGroup // waits for any active goroutines - bucketSize int64 // the size of a bucket in nanoseconds - stop chan struct{} // closing this channel 
triggers shutdown - cfg *config // tracer startup configuration + wg sync.WaitGroup // waits for any active goroutines + bucketSize int64 // the size of a bucket in nanoseconds + stop chan struct{} // closing this channel triggers shutdown + cfg *config // tracer startup configuration + statsdClient statsdClient // statsd client for sending metrics. } // newConcentrator creates a new concentrator using the given tracer @@ -113,10 +114,10 @@ func (c *concentrator) runFlusher(tick <-chan time.Time) { // statsd returns any tracer configured statsd client, or a no-op. func (c *concentrator) statsd() statsdClient { - if c.cfg.statsd == nil { + if c.statsdClient == nil { return &statsd.NoOpClient{} } - return c.cfg.statsd + return c.statsdClient } // runIngester runs the loop which accepts incoming data on the concentrator's In diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go index 655038999b..5a3bceafe8 100644 --- a/ddtrace/tracer/textmap_test.go +++ b/ddtrace/tracer/textmap_test.go @@ -133,6 +133,7 @@ func TestTextMapPropagatorInjectHeader(t *testing.T) { ParentHeader: "pid", }) tracer := newTracer(WithPropagator(propagator)) + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetBaggageItem("item", "x") @@ -160,6 +161,7 @@ func TestTextMapPropagatorOrigin(t *testing.T) { DefaultParentIDHeader: "1", }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) if err != nil { t.Fatal(err) @@ -188,6 +190,7 @@ func TestTextMapPropagatorTraceTagsWithPriority(t *testing.T) { traceTagsHeader: "hello=world,_dd.p.dm=934086a6-4", }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) assert.Nil(t, err) sctx, ok := ctx.(*spanContext) @@ -215,6 +218,7 @@ func TestTextMapPropagatorTraceTagsWithoutPriority(t *testing.T) { traceTagsHeader: "hello=world,_dd.p.dm=934086a6-4", }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) assert.Nil(t, err) sctx, ok := 
ctx.(*spanContext) @@ -242,6 +246,7 @@ func TestExtractOriginSynthetics(t *testing.T) { DefaultParentIDHeader: "0", }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) if err != nil { t.Fatal(err) @@ -263,6 +268,7 @@ func TestTextMapPropagator(t *testing.T) { traceTagsHeader: "hello=world,=", // invalid value }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) assert.Nil(t, err) sctx, ok := ctx.(*spanContext) @@ -282,6 +288,7 @@ func TestTextMapPropagator(t *testing.T) { traceTagsHeader: traceTags, }) tracer := newTracer() + defer tracer.Stop() ctx, err := tracer.Extract(src) assert.Nil(t, err) sctx, ok := ctx.(*spanContext) @@ -291,6 +298,7 @@ func TestTextMapPropagator(t *testing.T) { t.Run("InjectTraceTagsTooLong", func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() child := tracer.StartSpan("test") for i := 0; i < 100; i++ { child.Context().(*spanContext).trace.setPropagatingTag(fmt.Sprintf("someKey%d", i), fmt.Sprintf("someValue%d", i)) @@ -309,6 +317,7 @@ func TestTextMapPropagator(t *testing.T) { t.Run("InvalidTraceTags", func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() internal.SetGlobalTracer(tracer) child := tracer.StartSpan("test") child.Context().(*spanContext).trace.setPropagatingTag("_dd.p.hello1", "world") // valid value @@ -332,6 +341,7 @@ func TestTextMapPropagator(t *testing.T) { ParentHeader: "pid", }) tracer := newTracer(WithPropagator(propagator)) + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) root.SetBaggageItem("item", "x") @@ -397,6 +407,7 @@ func TestEnvVars(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("inject with env=%q", testEnv), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) ctx, ok := root.Context().(*spanContext) ctx.traceID = test.in[0] @@ -453,6 +464,7 @@ func TestEnvVars(t *testing.T) { for _, test := range 
tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(test.in) assert.Nil(err) @@ -505,6 +517,7 @@ func TestEnvVars(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("inject with env=%q", testEnv), func(t *testing.T) { tracer := newTracer(WithPropagator(NewPropagator(&PropagatorConfig{B3: true}))) + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) ctx, ok := root.Context().(*spanContext) ctx.traceID = test.in[0] @@ -555,6 +568,7 @@ func TestEnvVars(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("extract with env=%q", testEnv), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() assert := assert.New(t) ctx, err := tracer.Extract(test.in) @@ -610,6 +624,7 @@ func TestEnvVars(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("inject and extract with env=%q", testEnv), func(t *testing.T) { tracer := newTracer() + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) root.SetBaggageItem("item", "x") @@ -641,6 +656,7 @@ func TestNonePropagator(t *testing.T) { t.Run("inject/none", func(t *testing.T) { t.Setenv(headerPropagationStyleInject, "none") tracer := newTracer() + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) root.SetBaggageItem("item", "x") @@ -660,6 +676,7 @@ func TestNonePropagator(t *testing.T) { t.Setenv(headerPropagationStyleInject, "none,b3") tp := new(testLogger) tracer := newTracer(WithLogger(tp)) + defer tracer.Stop() // reinitializing to capture log output, since propagators are parsed before logger is set tracer.config.propagator = NewPropagator(&PropagatorConfig{}) root := tracer.StartSpan("web.request").(*span) @@ -684,6 +701,7 @@ func TestNonePropagator(t *testing.T) { t.Setenv(headerPropagationStyleExtract, "none") assert := assert.New(t) tracer := 
newTracer() + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) root.SetBaggageItem("item", "x") @@ -698,6 +716,7 @@ func TestNonePropagator(t *testing.T) { t.Run("inject,extract/none", func(t *testing.T) { t.Setenv(headerPropagationStyle, "none") tracer := newTracer() + defer tracer.Stop() root := tracer.StartSpan("web.request").(*span) root.SetTag(ext.SamplingPriority, -1) root.SetBaggageItem("item", "x") diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 7126ee5007..aae999cf07 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -88,6 +88,9 @@ type tracer struct { // obfuscator holds the obfuscator used to obfuscate resources in aggregated stats. // obfuscator may be nil if disabled. obfuscator *obfuscate.Obfuscator + + // statsd is used for tracking metrics associated with the runtime and the tracer. + statsd statsdClient } const ( @@ -192,11 +195,15 @@ const payloadQueueSize = 1000 func newUnstartedTracer(opts ...StartOption) *tracer { c := newConfig(opts...) sampler := newPrioritySampler() + statsd, err := newStatsdClient(c) + if err != nil { + log.Warn("Runtime and health metrics disabled: %v", err) + } var writer traceWriter if c.logToStdout { - writer = newLogTraceWriter(c) + writer = newLogTraceWriter(c, statsd) } else { - writer = newAgentTraceWriter(c, sampler) + writer = newAgentTraceWriter(c, sampler, statsd) } traces, spans, err := samplingRulesFromEnv() if err != nil { @@ -227,6 +234,7 @@ func newUnstartedTracer(opts ...StartOption) *tracer { Cache: c.agent.HasFlag("sql_cache"), }, }), + statsd: statsd, } return t } @@ -234,7 +242,7 @@ func newUnstartedTracer(opts ...StartOption) *tracer { func newTracer(opts ...StartOption) *tracer { t := newUnstartedTracer(opts...) 
c := t.config - t.config.statsd.Incr("datadog.tracer.started", nil, 1) + t.statsd.Incr("datadog.tracer.started", nil, 1) if c.runtimeMetrics { log.Debug("Runtime metrics enabled.") t.wg.Add(1) @@ -297,11 +305,11 @@ func (t *tracer) worker(tick <-chan time.Time) { t.traceWriter.add(trace.spans) } case <-tick: - t.config.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1) + t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1) t.traceWriter.flush() case done := <-t.flush: - t.config.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1) + t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:invoked"}, 1) t.traceWriter.flush() // TODO(x): In reality, the traceWriter.flush() call is not synchronous // when using the agent traceWriter. However, this functionnality is used @@ -547,12 +555,12 @@ func spanResourcePIISafe(s *span) bool { func (t *tracer) Stop() { t.stopOnce.Do(func() { close(t.stop) - t.config.statsd.Incr("datadog.tracer.stopped", nil, 1) + t.statsd.Incr("datadog.tracer.stopped", nil, 1) }) t.stats.Stop() t.wg.Wait() t.traceWriter.stop() - t.config.statsd.Close() + t.statsd.Close() appsec.Stop() } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 5d64a3ef41..fd197efef2 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -1427,6 +1427,7 @@ func TestPushTrace(t *testing.T) { tp := new(testLogger) log.UseLogger(tp) tracer := newUnstartedTracer() + defer tracer.statsd.Close() trace := []*span{ &span{ Name: "pylons.request", diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 6bfa86e2c8..e6428fc4f7 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -152,7 +152,7 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) { droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0)) partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0)) 
droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0)) - if stats := t.config.statsd; stats != nil { + if stats := t.statsd; stats != nil { stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces), []string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1) stats.Count("datadog.tracer.dropped_p0_spans", int64(droppedSpans), nil, 1) diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index 58798a7a79..0e3de2908b 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -46,30 +46,34 @@ type agentTraceWriter struct { // prioritySampling is the prioritySampler into which agentTraceWriter will // read sampling rates sent by the agent prioritySampling *prioritySampler + + // statsd is used to send metrics + statsd statsdClient } -func newAgentTraceWriter(c *config, s *prioritySampler) *agentTraceWriter { +func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient statsdClient) *agentTraceWriter { return &agentTraceWriter{ config: c, payload: newPayload(), climit: make(chan struct{}, concurrentConnectionLimit), prioritySampling: s, + statsd: statsdClient, } } func (h *agentTraceWriter) add(trace []*span) { if err := h.payload.push(trace); err != nil { - h.config.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1) + h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1) log.Error("Error encoding msgpack: %v", err) } if h.payload.size() > payloadSizeLimit { - h.config.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1) + h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1) h.flush() } } func (h *agentTraceWriter) stop() { - h.config.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1) + h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1) h.flush() h.wg.Wait() } @@ -87,19 +91,19 @@ func (h *agentTraceWriter) flush() { defer func(start time.Time) 
{ <-h.climit h.wg.Done() - h.config.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1) + h.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1) }(time.Now()) size, count := p.size(), p.itemCount() log.Debug("Sending payload: size: %d traces: %d\n", size, count) rc, err := h.config.transport.send(p) if err != nil { - h.config.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1) + h.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1) log.Error("lost %d traces: %v", count, err) } else { - h.config.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1) - h.config.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1) + h.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1) + h.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1) if err := h.prioritySampling.readRatesJSON(rc); err != nil { - h.config.statsd.Incr("datadog.tracer.decode_error", nil, 1) + h.statsd.Incr("datadog.tracer.decode_error", nil, 1) } } }(oldp) @@ -116,12 +120,14 @@ type logTraceWriter struct { buf bytes.Buffer hasTraces bool w io.Writer + statsd statsdClient } -func newLogTraceWriter(c *config) *logTraceWriter { +func newLogTraceWriter(c *config, statsdClient statsdClient) *logTraceWriter { w := &logTraceWriter{ config: c, w: logWriter, + statsd: statsdClient, } w.resetBuffer() return w @@ -290,7 +296,7 @@ func (h *logTraceWriter) add(trace []*span) { n, err := h.writeTrace(trace) if err != nil { log.Error("Lost a trace: %s", err.cause) - h.config.statsd.Count("datadog.tracer.traces_dropped", 1, []string{"reason:" + err.dropReason}, 1) + h.statsd.Count("datadog.tracer.traces_dropped", 1, []string{"reason:" + err.dropReason}, 1) return } trace = trace[n:] @@ -303,7 +309,7 @@ func (h *logTraceWriter) add(trace []*span) { } func (h *logTraceWriter) stop() { - h.config.statsd.Incr("datadog.tracer.flush_triggered", 
[]string{"reason:shutdown"}, 1) + h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1) h.flush() } diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index 69d1522a17..c0671803bd 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) @@ -95,7 +96,11 @@ func TestLogWriter(t *testing.T) { t.Run("basic", func(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer - h := newLogTraceWriter(newConfig()) + cfg := newConfig() + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf s := makeSpan(0) for i := 0; i < 20; i++ { @@ -104,7 +109,7 @@ func TestLogWriter(t *testing.T) { h.flush() v := struct{ Traces [][]map[string]interface{} }{} d := json.NewDecoder(&buf) - err := d.Decode(&v) + err = d.Decode(&v) assert.NoError(err) assert.Len(v.Traces, 20, "Expected 20 traces, but have %d", len(v.Traces)) for _, t := range v.Traces { @@ -117,7 +122,11 @@ func TestLogWriter(t *testing.T) { t.Run("inf+nan", func(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer - h := newLogTraceWriter(newConfig()) + cfg := newConfig() + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf s := makeSpan(0) s.Metrics["nan"] = math.NaN() @@ -134,7 +143,11 @@ func TestLogWriter(t *testing.T) { t.Run("fullspan", func(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer - h := newLogTraceWriter(newConfig()) + cfg := newConfig() + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf type jsonSpan struct { TraceID string `json:"trace_id"` @@ -201,7 +214,7 @@ func TestLogWriter(t *testing.T) { h.flush() d := 
json.NewDecoder(&buf) var payload jsonPayload - err := d.Decode(&payload) + err = d.Decode(&payload) assert.NoError(err) assert.Equal(jsonPayload{[][]jsonSpan{{expected}}}, payload) }) @@ -229,14 +242,18 @@ func TestLogWriterOverflow(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer var tg testStatsdClient - h := newLogTraceWriter(newConfig(withStatsdClient(&tg))) + cfg := newConfig(withStatsdClient(&tg)) + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf s := makeSpan(10000) h.add([]*span{s}) h.flush() v := struct{ Traces [][]map[string]interface{} }{} d := json.NewDecoder(&buf) - err := d.Decode(&v) + err = d.Decode(&v) assert.Equal(io.EOF, err) assert.Contains(tg.CallNames(), "datadog.tracer.traces_dropped") }) @@ -245,7 +262,11 @@ func TestLogWriterOverflow(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer var tg testStatsdClient - h := newLogTraceWriter(newConfig(withStatsdClient(&tg))) + cfg := newConfig(withStatsdClient(&tg)) + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf s := makeSpan(10) var trace []*span @@ -256,7 +277,7 @@ func TestLogWriterOverflow(t *testing.T) { h.flush() v := struct{ Traces [][]map[string]interface{} }{} d := json.NewDecoder(&buf) - err := d.Decode(&v) + err = d.Decode(&v) assert.NoError(err) assert.Len(v.Traces, 1, "Expected 1 trace, but have %d", len(v.Traces)) spann := len(v.Traces[0]) @@ -272,7 +293,11 @@ func TestLogWriterOverflow(t *testing.T) { t.Run("two-large", func(t *testing.T) { assert := assert.New(t) var buf bytes.Buffer - h := newLogTraceWriter(newConfig()) + cfg := newConfig() + statsd, err := newStatsdClient(cfg) + require.NoError(t, err) + defer statsd.Close() + h := newLogTraceWriter(cfg, statsd) h.w = &buf s := makeSpan(4000) h.add([]*span{s}) @@ -280,7 +305,7 @@ func TestLogWriterOverflow(t *testing.T) { h.flush() v := 
struct{ Traces [][]map[string]interface{} }{} d := json.NewDecoder(&buf) - err := d.Decode(&v) + err = d.Decode(&v) assert.NoError(err) assert.Len(v.Traces, 1, "Expected 1 trace, but have %d", len(v.Traces)) assert.Len(v.Traces[0], 1, "Expected 1 span, but have %d", len(v.Traces[0]))