Skip to content

Commit

Permalink
tweak zipkin adjuster name & move to new file
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Danis <cdanis@wikimedia.org>
  • Loading branch information
cdanis committed Sep 23, 2024
1 parent d92fa27 commit fd74590
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 160 deletions.
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.ZipkinSpanIDRenamer(),
adjuster.ZipkinSpanIDUniquifier(),
adjuster.DedupeBySpanID(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
Expand Down
2 changes: 1 addition & 1 deletion model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// child spans do not start before or end after their parent spans.
//
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as ZipkinSpanIDRenamer.
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
Expand Down
98 changes: 0 additions & 98 deletions model/adjuster/span_id_deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,9 @@
package adjuster

import (
"errors"
"math"

"github.com/jaegertracing/jaeger/model"
)

// ZipkinSpanIDRenamer builds an adjuster that assigns a new span ID to every
// server span (i.e. a span with tag span.kind == server) whose ID is also
// carried by a client span. Zipkin-style clients reuse one span ID for both
// the client and the server side of an RPC call, while Jaeger UI expects all
// spans to have unique IDs.
//
// The returned adjuster always reports a nil error; any problem it runs into
// is appended to the affected Span.Warnings instead.
func ZipkinSpanIDRenamer() Adjuster {
	renameSharedServerIDs := func(trace *model.Trace) (*model.Trace, error) {
		d := &spanIDDeduper{trace: trace}
		// Index the spans by ID first: the renaming step relies on that index
		// to find server spans that share an ID with a client span.
		d.groupSpansByID()
		d.uniquifyServerSpanIDs()
		return d.trace, nil
	}
	return Func(renameSharedServerIDs)
}

// DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID.
// This is useful for when spans are duplicated in archival storage, as happens with
// ElasticSearch archival.
Expand All @@ -40,87 +20,9 @@ func DedupeBySpanID() Adjuster {
})
}

const (
// warningTooManySpans is the warning recorded on a span when makeUniqueSpanID
// cannot find an unused span ID for it.
warningTooManySpans = "cannot assign unique span ID, too many spans in the trace"
)

// maxSpanID is the span ID with value math.MaxUint64. makeUniqueSpanID treats it
// as the exclusive upper bound of its search, so it is never handed out as a new ID.
var maxSpanID = model.NewSpanID(math.MaxUint64)

// spanIDDeduper holds the working state used while grouping a trace's spans
// by span ID and rewriting or removing spans that share an ID.
type spanIDDeduper struct {
// trace is the trace being adjusted; its spans are modified in place.
trace *model.Trace
// spansByID maps each span ID to the spans carrying it; filled by groupSpansByID.
spansByID map[model.SpanID][]*model.Span
// maxUsedID is the last ID handed out by makeUniqueSpanID; the next search
// starts just above it.
maxUsedID model.SpanID
}

// groupSpansByID indexes the trace's spans by span ID and stores the resulting
// map (id -> spans with that id) in d.spansByID. Spans that share an ID are
// kept in the order in which they appear in the trace.
func (d *spanIDDeduper) groupSpansByID() {
	spansByID := make(map[model.SpanID][]*model.Span, len(d.trace.Spans))
	for _, span := range d.trace.Spans {
		// TODO maybe return an error if more than 2 spans found
		// A missing key yields a nil slice, which append accepts, so no
		// separate "first span for this ID" branch is needed.
		spansByID[span.SpanID] = append(spansByID[span.SpanID], span)
	}
	d.spansByID = spansByID
}

// dedupeSpansByID replaces the trace's span list with one span per span ID:
// the first span recorded for that ID in d.spansByID. It relies on
// d.spansByID having been populated (see groupSpansByID).
//
// The surviving spans keep the relative order they had in the trace. Building
// the list by ranging over the d.spansByID map instead would produce the spans
// in an unspecified order that can differ from one call to the next.
func (d *spanIDDeduper) dedupeSpansByID() {
	// Left nil (not an empty slice) when no span survives.
	var unique []*model.Span
	emitted := make(map[model.SpanID]struct{}, len(d.spansByID))
	for _, span := range d.trace.Spans {
		group, ok := d.spansByID[span.SpanID]
		if !ok {
			// Only IDs present in the index are kept.
			continue
		}
		if _, done := emitted[span.SpanID]; done {
			continue
		}
		emitted[span.SpanID] = struct{}{}
		unique = append(unique, group[0])
	}
	d.trace.Spans = unique
}

// isSharedWithClientSpan reports whether at least one of the spans indexed
// under spanID in d.spansByID is an RPC client span.
func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool {
	sameID := d.spansByID[spanID]
	for i := range sameID {
		if sameID[i].IsRPCClient() {
			return true
		}
	}
	return false
}

// uniquifyServerSpanIDs gives a fresh span ID to every RPC server span whose
// current ID is also used by a client span. The renamed span is re-parented
// under the ID it used to share, and afterwards the spans that pointed at the
// old ID as their parent are redirected to the new one. When no fresh ID can
// be produced, the error text is added to the span's Warnings and the span is
// left unchanged.
func (d *spanIDDeduper) uniquifyServerSpanIDs() {
	renamed := make(map[model.SpanID]model.SpanID)
	for _, span := range d.trace.Spans {
		sharedID := span.SpanID
		// Only server-side spans that share their ID with a client span are renamed.
		if !span.IsRPCServer() || !d.isSharedWithClientSpan(sharedID) {
			continue
		}
		freshID, err := d.makeUniqueSpanID()
		if err != nil {
			span.Warnings = append(span.Warnings, err.Error())
			continue
		}
		renamed[sharedID] = freshID
		// The previously shared ID becomes the parent of the renamed span.
		span.ReplaceParentID(sharedID)
		span.SpanID = freshID
	}
	d.swapParentIDs(renamed)
}

// swapParentIDs re-points children of renamed server spans: any span whose
// parent ID is a key of oldToNewSpanIDs gets the mapped ID as its new parent.
// A span whose own ID equals the mapped ID is skipped, so a renamed span never
// becomes its own parent.
func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) {
	for _, span := range d.trace.Spans {
		replacement, wasRenamed := oldToNewSpanIDs[span.ParentSpanID()]
		if wasRenamed && span.SpanID != replacement {
			span.ReplaceParentID(replacement)
		}
	}
}

// makeUniqueSpanID searches upwards from the last ID it handed out for an ID
// that is not a key of d.spansByID, remembers it in d.maxUsedID and returns it.
// The search stops before maxSpanID; if it gets that far without finding a
// free ID it returns an error, which is unlikely given that the whole space
// of span IDs is 2^64.
func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) {
	candidate := d.maxUsedID + 1
	for ; candidate < maxSpanID; candidate++ {
		if _, taken := d.spansByID[candidate]; taken {
			continue
		}
		d.maxUsedID = candidate
		return candidate, nil
	}
	return 0, errors.New(warningTooManySpans)
}
60 changes: 2 additions & 58 deletions model/adjuster/span_id_deduper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@ import (
"github.com/jaegertracing/jaeger/model"
)

var (
// clientSpanID is the span ID the tests expect the client span to keep.
clientSpanID = model.NewSpanID(1)
// anotherSpanID is the span ID the tests expect on the third span of the fixture trace.
anotherSpanID = model.NewSpanID(11)
// keySpanKind is the "span.kind" tag key; presumably used when building the
// fixture spans, which is not visible in this view.
keySpanKind = "span.kind"
)

func newTrace() *model.Trace {
func newDuplicatedSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
Expand Down Expand Up @@ -52,58 +46,8 @@ func newTrace() *model.Trace {
}
}

func TestZipkinSpanIDRenamerTriggered(t *testing.T) {
trace := newTrace()
deduper := ZipkinSpanIDRenamer()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

clientSpan := trace.Spans[0]
assert.Equal(t, clientSpanID, clientSpan.SpanID, "client span ID should not change")

serverSpan := trace.Spans[1]
assert.Equal(t, clientSpanID+1, serverSpan.SpanID, "server span ID should be reassigned")
assert.Equal(t, clientSpan.SpanID, serverSpan.ParentSpanID(), "client span should be server span's parent")

thirdSpan := trace.Spans[2]
assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change")
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

// TestZipkinSpanIDRenamerNotTriggered checks that a server span keeps its ID
// when the trace contains no client span sharing that ID, and that the third
// span still points at it as its parent.
func TestZipkinSpanIDRenamerNotTriggered(t *testing.T) {
	input := newTrace()
	input.Spans = input.Spans[1:] // remove client span

	adjusted, err := ZipkinSpanIDRenamer().Adjust(input)
	require.NoError(t, err)

	// In the fixture the server span carries the same ID as the client span.
	serverSpanID := clientSpanID
	server := adjusted.Spans[0]
	third := adjusted.Spans[1]

	assert.Equal(t, serverSpanID, server.SpanID, "server span ID should be unchanged")

	assert.Equal(t, anotherSpanID, third.SpanID, "3rd span ID should not change")
	assert.Equal(t, server.SpanID, third.ParentSpanID(), "server span should be 3rd span's parent")
}

// TestZipkinSpanIDRenamerError checks that a warning is recorded on the server
// span when the ID search is already at its upper bound and so cannot produce
// a fresh span ID.
func TestZipkinSpanIDRenamerError(t *testing.T) {
	trace := newTrace()

	// Converting -1 to uint64 at run time yields the all-ones value.
	allOnes := int64(-1)
	assert.Equal(t, maxSpanID, model.NewSpanID(uint64(allOnes)), "maxSpanID must be 2^64-1")

	// Start the search right below the bound so that no candidate ID is left.
	d := &spanIDDeduper{trace: trace, maxUsedID: maxSpanID - 1}
	d.groupSpansByID()
	d.uniquifyServerSpanIDs()

	warnings := trace.Spans[1].Warnings
	if assert.Len(t, warnings, 1) {
		assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", warnings[0])
	}
}

func TestDedupeBySpanID(t *testing.T) {
trace := newTrace()
trace := newZipkinTrace()
deduper := DedupeBySpanID()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)
Expand Down
107 changes: 107 additions & 0 deletions model/adjuster/zipkin_span_id_uniquify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"errors"
"math"

"github.com/jaegertracing/jaeger/model"
)

// ZipkinSpanIDUniquifier builds an adjuster that assigns a new span ID to
// every server span (i.e. a span with tag span.kind == server) whose ID is
// also carried by a client span. Zipkin-style clients reuse one span ID for
// both the client and the server side of an RPC call, while Jaeger UI expects
// all spans to have unique IDs.
//
// The returned adjuster always reports a nil error; any problem it runs into
// is appended to the affected Span.Warnings instead.
func ZipkinSpanIDUniquifier() Adjuster {
	uniquifySharedServerIDs := func(trace *model.Trace) (*model.Trace, error) {
		d := &spanIDDeduper{trace: trace}
		// Index the spans by ID first: the renaming step relies on that index
		// to find server spans that share an ID with a client span.
		d.groupSpansByID()
		d.uniquifyServerSpanIDs()
		return d.trace, nil
	}
	return Func(uniquifySharedServerIDs)
}

const (
// warningTooManySpans is the warning recorded on a span when makeUniqueSpanID
// cannot find an unused span ID for it.
warningTooManySpans = "cannot assign unique span ID, too many spans in the trace"
)

// maxSpanID is the span ID with value math.MaxUint64. makeUniqueSpanID treats it
// as the exclusive upper bound of its search, so it is never handed out as a new ID.
var maxSpanID = model.NewSpanID(math.MaxUint64)

// spanIDDeduper holds the working state used while grouping a trace's spans
// by span ID and giving unique IDs to server spans that share one with a client span.
type spanIDDeduper struct {
// trace is the trace being adjusted; its spans are modified in place.
trace *model.Trace
// spansByID maps each span ID to the spans carrying it; filled by groupSpansByID.
spansByID map[model.SpanID][]*model.Span
// maxUsedID is the last ID handed out by makeUniqueSpanID; the next search
// starts just above it.
maxUsedID model.SpanID
}

// groupSpansByID indexes the trace's spans by span ID and stores the resulting
// map (id -> spans with that id) in d.spansByID. Spans that share an ID are
// kept in the order in which they appear in the trace.
func (d *spanIDDeduper) groupSpansByID() {
	spansByID := make(map[model.SpanID][]*model.Span, len(d.trace.Spans))
	for _, span := range d.trace.Spans {
		// TODO maybe return an error if more than 2 spans found
		// A missing key yields a nil slice, which append accepts, so no
		// separate "first span for this ID" branch is needed.
		spansByID[span.SpanID] = append(spansByID[span.SpanID], span)
	}
	d.spansByID = spansByID
}

// isSharedWithClientSpan reports whether at least one of the spans indexed
// under spanID in d.spansByID is an RPC client span.
func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool {
	sameID := d.spansByID[spanID]
	for i := range sameID {
		if sameID[i].IsRPCClient() {
			return true
		}
	}
	return false
}

// uniquifyServerSpanIDs gives a fresh span ID to every RPC server span whose
// current ID is also used by a client span. The renamed span is re-parented
// under the ID it used to share, and afterwards the spans that pointed at the
// old ID as their parent are redirected to the new one. When no fresh ID can
// be produced, the error text is added to the span's Warnings and the span is
// left unchanged.
func (d *spanIDDeduper) uniquifyServerSpanIDs() {
	renamed := make(map[model.SpanID]model.SpanID)
	for _, span := range d.trace.Spans {
		sharedID := span.SpanID
		// Only server-side spans that share their ID with a client span are renamed.
		if !span.IsRPCServer() || !d.isSharedWithClientSpan(sharedID) {
			continue
		}
		freshID, err := d.makeUniqueSpanID()
		if err != nil {
			span.Warnings = append(span.Warnings, err.Error())
			continue
		}
		renamed[sharedID] = freshID
		// The previously shared ID becomes the parent of the renamed span.
		span.ReplaceParentID(sharedID)
		span.SpanID = freshID
	}
	d.swapParentIDs(renamed)
}

// swapParentIDs re-points children of renamed server spans: any span whose
// parent ID is a key of oldToNewSpanIDs gets the mapped ID as its new parent.
// A span whose own ID equals the mapped ID is skipped, so a renamed span never
// becomes its own parent.
func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) {
	for _, span := range d.trace.Spans {
		replacement, wasRenamed := oldToNewSpanIDs[span.ParentSpanID()]
		if wasRenamed && span.SpanID != replacement {
			span.ReplaceParentID(replacement)
		}
	}
}

// makeUniqueSpanID searches upwards from the last ID it handed out for an ID
// that is not a key of d.spansByID, remembers it in d.maxUsedID and returns it.
// The search stops before maxSpanID; if it gets that far without finding a
// free ID it returns an error, which is unlikely given that the whole space
// of span IDs is 2^64.
func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) {
	candidate := d.maxUsedID + 1
	for ; candidate < maxSpanID; candidate++ {
		if _, taken := d.spansByID[candidate]; taken {
			continue
		}
		d.maxUsedID = candidate
		return candidate, nil
	}
	return 0, errors.New(warningTooManySpans)
}
Loading

0 comments on commit fd74590

Please sign in to comment.