diff --git a/.chloggen/optimize-tailsamplingprocessor.yaml b/.chloggen/optimize-tailsamplingprocessor.yaml new file mode 100644 index 000000000000..218bff68f6a0 --- /dev/null +++ b/.chloggen/optimize-tailsamplingprocessor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: tailsamplingprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Optimize performance of tailsamplingprocessor + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27889] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 92218b743194..ae1c566886b6 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -26,6 +26,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" ) +var ( + tagUpsertSampled = tag.Upsert(tagSampledKey, "true") + tagUpsertNotSampled = tag.Upsert(tagSampledKey, "false") +) + // policy combines a sampling policy evaluator with the destinations to be // used for that policy. type policy struct { @@ -51,6 +56,10 @@ type tailSamplingSpanProcessor struct { decisionBatcher idbatcher.Batcher deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + + // This is for reusing the slice by each call of `makeDecision`. This + // was previously identified to be a bottleneck using profiling. + mutatorsBuf []tag.Mutator } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -115,6 +124,10 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting policies: policies, tickerFrequency: time.Second, numTracesOnMap: &atomic.Uint64{}, + + // We allocate exactly 1 element, because that's the exact amount + // used in any place. + mutatorsBuf: make([]tag.Mutator, 1), } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -279,6 +292,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa finalDecision = sampling.Sampled } + mutators := tsp.mutatorsBuf for i, p := range tsp.policies { switch trace.Decisions[i] { case sampling.Sampled: @@ -288,17 +302,19 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa matchingPolicy = p } + mutators[0] = tagUpsertSampled _ = stats.RecordWithTags( p.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "true")}, + mutators, statCountTracesSampled.M(int64(1)), ) metrics.decisionSampled++ case sampling.NotSampled: + mutators[0] = tagUpsertNotSampled _ = stats.RecordWithTags( p.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "false")}, + mutators, statCountTracesSampled.M(int64(1)), ) metrics.decisionNotSampled++ @@ -307,15 +323,17 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa switch finalDecision { case sampling.Sampled: + mutators[0] = tagUpsertSampled _ = stats.RecordWithTags( tsp.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "true")}, + mutators, statCountGlobalTracesSampled.M(int64(1)), ) case sampling.NotSampled: + mutators[0] = tagUpsertNotSampled _ = stats.RecordWithTags( tsp.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "false")}, + mutators, statCountGlobalTracesSampled.M(int64(1)), ) } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 53bea153d4bd..f5da628dc503 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opencensus.io/tag" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -128,6 +129,7 @@ func TestTraceIntegrity(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -360,6 +362,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -421,6 +424,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -489,6 +493,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -552,6 +557,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -614,6 +620,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { deleteChan: make(chan pcommon.TraceID, maxSize), policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, + mutatorsBuf: make([]tag.Mutator, 1), numTracesOnMap: &atomic.Uint64{}, } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) @@ -678,6 +685,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { policyTicker: &manualTTicker{}, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -745,6 +753,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -1013,3 +1022,37 @@ func simpleTracesWithID(traceID pcommon.TraceID) ptrace.Traces { traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceID) return traces } + +func BenchmarkSampling(b *testing.B) { + traceIds, batches := generateIdsAndBatches(128) + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(2 * len(traceIds)), + ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, + } + + sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg) + tsp := sp.(*tailSamplingSpanProcessor) + require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(b, tsp.Shutdown(context.Background())) + }() + metrics := &policyMetrics{} + sampleBatches := make([]*sampling.TraceData, 0, len(batches)) + + for i := 0; i < len(batches); i++ { + sampleBatches = append(sampleBatches, &sampling.TraceData{ + Decisions: []sampling.Decision{sampling.Pending}, + ArrivalTime: time.Now(), + //SpanCount: spanCount, + ReceivedBatches: ptrace.NewTraces(), + }) + } + + for i := 0; i < b.N; i++ { + for i, id := range traceIds { + _, _ = tsp.makeDecision(id, sampleBatches[i], metrics) + } + } +}