Skip to content

Commit

Permalink
Fix batchProcessor to set correct span format flags (#4796)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

Recently I've noticed that collector sends wrong tags "format" and
"transport" in metric `jaeger_collector_spans_received_total`.

Root cause: when batchProcessor was extended to be called from OTLP
Receiver, it was given SpanOptions parameter that could specify a
different span format OTLP, vs. always setting Proto. But the processor
was still using hardcoded Proto format when invoking the next processor.

## How was this change tested?
Added unit test

## Checklist
- [ x ] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [ x ] I have signed all commits
- [ - ] I have added unit tests for the new functionality (not needed)
- [ x ] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Alexander Kozlov <aleksandr.kozlov@workato.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people authored Oct 17, 2023
1 parent 7cd109f commit 6919fd7
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
}
}
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
})
if err != nil {
Expand Down
62 changes: 62 additions & 0 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type mockSpanProcessor struct {
mux sync.Mutex
spans []*model.Span
tenants map[string]bool
transport processor.InboundTransport
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
Expand All @@ -51,6 +53,8 @@ func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.Spa
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true
p.transport = opts.InboundTransport
p.spanFormat = opts.SpanFormat
return oks, p.expectedError
}

Expand All @@ -66,11 +70,25 @@ func (p *mockSpanProcessor) getTenants() map[string]bool {
return p.tenants
}

func (p *mockSpanProcessor) getTransport() processor.InboundTransport {
p.mux.Lock()
defer p.mux.Unlock()
return p.transport
}

func (p *mockSpanProcessor) getSpanFormat() processor.SpanFormat {
p.mux.Lock()
defer p.mux.Unlock()
return p.spanFormat
}

func (p *mockSpanProcessor) reset() {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = nil
p.tenants = nil
p.transport = ""
p.spanFormat = ""
}

func (p *mockSpanProcessor) Close() error {
Expand Down Expand Up @@ -363,3 +381,47 @@ func TestGetTenant(t *testing.T) {
})
}
}

func TestBatchConsumer(t *testing.T) {
tests := []struct {
name string
batch model.Batch
transport processor.InboundTransport
spanFormat processor.SpanFormat
expectedTransport processor.InboundTransport
expectedSpanFormat processor.SpanFormat
}{
{
name: "batchconsumer passes provided span options to processor",
batch: model.Batch{
Process: &model.Process{ServiceName: "testservice"},
Spans: []*model.Span{
{OperationName: "test-op", Process: &model.Process{ServiceName: "foo"}},
},
},
transport: processor.GRPCTransport,
spanFormat: processor.OTLPSpanFormat,
expectedTransport: processor.GRPCTransport,
expectedSpanFormat: processor.OTLPSpanFormat,
},
}

logger, _ := testutils.NewLogger()
for _, tc := range tests {
tc := tc
t.Parallel()
t.Run(tc.name, func(t *testing.T) {
processor := mockSpanProcessor{}
batchConsumer := newBatchConsumer(logger, &processor, tc.transport, tc.spanFormat, tenancy.NewManager(&tenancy.Options{}))
err := batchConsumer.consume(context.Background(), &model.Batch{
Process: &model.Process{ServiceName: "testservice"},
Spans: []*model.Span{
{OperationName: "test-op", Process: &model.Process{ServiceName: "foo"}},
},
})
assert.NoError(t, err)
assert.Equal(t, tc.transport, processor.getTransport())
assert.Equal(t, tc.expectedSpanFormat, processor.getSpanFormat())
})
}
}

0 comments on commit 6919fd7

Please sign in to comment.