Skip to content

Commit

Permalink
feat: extract decision span from full span (#1338)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

Extract only necessary information that's needed for trace decision from
a full span so that we can forward only the key fields to peers later
part of #1318 

## Short description of the changes

- add a method on `Span` to extract only necessary information into a
new types.Event
- add a method to differentiate a full span from a decision span
- store `IsRoot` information on `Span`
- simplify naming for span annotation types
  • Loading branch information
VinozzZ authored Sep 30, 2024
1 parent cd17f63 commit 1d70862
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 96 deletions.
24 changes: 4 additions & 20 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
}

// if this is a root span, say so and send the trace
if i.isRootSpan(sp) {
if sp.IsRoot {
markTraceForSending = true
trace.RootSpan = sp
}
Expand Down Expand Up @@ -671,8 +671,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Sending span because of previous decision to send trace")
mergeTraceAndSpanSampleRates(sp, tr.Rate(), isDryRun)
// if this span is a late root span, possibly update it with our current span count
isRootSpan := i.isRootSpan(sp)
if isRootSpan {
if sp.IsRoot {
if i.Config.GetAddCountsToRoot() {
sp.Data["meta.span_event_count"] = int64(tr.SpanEventCount())
sp.Data["meta.span_link_count"] = int64(tr.SpanLinkCount())
Expand All @@ -682,7 +681,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
sp.Data["meta.span_count"] = int64(tr.DescendantCount())
}
}
otelutil.AddSpanField(span, "is_root_span", isRootSpan)
otelutil.AddSpanField(span, "is_root_span", sp.IsRoot)
i.Metrics.Increment(TraceSendLateSpan)
i.addAdditionalAttributes(sp)
i.Transmission.EnqueueSpan(sp)
Expand Down Expand Up @@ -718,21 +717,6 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo
}
}

func (i *InMemCollector) isRootSpan(sp *types.Span) bool {
// log event should never be considered a root span, check for that first
if signalType := sp.Data["meta.signal_type"]; signalType == "log" {
return false
}
// check if the event has a parent id using the configured parent id field names
for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() {
parentId := sp.Data[parentIdFieldName]
if _, ok := parentId.(string); ok && parentId != "" {
return false
}
}
return true
}

func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if trace.Sent {
// someone else already sent this so we shouldn't also send it.
Expand Down Expand Up @@ -828,7 +812,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {

// update the root span (if we have one, which we might not if the trace timed out)
// with the final total as of our send time
if i.isRootSpan(sp) {
if sp.IsRoot {
if i.Config.GetAddCountsToRoot() {
sp.Data["meta.span_event_count"] = int64(trace.SpanEventCount())
sp.Data["meta.span_link_count"] = int64(trace.SpanLinkCount())
Expand Down
86 changes: 20 additions & 66 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestAddRootSpan(t *testing.T) {
Dataset: "aoeu",
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand All @@ -141,6 +142,7 @@ func TestAddRootSpan(t *testing.T) {
Dataset: "aoeu",
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpanFromPeer(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -233,6 +235,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
SampleRate: 0, // no upstream sampling
Data: make(map[string]interface{}),
},
IsRoot: true,
})
require.NoError(t, err, "must be able to add the span")

Expand Down Expand Up @@ -293,6 +296,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
SampleRate: 0, // This should get lifted to 1
Data: make(map[string]interface{}),
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -378,6 +382,7 @@ func TestAddSpan(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 5)
Expand Down Expand Up @@ -452,6 +457,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -488,6 +494,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpanFromPeer(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand All @@ -511,6 +518,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -642,6 +650,7 @@ func TestSampleConfigReload(t *testing.T) {
Dataset: dataset,
APIKey: legacyAPIKey,
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -669,6 +678,7 @@ func TestSampleConfigReload(t *testing.T) {
Dataset: dataset,
APIKey: legacyAPIKey,
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -930,6 +940,7 @@ func TestAddCountsToRoot(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1021,6 +1032,7 @@ func TestLateRootGetsCounts(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1101,6 +1113,7 @@ func TestAddSpanCount(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1176,6 +1189,7 @@ func TestLateRootGetsSpanCount(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1247,6 +1261,7 @@ func TestLateSpanNotDecorated(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)

Expand Down Expand Up @@ -1317,6 +1332,7 @@ func TestAddAdditionalAttributes(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 5)
Expand Down Expand Up @@ -1393,6 +1409,7 @@ func TestStressReliefSampleRate(t *testing.T) {
APIKey: legacyAPIKey,
SampleRate: 10,
},
IsRoot: true,
}

processed2, kept2 := coll.ProcessSpanImmediately(rootSpan)
Expand Down Expand Up @@ -1473,6 +1490,7 @@ func TestStressReliefDecorateHostname(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1591,6 +1609,7 @@ func TestSpanWithRuleReasons(t *testing.T) {
},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
if i == 0 {
rootSpan.Data["test"] = int64(1)
Expand Down Expand Up @@ -1624,72 +1643,6 @@ func TestSpanWithRuleReasons(t *testing.T) {
transmission.Mux.RUnlock()
}

func TestIsRootSpan(t *testing.T) {
tesCases := []struct {
name string
span *types.Span
expected bool
}{
{
name: "root span - no parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{},
},
},
expected: true,
},
{
name: "root span - empty parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"trace.parent_id": "",
},
},
},
expected: true,
},
{
name: "non-root span - parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"trace.parent_id": "some-id",
},
},
},
expected: false,
},
{
name: "non-root span - no parent id but has signal_type of log",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"meta.signal_type": "log",
},
},
},
expected: false,
},
}

collector := &InMemCollector{
Config: &config.MockConfig{
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetCollectionConfigVal: config.CollectionConfig{
ShutdownDelay: config.Duration(1 * time.Millisecond),
},
},
}

for _, tc := range tesCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, collector.isRootSpan(tc.span))
})
}
}

func TestRedistributeTraces(t *testing.T) {
conf := &config.MockConfig{
GetTracesConfigVal: config.TracesConfig{
Expand Down Expand Up @@ -1942,6 +1895,7 @@ func TestBigTracesGoEarly(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)

Expand Down
53 changes: 45 additions & 8 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,14 +562,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
return nil
}

// extract trace ID
var traceID string
for _, traceIdFieldName := range r.Config.GetTraceIdFieldNames() {
if trID, ok := ev.Data[traceIdFieldName]; ok {
traceID = trID.(string)
break
}
}
traceID := extractTraceID(r.Config.GetTraceIdFieldNames(), ev)
if traceID == "" {
// not part of a trace. send along upstream
r.Metrics.Increment(r.incomingOrPeer + "_router_nonspan")
Expand All @@ -584,6 +577,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
span := &types.Span{
Event: *ev,
TraceID: traceID,
IsRoot: isRootSpan(ev, r.Config),
}

// we know we're a span, but we need to check if we're in Stress Relief mode;
Expand All @@ -606,6 +600,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
}
}

// TODO: only do this if the span proxy feature is disabled
// Figure out if we should handle this span locally or pass on to a peer
targetShard := r.Sharder.WhichShard(traceID)
if !targetShard.Equals(r.Sharder.MyShard()) {
Expand All @@ -614,6 +609,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
WithString("peer", targetShard.GetAddress()).
WithField("isprobe", isProbe).
Logf("Sending span from batch to peer")

ev.APIHost = targetShard.GetAddress()

// Unfortunately this doesn't tell us if the event was actually
Expand All @@ -634,6 +630,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
if r.incomingOrPeer == "incoming" {
err = r.Collector.AddSpan(span)
} else {
// TODO: again, only do this if span proxy is disabled
err = r.Collector.AddSpanFromPeer(span)
}
if err != nil {
Expand Down Expand Up @@ -1015,3 +1012,43 @@ func getDatasetFromRequest(req *http.Request) (string, error) {
}
return dataset, nil
}

func isRootSpan(ev *types.Event, cfg config.Config) bool {
// log event should never be considered a root span, check for that first
if signalType := ev.Data["meta.signal_type"]; signalType == "log" {
return false
}

// check if the event has a root flag
if isRoot, ok := ev.Data["meta.refinery.root"]; ok {
v, ok := isRoot.(bool)
if !ok {
return false
}

return v
}

// check if the event has a parent id using the configured parent id field names
for _, parentIdFieldName := range cfg.GetParentIdFieldNames() {
parentId := ev.Data[parentIdFieldName]
if _, ok := parentId.(string); ok && parentId != "" {
return false
}
}
return true
}

func extractTraceID(traceIdFieldNames []string, ev *types.Event) string {
if trID, ok := ev.Data["trace_id"]; ok {
return trID.(string)
}

for _, traceIdFieldName := range traceIdFieldNames {
if trID, ok := ev.Data[traceIdFieldName]; ok {
return trID.(string)
}
}

return ""
}
Loading

0 comments on commit 1d70862

Please sign in to comment.