Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Deduplicate spans based upon their hashcode #6009

Merged
merged 19 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/query/app/querysvc/adjusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ZipkinSpanIDUniquifier(),
adjuster.DedupeBySpanHash(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
adjuster.OTelTagAdjuster(),
Expand Down
2 changes: 1 addition & 1 deletion model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// child spans do not start before or end after their parent spans.
//
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as SpanIDDeduper.
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
Expand Down
46 changes: 46 additions & 0 deletions model/adjuster/span_hash_deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
cdanis marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"github.com/jaegertracing/jaeger/model"
)

// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode.
// This is useful for when spans are duplicated in archival storage, as happens with
// ElasticSearch archival.
func DedupeBySpanHash() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanHashDeduper{trace: trace}
deduper.groupSpansByHash()
deduper.dedupeSpansByHash()
return deduper.trace, nil
})
}

type spanHashDeduper struct {
trace *model.Trace
spansByHash map[uint64][]*model.Span
}

func (d *spanHashDeduper) groupSpansByHash() {
spansByHash := make(map[uint64][]*model.Span)
cdanis marked this conversation as resolved.
Show resolved Hide resolved
for _, span := range d.trace.Spans {
hash, _ := model.HashCode(span)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if spans, ok := spansByHash[hash]; ok {
spansByHash[hash] = append(spans, span)
} else {
spansByHash[hash] = []*model.Span{span}
}
}
d.spansByHash = spansByHash
}

func (d *spanHashDeduper) dedupeSpansByHash() {
d.trace.Spans = nil
for _, spans := range d.spansByHash {
d.trace.Spans = append(d.trace.Spans, spans[0])
}
}
129 changes: 129 additions & 0 deletions model/adjuster/span_hash_deduper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
cdanis marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/jaegertracing/jaeger/model"
)

func newDuplicatedSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: clientSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: clientSpanID, // shared span ID
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
// some other span, child of server span
TraceID: traceID,
SpanID: anotherSpanID,
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func newUniqueSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: anotherSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: anotherSpanID, // same ID as before, but different metadata
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func getSpanIDs(spans []*model.Span) []int {
ids := make([]int, len(spans))
for i, span := range spans {
ids[i] = int(span.SpanID)
}
return ids
}

func TestDedupeBySpanHashTriggers(t *testing.T) {
trace := newDuplicatedSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
}

func TestDedupeBySpanHashNotTriggered(t *testing.T) {
trace := newUniqueSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should not dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes")
}

func TestDedupeBySpanHashEmpty(t *testing.T) {
trace := &model.Trace{}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Empty(t, trace.Spans, "should be empty")
}

func TestDedupeBySpanHashManyManySpans(t *testing.T) {
traceID := model.NewTraceID(0, 42)
spans := make([]*model.Span, 0, 100)
const distinctSpanIDs = 10
for i := 0; i < 100; i++ {
spans = append(spans, &model.Span{
TraceID: traceID,
SpanID: model.SpanID(i % distinctSpanIDs),
})
}
trace := &model.Trace{Spans: spans}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs")
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (
"github.com/jaegertracing/jaeger/model"
)

// SpanIDDeduper returns an adjuster that changes span ids for server
// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func SpanIDDeduper() Adjuster {
func ZipkinSpanIDUniquifier() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
return deduper.trace, nil
})
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool {
return false
}

func (d *spanIDDeduper) dedupeSpanIDs() {
func (d *spanIDDeduper) uniquifyServerSpanIDs() {
oldToNewSpanIDs := make(map[model.SpanID]model.SpanID)
for _, span := range d.trace.Spans {
// only replace span IDs for server-side spans that share the ID with something else
Expand All @@ -82,7 +82,7 @@ func (d *spanIDDeduper) dedupeSpanIDs() {
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we deduped.
// spans whose IDs we made unique.
func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) {
for _, span := range d.trace.Spans {
if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
keySpanKind = "span.kind"
)

func newTrace() *model.Trace {
func newZipkinTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
Expand Down Expand Up @@ -52,9 +52,9 @@ func newTrace() *model.Trace {
}
}

func TestSpanIDDeduperTriggered(t *testing.T) {
trace := newTrace()
deduper := SpanIDDeduper()
func TestZipkinSpanIDUniquifierTriggered(t *testing.T) {
trace := newZipkinTrace()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

Expand All @@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperNotTriggered(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) {
trace := newZipkinTrace()
trace.Spans = trace.Spans[1:] // remove client span

deduper := SpanIDDeduper()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

Expand All @@ -87,16 +87,16 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperError(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierError(t *testing.T) {
trace := newZipkinTrace()

maxID := int64(-1)
assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1")

deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.maxUsedID = maxSpanID - 1
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
if assert.Len(t, trace.Spans[1].Warnings, 1) {
assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0])
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant {
traces: map[model.TraceID]*model.Trace{},
services: map[string]struct{}{},
operations: map[string]map[spanstore.Operation]struct{}{},
deduper: adjuster.SpanIDDeduper(),
deduper: adjuster.ZipkinSpanIDUniquifier(),
config: cfg,
}
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
deps := map[string]*model.DependencyLink{}
startTs := endTs.Add(-1 * lookback)
for _, orig := range m.traces {
// SpanIDDeduper never returns an err
// ZipkinSpanIDUniquifier never returns an err
trace, _ := m.deduper.Adjust(orig)
if traceIsBetweenStartAndEnd(startTs, endTs, trace) {
for _, s := range trace.Spans {
Expand Down