Skip to content

Commit

Permalink
Fix a bug which prevented the app_agent_receiver integration from p…
Browse files Browse the repository at this point in the history
…rocessing traces (#4655)

* Save otel factories in traces instance

* Try an http request periodically
  • Loading branch information
ptodev authored and clayton-cornell committed Aug 14, 2023
1 parent 704b6d1 commit 0e94b04
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Main (unreleased)

- Fix potential goroutine leak in log file tailing in static mode. (@thampiotr)

- Fix a bug which prevented the `app_agent_receiver` integration from processing traces. (@ptodev)

v0.35.2 (2023-07-27)
--------------------

Expand Down
169 changes: 169 additions & 0 deletions pkg/integrations/v2/app_agent_receiver/app_agent_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package app_agent_receiver

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/grafana/agent/pkg/integrations/v2"
"github.com/grafana/agent/pkg/server"
"github.com/grafana/agent/pkg/traces"
"github.com/grafana/agent/pkg/traces/traceutils"
"github.com/grafana/agent/pkg/util"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"gopkg.in/yaml.v2"
)

func Test_ReceiveTracesAndRemoteWrite(t *testing.T) {
var err error

//
// Prepare the traces instance
//
tracesCh := make(chan ptrace.Traces)
tracesAddr := traceutils.NewTestServer(t, func(t ptrace.Traces) {
tracesCh <- t
})

tracesCfgText := util.Untab(fmt.Sprintf(`
configs:
- name: TEST_TRACES
receivers:
jaeger:
protocols:
thrift_compact:
remote_write:
- endpoint: %s
insecure: true
batch:
timeout: 100ms
send_batch_size: 1
`, tracesAddr))

var tracesCfg traces.Config
dec := yaml.NewDecoder(strings.NewReader(tracesCfgText))
dec.SetStrict(true)
err = dec.Decode(&tracesCfg)
require.NoError(t, err)

traces, err := traces.New(nil, nil, prometheus.NewRegistry(), tracesCfg, &server.HookLogger{})
require.NoError(t, err)
t.Cleanup(traces.Stop)

//
// Prepare the app_agent_receiver integration
//
integrationPort, err := freeport.GetFreePort()
require.NoError(t, err)

var integrationCfg Config
cb := fmt.Sprintf(`
instance: TEST_APP_AGENT_RECEIVER
server:
cors_allowed_origins:
- '*'
host: '0.0.0.0'
max_allowed_payload_size: 5e+07
port: %d
rate_limiting:
burstiness: 100
enabled: true
rps: 100
sourcemaps:
download: true
traces_instance: TEST_TRACES
`, integrationPort)
err = yaml.Unmarshal([]byte(cb), &integrationCfg)
require.NoError(t, err)

logger := util.TestLogger(t)
globals := integrations.Globals{
Tracing: traces,
}

integration, err := integrationCfg.NewIntegration(logger, globals)
require.NoError(t, err)

ctx := context.Background()
t.Cleanup(func() { ctx.Done() })
//
// Start the app_agent_receiver integration
//
go func() {
err = integration.RunIntegration(ctx)
require.NoError(t, err)
}()

//
// Send data to the integration's /collect endpoint
//
const PAYLOAD = `
{
"traces": {
"resourceSpans": [{
"scopeSpans": [{
"spans": [{
"name": "TestSpan",
"attributes": [{
"key": "foo",
"value": { "intValue": "11111" }
},
{
"key": "boo",
"value": { "intValue": "22222" }
},
{
"key": "user.email",
"value": { "stringValue": "user@email.com" }
}]
}]
}]
}]
},
"logs": [],
"exceptions": [],
"measurements": [],
"meta": {}
}
`

integrationURL := fmt.Sprintf("http://127.0.0.1:%d/collect", integrationPort)

var httpResponse *http.Response
require.EventuallyWithT(t, func(c *assert.CollectT) {
req, err := http.NewRequest("POST", integrationURL, bytes.NewBuffer([]byte(PAYLOAD)))
assert.NoError(c, err)

httpResponse, err = http.DefaultClient.Do(req)
assert.NoError(c, err)
}, 5*time.Second, 250*time.Millisecond)

//
// Check that the data was received by the integration
//
resBody, err := io.ReadAll(httpResponse.Body)
require.NoError(t, err)
require.Equal(t, "ok", string(resBody[:]))

require.Equal(t, http.StatusAccepted, httpResponse.StatusCode)

//
// Check that the traces subsystem remote wrote the integration
//
select {
case <-time.After(10 * time.Second):
require.Fail(t, "failed to receive a span after 10 seconds")
case tr := <-tracesCh:
require.Equal(t, 1, tr.SpanCount())
// Nothing to do, send succeeded.
}
}
13 changes: 7 additions & 6 deletions pkg/traces/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/grafana/agent/pkg/metrics/instance"
"github.com/grafana/agent/pkg/traces/automaticloggingprocessor"
"github.com/grafana/agent/pkg/traces/contextkeys"
"github.com/grafana/agent/pkg/traces/internal/traceutils"
"github.com/grafana/agent/pkg/traces/servicegraphprocessor"
"github.com/grafana/agent/pkg/traces/traceutils"
"github.com/grafana/agent/pkg/util"
prom_client "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -125,6 +125,7 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig
if err != nil {
return fmt.Errorf("failed to load tracing factories: %w", err)
}
i.factories = factories

appinfo := component.BuildInfo{
Command: "agent",
Expand Down Expand Up @@ -160,11 +161,11 @@ func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig

i.service, err = service.New(ctx, service.Settings{
BuildInfo: appinfo,
Receivers: receiver.NewBuilder(otelConfig.Receivers, factories.Receivers),
Processors: processor.NewBuilder(otelConfig.Processors, factories.Processors),
Exporters: otelexporter.NewBuilder(otelConfig.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(otelConfig.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(otelConfig.Extensions, factories.Extensions),
Receivers: receiver.NewBuilder(otelConfig.Receivers, i.factories.Receivers),
Processors: processor.NewBuilder(otelConfig.Processors, i.factories.Processors),
Exporters: otelexporter.NewBuilder(otelConfig.Exporters, i.factories.Exporters),
Connectors: connector.NewBuilder(otelConfig.Connectors, i.factories.Connectors),
Extensions: extension.NewBuilder(otelConfig.Extensions, i.factories.Extensions),
OtelMetricViews: servicegraphprocessor.OtelMetricViews(),
OtelMetricReader: promExporter,
DisableProcessMetrics: true,
Expand Down
2 changes: 1 addition & 1 deletion pkg/traces/servicegraphprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/grafana/agent/pkg/traces/internal/traceutils"
"github.com/grafana/agent/pkg/traces/traceutils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
Expand Down
2 changes: 1 addition & 1 deletion pkg/traces/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/grafana/agent/pkg/server"
"github.com/grafana/agent/pkg/traces/internal/traceutils"
"github.com/grafana/agent/pkg/traces/traceutils"
"github.com/grafana/agent/pkg/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,40 @@ import (
"gopkg.in/yaml.v3"
)

// Server is a Tracing testing server that invokes a function every time a span
// server is a Tracing testing server that invokes a function every time a span
// is received.
type Server struct {
type server struct {
service *service.Service
}

// NewTestServer creates a new Server for testing, where received traces will
// NewTestServer creates a new server for testing, where received traces will
// call the callback function. The returned string is the address where traces
// can be sent using OTLP.
func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string {
t.Helper()

srv, listenAddr, err := NewServerWithRandomPort(callback)
srv, listenAddr, err := newServerWithRandomPort(callback)
if err != nil {
t.Fatalf("failed to create OTLP server: %s", err)
}
t.Cleanup(func() {
err := srv.Stop()
err := srv.stop()
assert.NoError(t, err)
})

return listenAddr
}

// NewServerWithRandomPort calls NewServer with a random port >49152 and
// newServerWithRandomPort calls NewServer with a random port >49152 and
// <65535. It will try up to five times before failing.
func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr string, err error) {
func newServerWithRandomPort(callback func(ptrace.Traces)) (srv *server, addr string, err error) {
var lastError error

for i := 0; i < 5; i++ {
port := rand.Intn(65535-49152) + 49152
listenAddr := fmt.Sprintf("127.0.0.1:%d", port)

srv, err = NewServer(listenAddr, callback)
srv, err = newServer(listenAddr, callback)
if err != nil {
lastError = err
continue
Expand All @@ -72,9 +72,9 @@ func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr st
return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError)
}

// NewServer creates an OTLP-accepting server that calls a function when a
// newServer creates an OTLP-accepting server that calls a function when a
// trace is received. This is primarily useful for testing.
func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {
func newServer(addr string, callback func(ptrace.Traces)) (*server, error) {
conf := util.Untab(fmt.Sprintf(`
processors:
func_processor:
Expand Down Expand Up @@ -163,13 +163,13 @@ func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {
return nil, fmt.Errorf("failed to start Otel service: %w", err)
}

return &Server{
return &server{
service: svc,
}, nil
}

// Stop stops the testing server.
func (s *Server) Stop() error {
// stop stops the testing server.
func (s *server) stop() error {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand Down

0 comments on commit 0e94b04

Please sign in to comment.