diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index e985cdc66d8..7c8277479e8 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -47,7 +47,7 @@ func modelToOTLP(spans []*model.Span) ([]*tracev1.ResourceSpans, error) { return chunk.ResourceSpans, nil } -func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) { +func OTLP2model(OTLPSpans []byte) ([]*model.Batch, error) { ptraceUnmarshaler := ptrace.JSONUnmarshaler{} otlpTraces, err := ptraceUnmarshaler.UnmarshalTraces(OTLPSpans) if err != nil { @@ -59,12 +59,24 @@ func OTLP2model(OTLPSpans []byte) ([]model.Trace, error) { fmt.Println(err) return nil, fmt.Errorf("cannot transform OTLP to Jaeger: %w", err) } + + return jaegerBatches, nil +} + +func BatchesToTraces(jaegerBatches []*model.Batch) ([]model.Trace, error) { var jaegerTraces []model.Trace + spanMap := make(map[model.TraceID][]*model.Span) for _, v := range jaegerBatches { jaegerTrace := model.Trace{ Spans: v.Spans, } jaegerTrace.DenormalizeProcess(v.Process) + jaegerTrace.FlattenToSpansMaps(spanMap) + } + for _, v := range spanMap { + jaegerTrace := model.Trace{ + Spans: v, + } jaegerTraces = append(jaegerTraces, jaegerTrace) } return jaegerTraces, nil diff --git a/cmd/query/app/apiv3/package_test.go b/cmd/query/app/apiv3/package_test.go index c56173a61a1..61051eefc8b 100644 --- a/cmd/query/app/apiv3/package_test.go +++ b/cmd/query/app/apiv3/package_test.go @@ -6,9 +6,67 @@ package apiv3 import ( "testing" + "github.com/jaegertracing/jaeger/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" ) func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } + +func TestBatchesToTraces(t *testing.T) { + b1 := &model.Batch{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}, + }, + Process: model.NewProcess("process1", model.KeyValues{}), + } + + b2 := &model.Batch{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}, + }, + Process: model.NewProcess("process2", model.KeyValues{}), + } + + mainBatch := []*model.Batch{b1, b2} + + traces, err := BatchesToTraces(mainBatch) + require.Nil(t, err) + + s1 := []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(2), + OperationName: "z", + Process: model.NewProcess("process2", model.KeyValues{}), + }, + } + + s2 := []*model.Span{ + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + } + + t1 := model.Trace{ + Spans: s1, + } + t2 := model.Trace{ + Spans: s2, + } + mainTrace := []model.Trace{t1, t2} + assert.Equal(t, mainTrace, traces) +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index af3d5a7abd6..bd8e9980bde 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -203,7 +203,13 @@ func (aH *APIHandler) transformOTLP(w http.ResponseWriter, r *http.Request) { } var uiErrors []structuredError - traces, err := apiv3.OTLP2model(body) + batches, err := apiv3.OTLP2model(body) + + if aH.handleError(w, err, http.StatusInternalServerError) { + return + } + + traces, err := apiv3.BatchesToTraces(batches) if aH.handleError(w, err, http.StatusInternalServerError) { return diff --git a/model/trace.go b/model/trace.go index 80dc8fa1d1d..2d0dd763c28 100644 --- a/model/trace.go +++ b/model/trace.go @@ -38,3 +38,15 @@ func (m *Trace) DenormalizeProcess(p *Process) { v.Process = p } } + +func (m *Trace) FlattenToSpansMaps(spanMap map[TraceID][]*Span) { + for _, v := range m.Spans { + val, ok := spanMap[v.TraceID] + if !ok { + spanMap[v.TraceID] = []*Span{v} + } else { + spanMap[v.TraceID] = append(val, v) + } + } + +} diff --git a/model/trace_test.go b/model/trace_test.go index 01a6f86c6c6..6256b974bc5 100644 --- a/model/trace_test.go +++ b/model/trace_test.go @@ -67,3 +67,68 @@ func TestTraceNormalizeTimestamps(t *testing.T) { assert.Equal(t, span.StartTime, tt1.UTC()) assert.Equal(t, span.Logs[0].Timestamp, tt2.UTC()) } + +func TestFlattenToSpanMaps(t *testing.T) { + b1 := &model.Trace{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}, + }, + } + + b2 := &model.Trace{ + Spans: []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}, + }, + } + + t1 := []*model.Span{ + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(1), OperationName: "x"}, + {TraceID: model.NewTraceID(1, 2), SpanID: model.NewSpanID(2), OperationName: "z"}} + + t2 := []*model.Span{{TraceID: model.NewTraceID(1, 3), SpanID: model.NewSpanID(2), OperationName: "y"}} + spanMap := make(map[model.TraceID][]*model.Span) + b1.FlattenToSpansMaps(spanMap) + b2.FlattenToSpansMaps(spanMap) + assert.Equal(t, t1, spanMap[model.NewTraceID(1, 2)]) + assert.Equal(t, t2, spanMap[model.NewTraceID(1, 3)]) +} + + +func TestDenormalizeProcess(t *testing.T){ + t1 := &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + }, + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + }, + }, + } + p1 := model.NewProcess("process1", model.KeyValues{}) + + t2 := &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(1), + OperationName: "x", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + { + TraceID: model.NewTraceID(1, 3), + SpanID: model.NewSpanID(2), + OperationName: "y", + Process: model.NewProcess("process1", model.KeyValues{}), + }, + }, + } + t1.DenormalizeProcess(p1) + assert.Equal(t, t1, t2) + +}