Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Deduplicate spans based upon their hashcode #6009

Merged
merged 19 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/query/app/querysvc/adjusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ZipkinSpanIDUniquifier(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
adjuster.OTelTagAdjuster(),
adjuster.SortLogFields(),
adjuster.SortTagsAndLogFields(),
adjuster.DedupeBySpanHash(), // requires SortTagsAndLogFields for deterministic results
adjuster.SpanReferences(),
adjuster.ParentReference(),
}
Expand Down
2 changes: 1 addition & 1 deletion model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// child spans do not start before or end after their parent spans.
//
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as SpanIDDeduper.
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@ import (
"github.com/jaegertracing/jaeger/model"
)

// SortLogFields returns an Adjuster that sorts the fields in the span logs.
// It puts the `event` field in the first position (if present), and sorts
// all other fields lexicographically.
// SortTagsAndLogFields returns an Adjuster that sorts the fields in the tags and
// span logs.
//
// For span logs, it puts the `event` field in the first position (if present), and
// sorts all other fields lexicographically.
//
// TODO: it should also do something about the "msg" field, maybe replace it
// with "event" field.
// TODO: we may also want to move "level" field (as in logging level) to an earlier
// place in the list. This adjuster needs some sort of config describing predefined
// field names/types and their relative order.
func SortLogFields() Adjuster {
func SortTagsAndLogFields() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
for _, span := range trace.Spans {
model.KeyValues(span.Tags).Sort()
cdanis marked this conversation as resolved.
Show resolved Hide resolved

for _, log := range span.Logs {
// first move "event" field into the first position
offset := 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/jaegertracing/jaeger/model"
)

func TestSortLogFields(t *testing.T) {
func TestSortTagsAndLogFieldsDoesSortFields(t *testing.T) {
testCases := []struct {
fields model.KeyValues
expected model.KeyValues
Expand Down Expand Up @@ -79,8 +79,42 @@ func TestSortLogFields(t *testing.T) {
},
},
}
trace, err := SortLogFields().Adjust(trace)
trace, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
assert.Equal(t, testCase.expected, model.KeyValues(trace.Spans[0].Logs[0].Fields))
}
}

func TestSortTagsAndLogFieldsDoesSortTags(t *testing.T) {
testCases := []model.KeyValues{
{
model.String("http.method", "GET"),
model.String("http.url", "http://wikipedia.org"),
model.Int64("http.status_code", 200),
model.String("guid:x-request-id", "f61defd2-7a77-11ef-b54f-4fbb67a6d181"),
},
{
// empty
},
{
model.String("argv", "mkfs.ext4 /dev/sda1"),
},
}
for _, tags := range testCases {
trace := &model.Trace{
Spans: []*model.Span{
{
Tags: tags,
},
},
}
sorted, err := SortTagsAndLogFields().Adjust(trace)
require.NoError(t, err)
assert.ElementsMatch(t, tags, sorted.Spans[0].Tags)
adjustedKeys := make([]string, len(sorted.Spans[0].Tags))
for i, kv := range sorted.Spans[0].Tags {
adjustedKeys[i] = kv.Key
}
assert.IsIncreasing(t, adjustedKeys)
}
}
46 changes: 46 additions & 0 deletions model/adjuster/span_hash_deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
cdanis marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"github.com/jaegertracing/jaeger/model"
)

// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode.
// This is useful for when spans are duplicated in archival storage, as happens with
// ElasticSearch archival.
func DedupeBySpanHash() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanHashDeduper{trace: trace}
deduper.groupSpansByHash()
deduper.dedupeSpansByHash()
return deduper.trace, nil
})
}

type spanHashDeduper struct {
trace *model.Trace
spansByHash map[uint64][]*model.Span
}

func (d *spanHashDeduper) groupSpansByHash() {
spansByHash := make(map[uint64][]*model.Span)
cdanis marked this conversation as resolved.
Show resolved Hide resolved
for _, span := range d.trace.Spans {
hash, _ := model.HashCode(span)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if spans, ok := spansByHash[hash]; ok {
spansByHash[hash] = append(spans, span)
} else {
spansByHash[hash] = []*model.Span{span}
}
}
d.spansByHash = spansByHash
}

func (d *spanHashDeduper) dedupeSpansByHash() {
d.trace.Spans = nil
for _, spans := range d.spansByHash {
d.trace.Spans = append(d.trace.Spans, spans[0])
}
}
129 changes: 129 additions & 0 deletions model/adjuster/span_hash_deduper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
cdanis marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/jaegertracing/jaeger/model"
)

func newDuplicatedSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: clientSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: clientSpanID, // shared span ID
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
// some other span, child of server span
TraceID: traceID,
SpanID: anotherSpanID,
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func newUniqueSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: anotherSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: anotherSpanID, // same ID as before, but different metadata
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func getSpanIDs(spans []*model.Span) []int {
ids := make([]int, len(spans))
for i, span := range spans {
ids[i] = int(span.SpanID)
}
return ids
}

func TestDedupeBySpanHashTriggers(t *testing.T) {
trace := newDuplicatedSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
}

func TestDedupeBySpanHashNotTriggered(t *testing.T) {
trace := newUniqueSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should not dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes")
}

func TestDedupeBySpanHashEmpty(t *testing.T) {
trace := &model.Trace{}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Empty(t, trace.Spans, "should be empty")
}

func TestDedupeBySpanHashManyManySpans(t *testing.T) {
traceID := model.NewTraceID(0, 42)
spans := make([]*model.Span, 0, 100)
const distinctSpanIDs = 10
for i := 0; i < 100; i++ {
spans = append(spans, &model.Span{
TraceID: traceID,
SpanID: model.SpanID(i % distinctSpanIDs),
})
}
trace := &model.Trace{Spans: spans}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs")
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (
"github.com/jaegertracing/jaeger/model"
)

// SpanIDDeduper returns an adjuster that changes span ids for server
// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func SpanIDDeduper() Adjuster {
func ZipkinSpanIDUniquifier() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
return deduper.trace, nil
})
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool {
return false
}

func (d *spanIDDeduper) dedupeSpanIDs() {
func (d *spanIDDeduper) uniquifyServerSpanIDs() {
oldToNewSpanIDs := make(map[model.SpanID]model.SpanID)
for _, span := range d.trace.Spans {
// only replace span IDs for server-side spans that share the ID with something else
Expand All @@ -82,7 +82,7 @@ func (d *spanIDDeduper) dedupeSpanIDs() {
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we deduped.
// spans whose IDs we made unique.
func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) {
for _, span := range d.trace.Spans {
if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
keySpanKind = "span.kind"
)

func newTrace() *model.Trace {
func newZipkinTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
Expand Down Expand Up @@ -52,9 +52,9 @@ func newTrace() *model.Trace {
}
}

func TestSpanIDDeduperTriggered(t *testing.T) {
trace := newTrace()
deduper := SpanIDDeduper()
func TestZipkinSpanIDUniquifierTriggered(t *testing.T) {
trace := newZipkinTrace()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

Expand All @@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperNotTriggered(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) {
trace := newZipkinTrace()
trace.Spans = trace.Spans[1:] // remove client span

deduper := SpanIDDeduper()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

Expand All @@ -87,16 +87,16 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperError(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierError(t *testing.T) {
trace := newZipkinTrace()

maxID := int64(-1)
assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1")

deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.maxUsedID = maxSpanID - 1
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
if assert.Len(t, trace.Spans[1].Warnings, 1) {
assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0])
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant {
traces: map[model.TraceID]*model.Trace{},
services: map[string]struct{}{},
operations: map[string]map[spanstore.Operation]struct{}{},
deduper: adjuster.SpanIDDeduper(),
deduper: adjuster.ZipkinSpanIDUniquifier(),
config: cfg,
}
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
deps := map[string]*model.DependencyLink{}
startTs := endTs.Add(-1 * lookback)
for _, orig := range m.traces {
// SpanIDDeduper never returns an err
// ZipkinSpanIDUniquifier never returns an err
trace, _ := m.deduper.Adjust(orig)
if traceIsBetweenStartAndEnd(startTs, endTs, trace) {
for _, s := range trace.Spans {
Expand Down
Loading