Skip to content

Commit

Permalink
[testbed] Add batcher performance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Nov 5, 2024
1 parent 740d9aa commit 89f3353
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 8 deletions.
2 changes: 2 additions & 0 deletions cmd/oteltestbedcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ processors:
- gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.112.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.112.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.112.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.112.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.112.0

receivers:
- gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.112.0
Expand Down
4 changes: 4 additions & 0 deletions testbed/stabilitytests/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestStabilityMetricsOTLP(t *testing.T) {
contribPerfResultsSummary,
nil,
nil,
nil,
)
}

Expand All @@ -42,6 +43,7 @@ func TestStabilityMetricsOpenCensus(t *testing.T) {
contribPerfResultsSummary,
nil,
nil,
nil,
)
}

Expand All @@ -58,6 +60,7 @@ func TestStabilityMetricsCarbon(t *testing.T) {
contribPerfResultsSummary,
nil,
nil,
nil,
)
}

Expand All @@ -74,5 +77,6 @@ func TestStabilityMetricsSignalFx(t *testing.T) {
contribPerfResultsSummary,
nil,
nil,
nil,
)
}
6 changes: 6 additions & 0 deletions testbed/stabilitytests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestStabilityTracesOpenCensus(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}

Expand All @@ -68,6 +69,7 @@ func TestStabilityTracesSAPM(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}

Expand All @@ -84,6 +86,7 @@ func TestStabilityTracesOTLP(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}

Expand All @@ -100,6 +103,7 @@ func TestStabilityTracesJaegerGRPC(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}

Expand All @@ -116,6 +120,7 @@ func TestStabilityTracesZipkin(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}

Expand All @@ -132,5 +137,6 @@ func TestStabilityTracesDatadog(t *testing.T) {
contribPerfResultsSummary,
processorsConfig,
nil,
nil,
)
}
9 changes: 8 additions & 1 deletion testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
batcher string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec
return bor
}

func (bor *BaseOTLPDataReceiver) WithBatcher(batcher string) *BaseOTLPDataReceiver {
bor.batcher = batcher
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
Expand All @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.batcher)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
259 changes: 259 additions & 0 deletions testbed/tests/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package tests contains test cases. To run the tests go to tests directory and run:
// RUN_TESTBED=1 go test -v

//go:build batcher
// +build batcher

package tests

// The tests in this file measure the effect of batching on collector performance.
// Their primary intent is to measure the performance impact of https://github.com/open-telemetry/opentelemetry-collector/issues/8122.

import (
"fmt"
"slices"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

type batcherTestSpec struct {
name string
withQueue bool
withBatchProcessor bool
withExporterBatcher bool
batchSize int
processors []ProcessorNameAndConfigBody
resourceSpec testbed.ResourceSpec
extensions map[string]string
}

func TestLog10kDPSNoProcessors(t *testing.T) {
tests := []batcherTestSpec{
{
name: "No batching, no queue",
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "No batching, queue",
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, no queue",
batchSize: 1000,
withBatchProcessor: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, queue",
batchSize: 1000,
withBatchProcessor: true,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with exporter batcher, no queue",
withExporterBatcher: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with exporter batcher, queue",
withExporterBatcher: true,
withQueue: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
receiver.WithRetry(`
retry_on_failure:
enabled: true
`)
if test.withQueue {
receiver.WithQueue(`
sending_queue:
enabled: true
`)
}

if test.withExporterBatcher {
receiver.WithBatcher(fmt.Sprintf(`
batcher:
enabled: true
min_size_items: %d
`, test.batchSize))
}

processors := slices.Clone(test.processors)
if test.withBatchProcessor {
processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{
Name: "batch",
Body: fmt.Sprintf(`
batch:
send_batch_size: %d
`, test.batchSize),
})
}
loadOptions := &testbed.LoadOptions{
Parallel: 10,
ItemsPerBatch: 10,
}
Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, processors, test.extensions, loadOptions)
})
}
}

func TestLog10kDPSWithProcessors(t *testing.T) {
processors := []ProcessorNameAndConfigBody{
{
Name: "filter",
Body: `
filter:
logs:
log_record:
- not IsMatch(attributes["batch_index"], "batch_.+")
`,
},
{
Name: "transform",
Body: `
transform:
log_statements:
- context: log
statements:
- set(resource.attributes["batch_index"], attributes["batch_index"])
- set(attributes["counter"], ExtractPatterns(body, "Load Generator Counter (?P<counter>.+)"))
`,
},
}
tests := []batcherTestSpec{
{
name: "No batching, no queue",
processors: processors,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "No batching, queue",
processors: processors,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, no queue",
processors: processors,
batchSize: 1000,
withBatchProcessor: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with batch processor, queue",
batchSize: 1000,
withBatchProcessor: true,
withQueue: true,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with exporter batcher, no queue",
processors: processors,
withExporterBatcher: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
{
name: "Batch size 1000 with exporter batcher, queue",
processors: processors,
withExporterBatcher: true,
withQueue: true,
batchSize: 1000,
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 30,
ExpectedMaxRAM: 120,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sender := testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
receiver.WithRetry(`
retry_on_failure:
enabled: true
`)
if test.withQueue {
receiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 10
`)
}

if test.withExporterBatcher {
receiver.WithBatcher(fmt.Sprintf(`
batcher:
enabled: true
min_size_items: %d
`, test.batchSize))
}

processors := slices.Clone(test.processors)
if test.withBatchProcessor {
processors = slices.Insert(processors, 0, ProcessorNameAndConfigBody{
Name: "batch",
Body: fmt.Sprintf(`
batch:
send_batch_size: %d
`, test.batchSize),
})
}
loadOptions := &testbed.LoadOptions{
Parallel: 10,
ItemsPerBatch: 10,
}
Scenario10kItemsPerSecond(t, sender, receiver, test.resourceSpec, performanceResultsSummary, test.processors, test.extensions, loadOptions)
})
}
}
1 change: 1 addition & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func TestLog10kDPS(t *testing.T) {
performanceResultsSummary,
processors,
test.extensions,
nil,
)
})
}
Expand Down
1 change: 1 addition & 0 deletions testbed/tests/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestMetric10kDPS(t *testing.T) {
performanceResultsSummary,
nil,
nil,
nil,
)
})
}
Expand Down
Loading

0 comments on commit 89f3353

Please sign in to comment.