From 0e94b04c7c12522f8ca0c0ea1b257c1f50602cd8 Mon Sep 17 00:00:00 2001
From: Paulin Todev
Date: Tue, 1 Aug 2023 12:43:29 +0100
Subject: [PATCH] Fix a bug which prevented the `app_agent_receiver` integration from processing traces (#4655)

* Save otel factories in traces instance

* Try an http request periodically
---
 CHANGELOG.md                                  |   2 +
 .../app_agent_receiver_test.go                | 169 ++++++++++++++++++
 pkg/traces/instance.go                        |  13 +-
 .../servicegraphprocessor/processor_test.go   |   2 +-
 pkg/traces/traces_test.go                     |   2 +-
 .../traceutils/otel_meter_settings.go         |   0
 .../{internal => }/traceutils/server.go       |  26 +--
 7 files changed, 193 insertions(+), 21 deletions(-)
 create mode 100644 pkg/integrations/v2/app_agent_receiver/app_agent_receiver_test.go
 rename pkg/traces/{internal => }/traceutils/otel_meter_settings.go (100%)
 rename pkg/traces/{internal => }/traceutils/server.go (90%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 36c9fae4f050..c3939d59ad0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,8 @@ Main (unreleased)
 
 - Fix potential goroutine leak in log file tailing in static mode. (@thampiotr)
 
+- Fix a bug which prevented the `app_agent_receiver` integration from processing traces. (@ptodev)
+
 v0.35.2 (2023-07-27)
 --------------------
 
diff --git a/pkg/integrations/v2/app_agent_receiver/app_agent_receiver_test.go b/pkg/integrations/v2/app_agent_receiver/app_agent_receiver_test.go
new file mode 100644
index 000000000000..46cfba937093
--- /dev/null
+++ b/pkg/integrations/v2/app_agent_receiver/app_agent_receiver_test.go
@@ -0,0 +1,169 @@
+package app_agent_receiver
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/grafana/agent/pkg/integrations/v2"
+	"github.com/grafana/agent/pkg/server"
+	"github.com/grafana/agent/pkg/traces"
+	"github.com/grafana/agent/pkg/traces/traceutils"
+	"github.com/grafana/agent/pkg/util"
+	"github.com/phayes/freeport"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	"gopkg.in/yaml.v2"
+)
+
+func Test_ReceiveTracesAndRemoteWrite(t *testing.T) {
+	var err error
+
+	//
+	// Prepare the traces instance
+	//
+	tracesCh := make(chan ptrace.Traces)
+	tracesAddr := traceutils.NewTestServer(t, func(t ptrace.Traces) {
+		tracesCh <- t
+	})
+
+	tracesCfgText := util.Untab(fmt.Sprintf(`
+configs:
+- name: TEST_TRACES
+  receivers:
+    jaeger:
+      protocols:
+        thrift_compact:
+  remote_write:
+    - endpoint: %s
+      insecure: true
+  batch:
+    timeout: 100ms
+    send_batch_size: 1
+	`, tracesAddr))
+
+	var tracesCfg traces.Config
+	dec := yaml.NewDecoder(strings.NewReader(tracesCfgText))
+	dec.SetStrict(true)
+	err = dec.Decode(&tracesCfg)
+	require.NoError(t, err)
+
+	traces, err := traces.New(nil, nil, prometheus.NewRegistry(), tracesCfg, &server.HookLogger{})
+	require.NoError(t, err)
+	t.Cleanup(traces.Stop)
+
+	//
+	// Prepare the app_agent_receiver integration
+	//
+	integrationPort, err := freeport.GetFreePort()
+	require.NoError(t, err)
+
+	var integrationCfg Config
+	cb := fmt.Sprintf(`
+instance: TEST_APP_AGENT_RECEIVER
+server:
+  cors_allowed_origins:
+  - '*'
+  host: '0.0.0.0'
+  max_allowed_payload_size: 5e+07
+  port: %d
+  rate_limiting:
+    burstiness: 100
+    enabled: true
+    rps: 100
+sourcemaps:
+  download: true
+traces_instance: TEST_TRACES
+`, integrationPort)
+	err = yaml.Unmarshal([]byte(cb), &integrationCfg)
+	require.NoError(t, err)
+
+	logger := util.TestLogger(t)
+	globals := integrations.Globals{
+		Tracing: traces,
+	}
+
+	integration, err := integrationCfg.NewIntegration(logger, globals)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	t.Cleanup(func() { ctx.Done() })
+
+	//
+	// Start the app_agent_receiver integration
+	//
+	go func() {
+		err = integration.RunIntegration(ctx)
+		require.NoError(t, err)
+	}()
+
+	//
+	// Send data to the integration's /collect endpoint
+	//
+	const PAYLOAD = `
+{
+	"traces": {
+		"resourceSpans": [{
+			"scopeSpans": [{
+				"spans": [{
+					"name": "TestSpan",
+					"attributes": [{
+						"key": "foo",
+						"value": { "intValue": "11111" }
+					},
+					{
+						"key": "boo",
+						"value": { "intValue": "22222" }
+					},
+					{
+						"key": "user.email",
+						"value": { "stringValue": "user@email.com" }
+					}]
+				}]
+			}]
+		}]
+	},
+	"logs": [],
+	"exceptions": [],
+	"measurements": [],
+	"meta": {}
+}
+`
+
+	integrationURL := fmt.Sprintf("http://127.0.0.1:%d/collect", integrationPort)
+
+	var httpResponse *http.Response
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		req, err := http.NewRequest("POST", integrationURL, bytes.NewBuffer([]byte(PAYLOAD)))
+		assert.NoError(c, err)
+
+		httpResponse, err = http.DefaultClient.Do(req)
+		assert.NoError(c, err)
+	}, 5*time.Second, 250*time.Millisecond)
+
+	//
+	// Check that the data was received by the integration
+	//
+	resBody, err := io.ReadAll(httpResponse.Body)
+	require.NoError(t, err)
+	require.Equal(t, "ok", string(resBody))
+
+	require.Equal(t, http.StatusAccepted, httpResponse.StatusCode)
+
+	//
+	// Check that the traces subsystem remote wrote the trace received from the integration
+	//
+	select {
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "failed to receive a span after 10 seconds")
+	case tr := <-tracesCh:
+		require.Equal(t, 1, tr.SpanCount())
+		// Nothing to do, send succeeded.
+	}
+}
diff --git a/pkg/traces/instance.go b/pkg/traces/instance.go
index ca0eb8cd0fe8..dbab2df3aa4b 100644
--- a/pkg/traces/instance.go
+++ b/pkg/traces/instance.go
@@ -23,8 +23,8 @@ import (
 	"github.com/grafana/agent/pkg/metrics/instance"
 	"github.com/grafana/agent/pkg/traces/automaticloggingprocessor"
 	"github.com/grafana/agent/pkg/traces/contextkeys"
-	"github.com/grafana/agent/pkg/traces/internal/traceutils"
 	"github.com/grafana/agent/pkg/traces/servicegraphprocessor"
+	"github.com/grafana/agent/pkg/traces/traceutils"
 	"github.com/grafana/agent/pkg/util"
 	prom_client "github.com/prometheus/client_golang/prometheus"
 	"go.opentelemetry.io/otel/trace"
@@ -125,6 +125,7 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig
 	if err != nil {
 		return fmt.Errorf("failed to load tracing factories: %w", err)
 	}
+	i.factories = factories
 
 	appinfo := component.BuildInfo{
 		Command: "agent",
@@ -160,11 +161,11 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig
 
 	i.service, err = service.New(ctx, service.Settings{
 		BuildInfo: appinfo,
-		Receivers:  receiver.NewBuilder(otelConfig.Receivers, factories.Receivers),
-		Processors: processor.NewBuilder(otelConfig.Processors, factories.Processors),
-		Exporters:  otelexporter.NewBuilder(otelConfig.Exporters, factories.Exporters),
-		Connectors: connector.NewBuilder(otelConfig.Connectors, factories.Connectors),
-		Extensions: extension.NewBuilder(otelConfig.Extensions, factories.Extensions),
+		Receivers:  receiver.NewBuilder(otelConfig.Receivers, i.factories.Receivers),
+		Processors: processor.NewBuilder(otelConfig.Processors, i.factories.Processors),
+		Exporters:  otelexporter.NewBuilder(otelConfig.Exporters, i.factories.Exporters),
+		Connectors: connector.NewBuilder(otelConfig.Connectors, i.factories.Connectors),
+		Extensions: extension.NewBuilder(otelConfig.Extensions, i.factories.Extensions),
 		OtelMetricViews: servicegraphprocessor.OtelMetricViews(),
 		OtelMetricReader: promExporter,
 		DisableProcessMetrics: true,
diff --git a/pkg/traces/servicegraphprocessor/processor_test.go b/pkg/traces/servicegraphprocessor/processor_test.go
index f9e0d9cb6219..a2577a3d034d 100644
--- a/pkg/traces/servicegraphprocessor/processor_test.go
+++ b/pkg/traces/servicegraphprocessor/processor_test.go
@@ -7,7 +7,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/agent/pkg/traces/internal/traceutils"
+	"github.com/grafana/agent/pkg/traces/traceutils"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
diff --git a/pkg/traces/traces_test.go b/pkg/traces/traces_test.go
index 1ebdf1c09cb3..7557a036d29f 100644
--- a/pkg/traces/traces_test.go
+++ b/pkg/traces/traces_test.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	"github.com/grafana/agent/pkg/server"
-	"github.com/grafana/agent/pkg/traces/internal/traceutils"
+	"github.com/grafana/agent/pkg/traces/traceutils"
 	"github.com/grafana/agent/pkg/util"
 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/client_golang/prometheus"
diff --git a/pkg/traces/internal/traceutils/otel_meter_settings.go b/pkg/traces/traceutils/otel_meter_settings.go
similarity index 100%
rename from pkg/traces/internal/traceutils/otel_meter_settings.go
rename to pkg/traces/traceutils/otel_meter_settings.go
diff --git a/pkg/traces/internal/traceutils/server.go b/pkg/traces/traceutils/server.go
similarity index 90%
rename from pkg/traces/internal/traceutils/server.go
rename to pkg/traces/traceutils/server.go
index af37633fbaca..4889e873df42 100644
--- a/pkg/traces/internal/traceutils/server.go
+++ b/pkg/traces/traceutils/server.go
@@ -27,40 +27,40 @@ import (
 	"gopkg.in/yaml.v3"
 )
 
-// Server is a Tracing testing server that invokes a function every time a span
+// server is a Tracing testing server that invokes a function every time a span
 // is received.
-type Server struct {
+type server struct {
 	service *service.Service
 }
 
-// NewTestServer creates a new Server for testing, where received traces will
+// NewTestServer creates a new server for testing, where received traces will
 // call the callback function. The returned string is the address where traces
 // can be sent using OTLP.
 func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string {
 	t.Helper()
 
-	srv, listenAddr, err := NewServerWithRandomPort(callback)
+	srv, listenAddr, err := newServerWithRandomPort(callback)
 	if err != nil {
 		t.Fatalf("failed to create OTLP server: %s", err)
 	}
 	t.Cleanup(func() {
-		err := srv.Stop()
+		err := srv.stop()
 		assert.NoError(t, err)
 	})
 
 	return listenAddr
 }
 
-// NewServerWithRandomPort calls NewServer with a random port >49152 and
+// newServerWithRandomPort calls newServer with a random port >49152 and
 // <65535. It will try up to five times before failing.
-func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr string, err error) {
+func newServerWithRandomPort(callback func(ptrace.Traces)) (srv *server, addr string, err error) {
 	var lastError error
 
 	for i := 0; i < 5; i++ {
 		port := rand.Intn(65535-49152) + 49152
 		listenAddr := fmt.Sprintf("127.0.0.1:%d", port)
 
-		srv, err = NewServer(listenAddr, callback)
+		srv, err = newServer(listenAddr, callback)
 		if err != nil {
 			lastError = err
 			continue
@@ -72,9 +72,9 @@ func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr st
 	return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError)
 }
 
-// NewServer creates an OTLP-accepting server that calls a function when a
+// newServer creates an OTLP-accepting server that calls a function when a
 // trace is received. This is primarily useful for testing.
-func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {
+func newServer(addr string, callback func(ptrace.Traces)) (*server, error) {
 	conf := util.Untab(fmt.Sprintf(`
 processors:
 	func_processor:
@@ -163,13 +163,13 @@ func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {
 		return nil, fmt.Errorf("failed to start Otel service: %w", err)
 	}
 
-	return &Server{
+	return &server{
 		service: svc,
 	}, nil
 }
 
-// Stop stops the testing server.
-func (s *Server) Stop() error {
+// stop stops the testing server.
+func (s *server) stop() error {
 	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()