feat: send kept trace decision in a separate goroutine (#1412)
## Which problem is this PR solving?

To reduce ingest latency, this PR moves the work of publishing kept trace
decisions into a separate goroutine so that it doesn't block the ingestion
of incoming data.

This code has been running in kibble for a few days; it is copied from
the debug branch #1408.

## Short description of the changes

- create a buffer for kept trace decisions
- make publishing kept trace decisions non-blocking in the `collect` loop (see the sketch after this list)
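
For illustration, here is a minimal, self-contained sketch of the pattern. The names and buffer sizes are illustrative stand-ins, not the actual Refinery types (the real implementation in the diff below uses a 100_000-entry buffer and publishes via `PubSub.Publish`): the hot path enqueues each decision with a non-blocking `select`, and a single dedicated goroutine pays the publish latency.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	keptDecisionBuffer := make(chan string, 8)

	done := make(chan struct{})
	go func() { // background sender, analogous to sendKeptDecisions
		defer close(done)
		for msg := range keptDecisionBuffer {
			time.Sleep(10 * time.Millisecond) // stand-in for the slow publish
			fmt.Println("published:", msg)
		}
	}()

	// hot path, analogous to the collect loop: never blocks
	for i := 0; i < 20; i++ {
		msg := fmt.Sprintf("trace-%d", i)
		select {
		case keptDecisionBuffer <- msg:
		default: // buffer full: drop and count, rather than stall ingest
			fmt.Println("queue full, dropping:", msg)
		}
	}

	close(keptDecisionBuffer) // lets the sender drain and exit
	<-done
}
```

The trade-off is the same one the real code makes: when the buffer is full, the decision message is dropped (and a counter such as `collector_kept_decisions_queue_full` is incremented) rather than blocking the collect loop.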
VinozzZ authored Nov 7, 2024
1 parent 96f5b92 commit fe442a3
Showing 2 changed files with 83 additions and 78 deletions.
142 changes: 64 additions & 78 deletions collect/collect.go
@@ -113,8 +113,9 @@ type InMemCollector struct {
dropDecisionMessages chan string
keptDecisionMessages chan string

dropDecisionBatch chan string
hostname string
dropDecisionBatch chan string
keptDecisionBuffer chan string
hostname string
}

var inMemCollectorMetrics = []metrics.Metadata{
@@ -156,6 +157,10 @@ var inMemCollectorMetrics = []metrics.Metadata{
{Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"},
{Name: "collector_expired_traces_missing_decisions", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces missing trace decision"},
{Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"},
{Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept decision message received"},
{Name: "drop_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of drop decision message received"},
{Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"},
{Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"},
}

func (i *InMemCollector) Start() error {
@@ -208,14 +213,16 @@ func (i *InMemCollector) Start() error {
i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)

i.dropDecisionBatch = make(chan string, 1000)
i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
i.keptDecisionBuffer = make(chan string, 100_000)
}

// spin up one collector because this is a single threaded collector
go i.collect()
go i.sendTraces()
// spin up a drop decision batch sender
go i.sendDropDecisions()
go i.sendKeptDecisions()

return nil
}
@@ -399,13 +406,6 @@ func (i *InMemCollector) collect() {
return
case <-i.redistributeTimer.Notify():
i.redistributeTraces(ctx)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
return
}

i.processKeptDecision(msg)
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
@@ -841,54 +841,18 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
// if we receive a proxy span after a trace decision has been made,
// we should just broadcast the decision again
if sp.IsDecisionSpan() {
var (
msg string
err error
)
topic := keptTraceDecisionTopic
if tr.Kept() {
// late span in this case won't get HasRoot
// this means the late span won't be decorated with some metadata
// like span count, event count, link count
msg, err = newKeptDecisionMessage(TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
})
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new kept decision message")
return
}
} else {
topic = droppedTraceDecisionTopic
msg, err = newDroppedDecisionMessage(sp.TraceID)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new dropped decision message")
return
}
}

err = i.PubSub.Publish(ctx, topic, msg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to publish trace decision")
// late span in this case won't get HasRoot
td := TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
}
i.publishTraceDecision(ctx, td)
return
}

@@ -1077,6 +1041,7 @@ func (i *InMemCollector) Stop() error {

if !i.Config.GetCollectionConfig().EnableTraceLocality {
close(i.dropDecisionBatch)
close(i.keptDecisionBuffer)
}

return nil
@@ -1368,6 +1333,8 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
}

func (i *InMemCollector) processDropDecisions(msg string) {
i.Metrics.Increment("drop_decisions_received")

ids := newDroppedTraceDecision(msg)

if len(ids) == 0 {
@@ -1393,6 +1360,8 @@ func (i *InMemCollector) processDropDecisions(msg string) {
}

func (i *InMemCollector) processKeptDecision(msg string) {
i.Metrics.Increment("kept_decisions_received")

td, err := newKeptTraceDecision(msg)
if err != nil {
i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
@@ -1513,33 +1482,50 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis

if td.Kept {
decisionMsg, err = newKeptDecisionMessage(td)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to create trace decision message")
return
}

select {
case i.keptDecisionBuffer <- decisionMsg:
default:
i.Metrics.Increment("collector_kept_decisions_queue_full")
i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message")
}
return
} else {
// if we're dropping the trace, we should add it to the batch so we can send it later
i.dropDecisionBatch <- td.TraceID
select {
case i.dropDecisionBatch <- td.TraceID:
default:
i.Metrics.Increment("collector_drop_decisions_queue_full")
i.Logger.Warn().Logf("drop trace decision buffer is full. Dropping message")
}
return
}
}

if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to create trace decision message")
func (i *InMemCollector) sendKeptDecisions() {
if i.Config.GetCollectionConfig().EnableTraceLocality {
return
}

err = i.PubSub.Publish(ctx, keptTraceDecisionTopic, decisionMsg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to publish trace decision")
ctx := context.Background()
for msg := range i.keptDecisionBuffer {
err := i.PubSub.Publish(ctx, keptTraceDecisionTopic, msg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"error": err.Error(),
}).Logf("Failed to publish trace decision")
}

}
}
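
One behavior of the shutdown path above worth noting: `Stop()` closes `keptDecisionBuffer`, and closing a buffered channel in Go does not discard queued items — the `range` loop in `sendKeptDecisions` drains whatever is still buffered and then returns. A minimal sketch of that semantic (illustrative names, not Refinery code):

```go
package main

import "fmt"

// Closing a buffered channel forbids further sends but leaves queued
// items receivable; a range loop drains them and then terminates.
func main() {
	buf := make(chan string, 4)
	buf <- "trace-a"
	buf <- "trace-b"
	close(buf)

	for msg := range buf { // exits once the buffer is empty
		fmt.Println("drained:", msg)
	}
	fmt.Println("sender goroutine would return here")
}
```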

19 changes: 19 additions & 0 deletions collect/collect_test.go
@@ -135,6 +135,7 @@ func TestAddRootSpan(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -268,6 +269,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
coll.fromPeer = make(chan *types.Span, 5)
coll.dropDecisionBatch = make(chan string, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendDropDecisions()
@@ -369,6 +371,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -426,6 +429,7 @@ func TestAddSpan(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -532,6 +536,7 @@ func TestDryRunMode(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -758,6 +763,7 @@ func TestStableMaxAlloc(t *testing.T) {
coll.incoming = make(chan *types.Span, 1000)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 500)
coll.keptDecisionBuffer = make(chan string, 500)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -950,6 +956,7 @@ func TestAddCountsToRoot(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1040,6 +1047,7 @@ func TestLateRootGetsCounts(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1134,7 +1142,9 @@ func TestAddSpanCount(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

go coll.collect()
go coll.sendTraces()

@@ -1225,6 +1235,7 @@ func TestLateRootGetsSpanCount(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1303,6 +1314,7 @@ func TestLateSpanNotDecorated(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1375,6 +1387,7 @@ func TestAddAdditionalAttributes(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1536,6 +1549,7 @@ func TestStressReliefDecorateHostname(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1643,6 +1657,7 @@ func TestSpanWithRuleReasons(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1753,6 +1768,7 @@ func TestRedistributeTraces(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
@@ -1863,6 +1879,7 @@ func TestDrainTracesOnShutdown(t *testing.T) {
coll.fromPeer = make(chan *types.Span, 5)

coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

sentTraceChan := make(chan sentRecord, 1)
@@ -1993,6 +2010,7 @@ func TestBigTracesGoEarly(t *testing.T) {
coll.incoming = make(chan *types.Span, 500)
coll.fromPeer = make(chan *types.Span, 500)
coll.outgoingTraces = make(chan sendableTrace, 500)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -2226,6 +2244,7 @@ func TestExpiredTracesCleanup(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

for _, traceID := range peerTraceIDs {
