diff --git a/collect/collect.go b/collect/collect.go
index da8549e2d3..670622353a 100644
--- a/collect/collect.go
+++ b/collect/collect.go
@@ -419,7 +419,7 @@ func (i *InMemCollector) collect() {
 				span.End()
 				return
 			}
-			count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", i.processSpan)
+			count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", 1000, i.processSpan)
 			i.Metrics.Gauge("num_span_drained_from_peer", count)
 
 		case sp, ok := <-i.incoming:
@@ -428,7 +428,7 @@ func (i *InMemCollector) collect() {
 				span.End()
 				return
 			}
-			count := drainSpanQueue(ctx, sp, i.incoming, "incoming", i.processSpan)
+			count := drainSpanQueue(ctx, sp, i.incoming, "incoming", 1000, i.processSpan)
 			i.Metrics.Gauge("num_span_drained_from_incoming", count)
 		case msg, ok := <-i.dropDecisionMessages:
 			if !ok {
@@ -464,7 +464,7 @@ func (i *InMemCollector) collect() {
 	}
 }
 
-func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, processSpanFunc func(context.Context, *types.Span, string)) int {
+func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, limit int, processSpanFunc func(context.Context, *types.Span, string)) int {
 	// process the original span
 	processSpanFunc(ctx, span, queueName)
 	count := 1
@@ -472,7 +472,15 @@ func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span
 	// let't try to process as many spans as we can in the next 100ms
 	// TODO: make timer configurable?
 	timer := time.NewTimer(time.Millisecond * 100)
+	defer timer.Stop()
+
 	for {
+
+		// if we've processed enough spans, we should return
+		if limit != 0 && count >= limit {
+			return count
+		}
+
 		select {
 		case <-timer.C:
 			// we've spent enough time processing spans
@@ -1568,6 +1576,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
 
 	if td.Kept {
 		select {
+		case <-i.done:
 		case i.keptDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_kept_decisions_queue_full")
@@ -1576,6 +1585,7 @@ func (i *InMemCollector) publishTraceDecis
 		return
 	} else {
 		select {
+		case <-i.done:
 		case i.dropDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_drop_decisions_queue_full")
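
For reference, the sketch below isolates the drain pattern this patch extends in drainSpanQueue: pull items off a channel for at most 100ms, and now also stop once a count limit is reached (a limit of 0 meaning "no cap"). This is a standalone illustration, not Refinery's code; the names drainWithLimit and handle are made up, and the default branch (returning as soon as the channel is momentarily empty) is an assumption about the surrounding function that the hunk above does not show.

package main

import (
	"fmt"
	"time"
)

// drainWithLimit mirrors the shape of the patched drainSpanQueue: keep
// handling items from ch for at most 100ms, but return early once
// `limit` items have been handled (limit == 0 means "no cap").
func drainWithLimit(ch <-chan int, limit int, handle func(int)) int {
	count := 0
	timer := time.NewTimer(100 * time.Millisecond)
	defer timer.Stop()

	for {
		// the new check from the patch: stop once the cap is reached
		if limit != 0 && count >= limit {
			return count
		}

		select {
		case <-timer.C:
			// time budget exhausted
			return count
		case v, ok := <-ch:
			if !ok {
				// channel closed
				return count
			}
			handle(v)
			count++
		default:
			// assumption: bail out when the channel is momentarily empty
			return count
		}
	}
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 10; i++ {
		ch <- i
	}

	n := drainWithLimit(ch, 5, func(v int) { fmt.Println("handled", v) })
	fmt.Println("drained", n, "items") // prints: drained 5 items
}

The last two hunks add a case <-i.done: arm alongside the existing default in publishTraceDecision. Since those selects already never block, the effect as I read it is that once the done channel is closed during shutdown, the done case can be chosen instead of enqueueing onto the decision buffers, so a stopping collector skips buffering decisions that nothing will drain.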