From 2d377eab11b877f69fa66002fe5a68795ddba1a7 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 20 Oct 2023 17:39:29 +0200 Subject: [PATCH 1/5] tailsamplingprocessor: Create benchmark for makeDecision --- .../tailsamplingprocessor/processor_test.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 53bea153d4bd..0151d522b68b 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -1013,3 +1013,37 @@ func simpleTracesWithID(traceID pcommon.TraceID) ptrace.Traces { traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceID) return traces } + +func BenchmarkSampling(b *testing.B) { + traceIds, batches := generateIdsAndBatches(128) + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(2 * len(traceIds)), + ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, + } + + sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg) + tsp := sp.(*tailSamplingSpanProcessor) + require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(b, tsp.Shutdown(context.Background())) + }() + metrics := &policyMetrics{} + sampleBatches := make([]*sampling.TraceData, 0, len(batches)) + + for i := 0; i < len(batches); i++ { + sampleBatches = append(sampleBatches, &sampling.TraceData{ + Decisions: []sampling.Decision{sampling.Pending}, + ArrivalTime: time.Now(), + //SpanCount: spanCount, + ReceivedBatches: ptrace.NewTraces(), + }) + } + + for i := 0; i < b.N; i++ { + for i, id := range traceIds { + _, _ = tsp.makeDecision(id, sampleBatches[i], metrics) + } + } +} From 9e359d66b871d95c61ee4934c9cbe129a485f20a Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 20 Oct 2023 17:39:57 +0200 Subject: [PATCH 
2/5] tailsamplingprocessor: Optimize tag mutator memory allocations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since each `tailSamplingSpanProcessor` instance is never called concurrently by the ticker worker (it's a 1-to-1 relationship), we can safely reuse a slice for the tag mutators used in `makeDecision`. Additionally, the tag mutators themselves were causing a lot of allocations; since they are static, we created package-level variables for them, preventing allocations on each execution of `makeDecision`. This improved the `makeDecision` benchmark by ~31%. ``` benchstat old.txt new.txt name old time/op new time/op delta Sampling-10 51.8µs ± 1% 35.7µs ± 1% -30.94% (p=0.008 n=5+5) ``` --- processor/tailsamplingprocessor/processor.go | 26 +++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 92218b743194..ae1c566886b6 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -26,6 +26,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" ) +var ( + tagUpsertSampled = tag.Upsert(tagSampledKey, "true") + tagUpsertNotSampled = tag.Upsert(tagSampledKey, "false") +) + // policy combines a sampling policy evaluator with the destinations to be // used for that policy. type policy struct { @@ -51,6 +56,10 @@ type tailSamplingSpanProcessor struct { decisionBatcher idbatcher.Batcher deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + + // This is for reusing the slice by each call of `makeDecision`. This + // was previously identified to be a bottleneck using profiling. + mutatorsBuf []tag.Mutator } // spanAndScope a structure for holding information about span and its instrumentation scope.
@@ -115,6 +124,10 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting policies: policies, tickerFrequency: time.Second, numTracesOnMap: &atomic.Uint64{}, + + // We allocate exactly 1 element, because that's the exact amount + // used in any place. + mutatorsBuf: make([]tag.Mutator, 1), } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -279,6 +292,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa finalDecision = sampling.Sampled } + mutators := tsp.mutatorsBuf for i, p := range tsp.policies { switch trace.Decisions[i] { case sampling.Sampled: @@ -288,17 +302,19 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa matchingPolicy = p } + mutators[0] = tagUpsertSampled _ = stats.RecordWithTags( p.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "true")}, + mutators, statCountTracesSampled.M(int64(1)), ) metrics.decisionSampled++ case sampling.NotSampled: + mutators[0] = tagUpsertNotSampled _ = stats.RecordWithTags( p.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "false")}, + mutators, statCountTracesSampled.M(int64(1)), ) metrics.decisionNotSampled++ @@ -307,15 +323,17 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa switch finalDecision { case sampling.Sampled: + mutators[0] = tagUpsertSampled _ = stats.RecordWithTags( tsp.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "true")}, + mutators, statCountGlobalTracesSampled.M(int64(1)), ) case sampling.NotSampled: + mutators[0] = tagUpsertNotSampled _ = stats.RecordWithTags( tsp.ctx, - []tag.Mutator{tag.Upsert(tagSampledKey, "false")}, + mutators, statCountGlobalTracesSampled.M(int64(1)), ) } From dca42762eceb6f73878f7c8d43b916dd6a99aec5 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 20 Oct 2023 17:53:05 +0200 Subject: [PATCH 3/5] .chloggen: Add changelog item --- .chloggen/optimize-tailsamplingprocessor.yaml | 27 +++++++++++++++++++ 1 file 
changed, 27 insertions(+) create mode 100644 .chloggen/optimize-tailsamplingprocessor.yaml diff --git a/.chloggen/optimize-tailsamplingprocessor.yaml b/.chloggen/optimize-tailsamplingprocessor.yaml new file mode 100644 index 000000000000..218bff68f6a0 --- /dev/null +++ b/.chloggen/optimize-tailsamplingprocessor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: tailsamplingprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Optimize performance of tailsamplingprocessor + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27889] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 7b4aac333ff50808d0e9b3184d88a3268ef251b0 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Wed, 25 Oct 2023 09:08:56 +0800 Subject: [PATCH 4/5] fix: [test] tailSamplingSpanProcessor structs in unit tests were missing the new mutatorsBuf field and panicked in CI/CD. Added this field to all structs in the unit tests.
--- processor/tailsamplingprocessor/processor_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 0151d522b68b..cfdfefb3e231 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/binary" "errors" + "go.opencensus.io/tag" "sort" "sync" "sync/atomic" @@ -128,6 +129,7 @@ func TestTraceIntegrity(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -360,6 +362,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -421,6 +424,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -489,6 +493,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -552,6 +557,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -614,6 
+620,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { deleteChan: make(chan pcommon.TraceID, maxSize), policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, + mutatorsBuf: make([]tag.Mutator, 1), numTracesOnMap: &atomic.Uint64{}, } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) @@ -678,6 +685,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { policyTicker: &manualTTicker{}, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -745,6 +753,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, + mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { From a82f3ea56cfea6166d685dfc6a23ad2a1402ec68 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Wed, 25 Oct 2023 09:49:29 +0800 Subject: [PATCH 5/5] fix: [test] fixed lint in tailprocessor --- processor/tailsamplingprocessor/processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index cfdfefb3e231..f5da628dc503 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -7,7 +7,6 @@ import ( "context" "encoding/binary" "errors" - "go.opencensus.io/tag" "sort" "sync" "sync/atomic" @@ -16,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opencensus.io/tag" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon"