From fd7459002e934e5e65fb37fd8f67a6222b8e43f8 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 12:50:51 -0400 Subject: [PATCH] tweak zipkin adjuster name & move to new file Signed-off-by: Chris Danis --- cmd/query/app/querysvc/adjusters.go | 2 +- model/adjuster/clockskew.go | 2 +- model/adjuster/span_id_deduper.go | 98 ---------------- model/adjuster/span_id_deduper_test.go | 60 +--------- model/adjuster/zipkin_span_id_uniquify.go | 107 ++++++++++++++++++ .../adjuster/zipkin_span_id_uniquify_test.go | 103 +++++++++++++++++ plugin/storage/memory/memory.go | 4 +- 7 files changed, 216 insertions(+), 160 deletions(-) create mode 100644 model/adjuster/zipkin_span_id_uniquify.go create mode 100644 model/adjuster/zipkin_span_id_uniquify_test.go diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 0e5ac21d956c..be2469f4ab00 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -14,7 +14,7 @@ import ( // before returning the data to the API clients. func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ - adjuster.ZipkinSpanIDRenamer(), + adjuster.ZipkinSpanIDUniquifier(), adjuster.DedupeBySpanID(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index e6fec7f9dce9..30f922307dcf 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -19,7 +19,7 @@ import ( // child spans do not start before or end after their parent spans. // // The algorithm assumes that all spans have unique IDs, so the trace may need -// to go through another adjuster first, such as ZipkinSpanIDRenamer. +// to go through another adjuster first, such as ZipkinSpanIDUniquifier. // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index fbee318487d5..6886c4649939 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -5,29 +5,9 @@ package adjuster import ( - "errors" - "math" - "github.com/jaegertracing/jaeger/model" ) -// ZipkinSpanIDRenamer returns an adjuster that changes span ids for server -// spans (i.e. spans with tag: span.kind == server) if there is another -// client span that shares the same span ID. This is needed to deal with -// Zipkin-style clients that reuse the same span ID for both client and server -// side of an RPC call. Jaeger UI expects all spans to have unique IDs. -// -// This adjuster never returns any errors. Instead it records any issues -// it encounters in Span.Warnings. -func ZipkinSpanIDRenamer() Adjuster { - return Func(func(trace *model.Trace) (*model.Trace, error) { - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.uniquifyServerSpanIDs() - return deduper.trace, nil - }) -} - // DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID. // This is useful for when spans are duplicated in archival storage, as happens with // ElasticSearch archival. @@ -40,87 +20,9 @@ func DedupeBySpanID() Adjuster { }) } -const ( - warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" -) - -var maxSpanID = model.NewSpanID(math.MaxUint64) - -type spanIDDeduper struct { - trace *model.Trace - spansByID map[model.SpanID][]*model.Span - maxUsedID model.SpanID -} - -// groupSpansByID groups spans with the same ID returning a map id -> []Span -func (d *spanIDDeduper) groupSpansByID() { - spansByID := make(map[model.SpanID][]*model.Span) - for _, span := range d.trace.Spans { - if spans, ok := spansByID[span.SpanID]; ok { - // TODO maybe return an error if more than 2 spans found - spansByID[span.SpanID] = append(spans, span) - } else { - spansByID[span.SpanID] = []*model.Span{span} - } - } - d.spansByID = spansByID -} - func (d *spanIDDeduper) dedupeSpansByID() { d.trace.Spans = nil for _, spans := range d.spansByID { d.trace.Spans = append(d.trace.Spans, spans[0]) } } - -func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { - for _, span := range d.spansByID[spanID] { - if span.IsRPCClient() { - return true - } - } - return false -} - -func (d *spanIDDeduper) uniquifyServerSpanIDs() { - oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) - for _, span := range d.trace.Spans { - // only replace span IDs for server-side spans that share the ID with something else - if span.IsRPCServer() && d.isSharedWithClientSpan(span.SpanID) { - newID, err := d.makeUniqueSpanID() - if err != nil { - span.Warnings = append(span.Warnings, err.Error()) - continue - } - oldToNewSpanIDs[span.SpanID] = newID - span.ReplaceParentID(span.SpanID) // previously shared ID is the new parent - span.SpanID = newID - } - } - d.swapParentIDs(oldToNewSpanIDs) -} - -// swapParentIDs corrects ParentSpanID of all spans that are children of the server -// spans whose IDs we made unique. -func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { - for _, span := range d.trace.Spans { - if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { - if span.SpanID != parentID { - span.ReplaceParentID(parentID) - } - } - } -} - -// makeUniqueSpanID returns a new ID that is not used in the trace, -// or an error if such ID cannot be generated, which is unlikely, -// given that the whole space of span IDs is 2^64. -func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) { - for id := d.maxUsedID + 1; id < maxSpanID; id++ { - if _, ok := d.spansByID[id]; !ok { - d.maxUsedID = id - return id, nil - } - } - return 0, errors.New(warningTooManySpans) -} diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index aa571cba04dd..162823a90274 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -14,13 +14,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) -var ( - clientSpanID = model.NewSpanID(1) - anotherSpanID = model.NewSpanID(11) - keySpanKind = "span.kind" -) - -func newTrace() *model.Trace { +func newDuplicatedSpansTrace() *model.Trace { traceID := model.NewTraceID(0, 42) return &model.Trace{ Spans: []*model.Span{ @@ -52,58 +46,8 @@ func newTrace() *model.Trace { } } -func TestZipkinSpanIDRenamerTriggered(t *testing.T) { - trace := newTrace() - deduper := ZipkinSpanIDRenamer() - trace, err := deduper.Adjust(trace) - require.NoError(t, err) - - clientSpan := trace.Spans[0] - assert.Equal(t, clientSpanID, clientSpan.SpanID, "client span ID should not change") - - serverSpan := trace.Spans[1] - assert.Equal(t, clientSpanID+1, serverSpan.SpanID, "server span ID should be reassigned") - assert.Equal(t, clientSpan.SpanID, serverSpan.ParentSpanID(), "client span should be server span's parent") - - thirdSpan := trace.Spans[2] - assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") - assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") -} - -func TestZipkinSpanIDRenamerNotTriggered(t *testing.T) { - trace := newTrace() - trace.Spans = trace.Spans[1:] // remove client span - - deduper := ZipkinSpanIDRenamer() - trace, err := deduper.Adjust(trace) - require.NoError(t, err) - - serverSpanID := clientSpanID // for better readability - serverSpan := trace.Spans[0] - assert.Equal(t, serverSpanID, serverSpan.SpanID, "server span ID should be unchanged") - - thirdSpan := trace.Spans[1] - assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") - assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") -} - -func TestZipkinSpanIDRenamerError(t *testing.T) { - trace := newTrace() - - maxID := int64(-1) - assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") - - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.maxUsedID = maxSpanID - 1 - deduper.uniquifyServerSpanIDs() - if assert.Len(t, trace.Spans[1].Warnings, 1) { - assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) - } -} - func TestDedupeBySpanID(t *testing.T) { - trace := newTrace() + trace := newZipkinTrace() deduper := DedupeBySpanID() trace, err := deduper.Adjust(trace) require.NoError(t, err) diff --git a/model/adjuster/zipkin_span_id_uniquify.go b/model/adjuster/zipkin_span_id_uniquify.go new file mode 100644 index 000000000000..5f8a86a32ccc --- /dev/null +++ b/model/adjuster/zipkin_span_id_uniquify.go @@ -0,0 +1,107 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "errors" + "math" + + "github.com/jaegertracing/jaeger/model" +) + +// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server +// spans (i.e. spans with tag: span.kind == server) if there is another +// client span that shares the same span ID. This is needed to deal with +// Zipkin-style clients that reuse the same span ID for both client and server +// side of an RPC call. Jaeger UI expects all spans to have unique IDs. +// +// This adjuster never returns any errors. Instead it records any issues +// it encounters in Span.Warnings. +func ZipkinSpanIDUniquifier() Adjuster { + return Func(func(trace *model.Trace) (*model.Trace, error) { + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.uniquifyServerSpanIDs() + return deduper.trace, nil + }) +} + +const ( + warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" +) + +var maxSpanID = model.NewSpanID(math.MaxUint64) + +type spanIDDeduper struct { + trace *model.Trace + spansByID map[model.SpanID][]*model.Span + maxUsedID model.SpanID +} + +// groupSpansByID groups spans with the same ID returning a map id -> []Span +func (d *spanIDDeduper) groupSpansByID() { + spansByID := make(map[model.SpanID][]*model.Span) + for _, span := range d.trace.Spans { + if spans, ok := spansByID[span.SpanID]; ok { + // TODO maybe return an error if more than 2 spans found + spansByID[span.SpanID] = append(spans, span) + } else { + spansByID[span.SpanID] = []*model.Span{span} + } + } + d.spansByID = spansByID +} + +func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { + for _, span := range d.spansByID[spanID] { + if span.IsRPCClient() { + return true + } + } + return false +} + +func (d *spanIDDeduper) uniquifyServerSpanIDs() { + oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) + for _, span := range d.trace.Spans { + // only replace span IDs for server-side spans that share the ID with something else + if span.IsRPCServer() && d.isSharedWithClientSpan(span.SpanID) { + newID, err := d.makeUniqueSpanID() + if err != nil { + span.Warnings = append(span.Warnings, err.Error()) + continue + } + oldToNewSpanIDs[span.SpanID] = newID + span.ReplaceParentID(span.SpanID) // previously shared ID is the new parent + span.SpanID = newID + } + } + d.swapParentIDs(oldToNewSpanIDs) +} + +// swapParentIDs corrects ParentSpanID of all spans that are children of the server +// spans whose IDs we made unique. +func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { + for _, span := range d.trace.Spans { + if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { + if span.SpanID != parentID { + span.ReplaceParentID(parentID) + } + } + } +} + +// makeUniqueSpanID returns a new ID that is not used in the trace, +// or an error if such ID cannot be generated, which is unlikely, +// given that the whole space of span IDs is 2^64. +func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) { + for id := d.maxUsedID + 1; id < maxSpanID; id++ { + if _, ok := d.spansByID[id]; !ok { + d.maxUsedID = id + return id, nil + } + } + return 0, errors.New(warningTooManySpans) +} diff --git a/model/adjuster/zipkin_span_id_uniquify_test.go b/model/adjuster/zipkin_span_id_uniquify_test.go new file mode 100644 index 000000000000..1ad0895985fa --- /dev/null +++ b/model/adjuster/zipkin_span_id_uniquify_test.go @@ -0,0 +1,103 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/jaegertracing/jaeger/model" +) + +var ( + clientSpanID = model.NewSpanID(1) + anotherSpanID = model.NewSpanID(11) + keySpanKind = "span.kind" +) + +func newZipkinTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + // client span + TraceID: traceID, + SpanID: clientSpanID, + Tags: model.KeyValues{ + // span.kind = client + model.String(keySpanKind, trace.SpanKindClient.String()), + }, + }, + { + // server span + TraceID: traceID, + SpanID: clientSpanID, // shared span ID + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + // some other span, child of server span + TraceID: traceID, + SpanID: anotherSpanID, + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func TestZipkinSpanIDUniquifierTriggered(t *testing.T) { + trace := newZipkinTrace() + deduper := ZipkinSpanIDUniquifier() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + clientSpan := trace.Spans[0] + assert.Equal(t, clientSpanID, clientSpan.SpanID, "client span ID should not change") + + serverSpan := trace.Spans[1] + assert.Equal(t, clientSpanID+1, serverSpan.SpanID, "server span ID should be reassigned") + assert.Equal(t, clientSpan.SpanID, serverSpan.ParentSpanID(), "client span should be server span's parent") + + thirdSpan := trace.Spans[2] + assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") + assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") +} + +func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) { + trace := newZipkinTrace() + trace.Spans = trace.Spans[1:] // remove client span + + deduper := ZipkinSpanIDUniquifier() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + serverSpanID := clientSpanID // for better readability + serverSpan := trace.Spans[0] + assert.Equal(t, serverSpanID, serverSpan.SpanID, "server span ID should be unchanged") + + thirdSpan := trace.Spans[1] + assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") + assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") +} + +func TestZipkinSpanIDUniquifierError(t *testing.T) { + trace := newZipkinTrace() + + maxID := int64(-1) + assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") + + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.maxUsedID = maxSpanID - 1 + deduper.uniquifyServerSpanIDs() + if assert.Len(t, trace.Spans[1].Warnings, 1) { + assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) + } +} diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 52d748ff7508..48276aabffb4 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant { traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.ZipkinSpanIDRenamer(), + deduper: adjuster.ZipkinSpanIDUniquifier(), config: cfg, } } @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback deps := map[string]*model.DependencyLink{} startTs := endTs.Add(-1 * lookback) for _, orig := range m.traces { - // ZipkinSpanIDRenamer never returns an err + // ZipkinSpanIDUniquifier never returns an err trace, _ := m.deduper.Adjust(orig) if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans {