diff --git a/CHANGELOG.md b/CHANGELOG.md index 12083fcdb67..94f5ca008ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE` - `OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE` - Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821) +- Adds `jaeger.WithMaxPacketSize` option for configuring maximum UDP packet size used when connecting to the Jaeger agent. (#1853) ### Fixed @@ -104,6 +105,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688) - Fixed typo for default service name in Jaeger Exporter. (#1797) - Fix flaky OTLP for the reconnnection of the client connection. (#1527, #1814) +- Fix Jaeger exporter dropping of span batches that exceed the UDP packet size limit. + Instead, the exporter now splits the batch into smaller sendable batches. (#1828) ### Changed diff --git a/exporters/trace/jaeger/agent.go b/exporters/trace/jaeger/agent.go index 403f2326954..27db0e14a62 100644 --- a/exporters/trace/jaeger/agent.go +++ b/exporters/trace/jaeger/agent.go @@ -20,6 +20,7 @@ import ( "io" "log" "net" + "strings" "time" "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/third_party/thrift/lib/go/thrift" @@ -36,10 +37,11 @@ type agentClientUDP struct { genAgent.Agent io.Closer - connUDP udpConn - client *genAgent.AgentClient - maxPacketSize int // max size of datagram in bytes - thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span + connUDP udpConn + client *genAgent.AgentClient + maxPacketSize int // max size of datagram in bytes + thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span + thriftProtocol thrift.TProtocol } type udpConn interface { @@ -75,6 +77,7 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) { thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}) + thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory) var connUDP udpConn @@ -103,15 +106,78 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) { } return &agentClientUDP{ - connUDP: connUDP, - client: client, - maxPacketSize: params.MaxPacketSize, - thriftBuffer: thriftBuffer, + connUDP: connUDP, + client: client, + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + thriftProtocol: thriftProtocol, }, nil } -// EmitBatch implements EmitBatch() of Agent interface +// EmitBatch buffers batch to fit into UDP packets and sends the data to the agent. func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error { + var errs []error + processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process) + if err != nil { + // drop the batch if serialization of process fails. + return err + } + totalSize := processSize + var spans []*gen.Span + for _, span := range batch.Spans { + spanSize, err := a.calcSizeOfSerializedThrift(ctx, span) + if err != nil { + errs = append(errs, fmt.Errorf("thrift serialization failed: %v", span)) + continue + } + if spanSize+processSize >= a.maxPacketSize { + // drop the span that exceeds the limit. + errs = append(errs, fmt.Errorf("span too large to send: %v", span)) + continue + } + if totalSize+spanSize >= a.maxPacketSize { + if err := a.flush(ctx, &gen.Batch{ + Process: batch.Process, + Spans: spans, + }); err != nil { + errs = append(errs, err) + } + spans = spans[:0] + totalSize = processSize + } + totalSize += spanSize + spans = append(spans, span) + } + + if len(spans) > 0 { + if err := a.flush(ctx, &gen.Batch{ + Process: batch.Process, + Spans: spans, + }); err != nil { + errs = append(errs, err) + } + } + + if len(errs) == 1 { + return errs[0] + } else if len(errs) > 1 { + joined := a.makeJoinedErrorString(errs) + return fmt.Errorf("multiple errors during transform: %s", joined) + } + return nil +} + +// makeJoinedErrorString join all the errors to one error message. +func (a *agentClientUDP) makeJoinedErrorString(errs []error) string { + var errMsgs []string + for _, err := range errs { + errMsgs = append(errMsgs, err.Error()) + } + return strings.Join(errMsgs, ", ") +} + +// flush will send the batch of spans to the agent. +func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error { a.thriftBuffer.Reset() if err := a.client.EmitBatch(ctx, batch); err != nil { return err @@ -124,6 +190,13 @@ func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error return err } +// calcSizeOfSerializedThrift calculate the serialized thrift packet size. +func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) { + a.thriftBuffer.Reset() + err := thriftStruct.Write(ctx, a.thriftProtocol) + return a.thriftBuffer.Len(), err +} + // Close implements Close() of io.Closer and closes the underlying UDP connection. func (a *agentClientUDP) Close() error { return a.connUDP.Close() diff --git a/exporters/trace/jaeger/agent_test.go b/exporters/trace/jaeger/agent_test.go index 2b787d97a12..44a230cc343 100644 --- a/exporters/trace/jaeger/agent_test.go +++ b/exporters/trace/jaeger/agent_test.go @@ -14,12 +14,16 @@ package jaeger import ( + "context" "log" "net" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + tracesdk "go.opentelemetry.io/otel/sdk/trace" ) func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) { @@ -99,3 +103,74 @@ func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) { assert.NoError(t, agentClient.Close()) } + +type errorHandler struct{ t *testing.T } + +func (eh errorHandler) Handle(err error) { assert.NoError(eh.t, err) } + +func TestJaegerAgentUDPLimitBatching(t *testing.T) { + otel.SetErrorHandler(errorHandler{t}) + + // 1500 spans, size 79559, does not fit within one UDP packet with the default size of 65000. + n := 1500 + s := make([]*tracesdk.SpanSnapshot, n) + for i := 0; i < n; i++ { + s[i] = &tracesdk.SpanSnapshot{} + } + + exp, err := NewRawExporter( + WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831")), + ) + require.NoError(t, err) + + ctx := context.Background() + assert.NoError(t, exp.ExportSpans(ctx, s)) + assert.NoError(t, exp.Shutdown(ctx)) +} + +// generateALargeSpan generates a span with a long name. +func generateALargeSpan() *tracesdk.SpanSnapshot { + span := &tracesdk.SpanSnapshot{ + Name: "a-longer-name-that-makes-it-exceeds-limit", + } + return span +} + +func TestSpanExceedsMaxPacketLimit(t *testing.T) { + otel.SetErrorHandler(errorHandler{t}) + + // 106 is the serialized size of a span with default values. + maxSize := 106 + span := generateALargeSpan() + + largeSpans := []*tracesdk.SpanSnapshot{span, {}} + normalSpans := []*tracesdk.SpanSnapshot{{}, {}} + + exp, err := NewRawExporter( + WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831"), WithMaxPacketSize(maxSize+1)), + ) + require.NoError(t, err) + + ctx := context.Background() + assert.Error(t, exp.ExportSpans(ctx, largeSpans)) + assert.NoError(t, exp.ExportSpans(ctx, normalSpans)) + assert.NoError(t, exp.Shutdown(ctx)) +} + +func TestEmitBatchWithMultipleErrors(t *testing.T) { + otel.SetErrorHandler(errorHandler{t}) + + span := generateALargeSpan() + largeSpans := []*tracesdk.SpanSnapshot{span, span} + // make max packet size smaller than span + maxSize := len(span.Name) + exp, err := NewRawExporter( + WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831"), WithMaxPacketSize(maxSize)), + ) + require.NoError(t, err) + + ctx := context.Background() + err = exp.ExportSpans(ctx, largeSpans) + assert.Error(t, err) + require.Contains(t, err.Error(), "multiple errors") +} diff --git a/exporters/trace/jaeger/uploader.go b/exporters/trace/jaeger/uploader.go index ee50cc6280c..1f8638bb34d 100644 --- a/exporters/trace/jaeger/uploader.go +++ b/exporters/trace/jaeger/uploader.go @@ -114,6 +114,13 @@ func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption } } +// WithMaxPacketSize sets the maximum UDP packet size for transport to the Jaeger agent. +func WithMaxPacketSize(size int) AgentEndpointOption { + return func(o *AgentEndpointOptions) { + o.MaxPacketSize = size + } +} + // WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. This will // use the following environment variables for configuration if no explicit option is provided: //