diff --git a/.chloggen/test_testbed-batcher.yaml b/.chloggen/test_testbed-batcher.yaml new file mode 100644 index 000000000000..7d3c989362cd --- /dev/null +++ b/.chloggen/test_testbed-batcher.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: testbed + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add batcher performance tests + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36206] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/oteltestbedcol/builder-config.yaml b/cmd/oteltestbedcol/builder-config.yaml index 14f880b25b51..f41a111668f7 100644 --- a/cmd/oteltestbedcol/builder-config.yaml +++ b/cmd/oteltestbedcol/builder-config.yaml @@ -33,6 +33,8 @@ processors: - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.113.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.113.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.113.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.113.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.113.0 receivers: - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.113.0 @@ -100,6 +102,8 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter => ../../exporter/sapmexporter - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver => ../../receiver/syslogreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor => ../../processor/resourceprocessor + - github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor + - github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor => ../../processor/filterprocessor - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter => ../../exporter/carbonexporter - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus => ../../pkg/translator/prometheus - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/testbed/stabilitytests/metric_test.go b/testbed/stabilitytests/metric_test.go index 6a68229d4c6d..05d310bfdc2c 100644 --- a/testbed/stabilitytests/metric_test.go +++ b/testbed/stabilitytests/metric_test.go @@ -26,6 +26,7 @@ func TestStabilityMetricsOTLP(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -42,6 +43,7 @@ func TestStabilityMetricsOpenCensus(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -58,6 +60,7 @@ func TestStabilityMetricsCarbon(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } @@ -74,5 +77,6 @@ func TestStabilityMetricsSignalFx(t *testing.T) { contribPerfResultsSummary, nil, nil, + nil, ) } diff --git a/testbed/stabilitytests/trace_test.go b/testbed/stabilitytests/trace_test.go index 2006b664b855..85eeca8d366e 100644 --- a/testbed/stabilitytests/trace_test.go +++ b/testbed/stabilitytests/trace_test.go @@ -52,6 +52,7 @@ func TestStabilityTracesOpenCensus(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -68,6 +69,7 @@ func TestStabilityTracesSAPM(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -84,6 +86,7 @@ func TestStabilityTracesOTLP(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -100,6 +103,7 @@ func TestStabilityTracesJaegerGRPC(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -116,6 +120,7 @@ func TestStabilityTracesZipkin(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } @@ -132,5 +137,6 @@ func TestStabilityTracesDatadog(t *testing.T) { contribPerfResultsSummary, processorsConfig, nil, + nil, ) } diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 6db874822f94..829eb6a5d411 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -56,6 +56,7 @@ type BaseOTLPDataReceiver struct { retry string sendingQueue string timeout string + batcher string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -104,6 +105,11 @@ func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiv return bor } +func (bor *BaseOTLPDataReceiver) WithBatcher(batcher string) *BaseOTLPDataReceiver { + bor.batcher = batcher + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -125,8 +131,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { %s %s %s + %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout, bor.batcher) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/tests/batcher_test.go b/testbed/tests/batcher_test.go new file mode 100644 index 000000000000..4952914230fa --- /dev/null +++ b/testbed/tests/batcher_test.go @@ -0,0 +1,259 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package tests contains test cases. To run the tests go to tests directory and run: +// RUN_TESTBED=1 go test -v + +//go:build batcher +// +build batcher + +package tests + +// The tests in this file measure the effect of batching on collector performance. +// Their primary intent is to measure the performance impact of https://github.com/open-telemetry/opentelemetry-collector/issues/8122. + +import ( + "fmt" + "slices" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" +) + +type batcherTestSpec struct { + name string + withQueue bool + withBatchProcessor bool + withExporterBatcher bool + batchSize int + processors []ProcessorNameAndConfigBody + resourceSpec testbed.ResourceSpec + extensions map[string]string +} + +func TestLog10kDPSNoProcessors(t *testing.T) { + tests := []batcherTestSpec{ + { + name: "No batching, no queue", + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "No batching, queue", + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, no queue", + batchSize: 1000, + withBatchProcessor: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, queue", + batchSize: 1000, + withBatchProcessor: true, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, no queue", + withExporterBatcher: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, queue", + withExporterBatcher: true, + withQueue: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: true +`) + if test.withQueue { + receiver.WithQueue(` + sending_queue: + enabled: true +`) + } + + if test.withExporterBatcher { + receiver.WithBatcher(fmt.Sprintf(` + batcher: + enabled: true + min_size_items: %d +`, test.batchSize)) + } + + processors := slices.Clone(test.processors) + if test.withBatchProcessor { + processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ + Name: "batch", + Body: fmt.Sprintf(` + batch: + send_batch_size: %d +`, test.batchSize), + }) + } + loadOptions := &testbed.LoadOptions{ + Parallel: 10, + ItemsPerBatch: 10, + } + Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, processors, test.extensions, loadOptions) + }) + } +} + +func TestLog10kDPSWithProcessors(t *testing.T) { + processors := []ProcessorNameAndConfigBody{ + { + Name: "filter", + Body: ` + filter: + logs: + log_record: + - not IsMatch(attributes["batch_index"], "batch_.+") +`, + }, + { + Name: "transform", + Body: ` + transform: + log_statements: + - context: log + statements: + - set(resource.attributes["batch_index"], attributes["batch_index"]) + - set(attributes["counter"], ExtractPatterns(body, "Load Generator Counter (?P.+)")) +`, + }, + } + tests := []batcherTestSpec{ + { + name: "No batching, no queue", + processors: processors, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "No batching, queue", + processors: processors, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, no queue", + processors: processors, + batchSize: 1000, + withBatchProcessor: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with batch processor, queue", + batchSize: 1000, + withBatchProcessor: true, + withQueue: true, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, no queue", + processors: processors, + withExporterBatcher: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + { + name: "Batch size 1000 with exporter batcher, queue", + processors: processors, + withExporterBatcher: true, + withQueue: true, + batchSize: 1000, + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 30, + ExpectedMaxRAM: 120, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + receiver.WithRetry(` + retry_on_failure: + enabled: true +`) + if test.withQueue { + receiver.WithQueue(` + sending_queue: + enabled: true + queue_size: 10 +`) + } + + if test.withExporterBatcher { + receiver.WithBatcher(fmt.Sprintf(` + batcher: + enabled: true + min_size_items: %d +`, test.batchSize)) + } + + processors := slices.Clone(test.processors) + if test.withBatchProcessor { + processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{ + Name: "batch", + Body: fmt.Sprintf(` + batch: + send_batch_size: %d +`, test.batchSize), + }) + } + loadOptions := &testbed.LoadOptions{ + Parallel: 10, + ItemsPerBatch: 10, + } + Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, test.processors, test.extensions, loadOptions) + }) + } +} diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 2756ce2ffe97..f46e2abd04f2 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -178,6 +178,7 @@ func TestLog10kDPS(t *testing.T) { performanceResultsSummary, processors, test.extensions, + nil, ) }) } diff --git a/testbed/tests/metric_test.go b/testbed/tests/metric_test.go index 6eb8b7fd9829..2de78612cceb 100644 --- a/testbed/tests/metric_test.go +++ b/testbed/tests/metric_test.go @@ -88,6 +88,7 @@ func TestMetric10kDPS(t *testing.T) { performanceResultsSummary, nil, nil, + nil, ) }) } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index a2e3ea868a5c..72dab67dd997 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -137,15 +137,18 @@ func Scenario10kItemsPerSecond( resultsSummary testbed.TestResultsSummary, processors []ProcessorNameAndConfigBody, extensions map[string]string, -) { + loadOptions *testbed.LoadOptions) { resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) - options := testbed.LoadOptions{ - DataItemsPerSecond: 10_000, - ItemsPerBatch: 100, - Parallel: 1, + if loadOptions == nil { + loadOptions = &testbed.LoadOptions{ + ItemsPerBatch: 100, + Parallel: 1, + } } + loadOptions.DataItemsPerSecond = 10_000 + agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions) @@ -153,7 +156,7 @@ func Scenario10kItemsPerSecond( require.NoError(t, err) defer configCleanup() - dataProvider := testbed.NewPerfTestDataProvider(options) + dataProvider := testbed.NewPerfTestDataProvider(*loadOptions) tc := testbed.NewTestCase( t, dataProvider, @@ -169,7 +172,7 @@ func Scenario10kItemsPerSecond( tc.StartBackend() tc.StartAgent() - tc.StartLoad(options) + tc.StartLoad(*loadOptions) tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 8f53311d5419..7b6007727e26 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -149,6 +149,7 @@ func TestTrace10kSPS(t *testing.T) { performanceResultsSummary, processors, nil, + nil, ) }) }