Skip to content

Commit

Permalink
[testbed] Add compression end-to-end tests for SAPM
Browse files Browse the repository at this point in the history
- Added gzip and zstd tests to SAPM trace test.

- Modified GenerateTraces to generate more variable data to make
  sure it is a more fair comparison when compression is enabled
  (otherwise it was compressing too well and was not representative
  of real-world workloads).
  • Loading branch information
tigrannajaryan committed Jun 19, 2023
1 parent 5fff104 commit 2332f4d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
16 changes: 11 additions & 5 deletions testbed/datareceivers/sapm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
// SapmDataReceiver implements Sapm format receiver.
type SapmDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Traces
receiver receiver.Traces
compression string
}

// NewSapmDataReceiver creates a new SapmDataReceiver.
func NewSapmDataReceiver(port int) *SapmDataReceiver {
return &SapmDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}}
func NewSapmDataReceiver(port int, compression string) *SapmDataReceiver {
return &SapmDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}, compression: compression}
}

// Start the receiver.
Expand Down Expand Up @@ -57,12 +58,17 @@ func (sr *SapmDataReceiver) Stop() error {

// GenConfigYAMLStr returns exporter config for the agent.
func (sr *SapmDataReceiver) GenConfigYAMLStr() string {
disableCompression := false
if sr.compression == "" {
disableCompression = true
}
// Note that this generates an exporter config for agent.
return fmt.Sprintf(`
sapm:
endpoint: "http://127.0.0.1:%d/v2/trace"
disable_compression: true
access_token_passthrough: true`, sr.Port)
disable_compression: %v
compression: %s
access_token_passthrough: true`, sr.Port, disableCompression, sr.compression)
}

// ProtocolName returns protocol name as it is specified in Collector config.
Expand Down
13 changes: 9 additions & 4 deletions testbed/datasenders/sapm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,34 @@ import (
type SapmDataSender struct {
testbed.DataSenderBase
consumer.Traces
compression string
}

// Ensure SapmDataSender implements TraceDataSenderOld.
var _ testbed.TraceDataSender = (*SapmDataSender)(nil)

// NewSapmDataSender creates a new Sapm protocol sender that will send
// to the specified port after Start is called.
func NewSapmDataSender(port int) *SapmDataSender {
func NewSapmDataSender(port int, compression string) *SapmDataSender {
return &SapmDataSender{
DataSenderBase: testbed.DataSenderBase{
Port: port,
Host: testbed.DefaultHost,
},
compression: compression,
}
}

// Start the sender.
func (je *SapmDataSender) Start() error {
factory := sapmexporter.NewFactory()
cfg := &sapmexporter.Config{
Endpoint: fmt.Sprintf("http://%s/v2/trace", je.GetEndpoint()),
DisableCompression: true,
AccessToken: "MyToken",
Endpoint: fmt.Sprintf("http://%s/v2/trace", je.GetEndpoint()),
Compression: je.compression,
AccessToken: "MyToken",
}
if je.compression == "" {
cfg.DisableCompression = true
}
params := exportertest.NewNopCreateSettings()
params.Logger = zap.L()
Expand Down
4 changes: 2 additions & 2 deletions testbed/stabilitytests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func TestStabilityTracesOpenCensus(t *testing.T) {
func TestStabilityTracesSAPM(t *testing.T) {
scenarios.Scenario10kItemsPerSecond(
t,
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t)),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t)),
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), ""),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), ""),
testbed.ResourceSpec{
ExpectedMaxCPU: 40,
ExpectedMaxRAM: 100,
Expand Down
4 changes: 2 additions & 2 deletions testbed/testbed/data_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (dp *perfTestDataProvider) GenerateTraces() (ptrace.Traces, bool) {
traceID := dp.traceIDSequence.Add(1)
for i := 0; i < dp.options.ItemsPerBatch; i++ {

startTime := time.Now()
startTime := time.Now().Add(time.Duration(i+int(traceID)*1000) * time.Second)
endTime := startTime.Add(time.Millisecond)

spanID := dp.dataItemsGenerated.Add(1)
Expand All @@ -71,7 +71,7 @@ func (dp *perfTestDataProvider) GenerateTraces() (ptrace.Traces, bool) {
// Create a span.
span.SetTraceID(idutils.UInt64ToTraceID(0, traceID))
span.SetSpanID(idutils.UInt64ToSpanID(spanID))
span.SetName("load-generator-span")
span.SetName("load-generator-span" + strconv.FormatUint(spanID+traceID*1000, 10))
span.SetKind(ptrace.SpanKindClient)
attrs := span.Attributes()
attrs.PutInt("load_generator.span_seq_num", int64(spanID))
Expand Down
22 changes: 20 additions & 2 deletions testbed/tests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,31 @@ func TestTrace10kSPS(t *testing.T) {
},
{
"SAPM",
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t)),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t)),
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), ""),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), ""),
testbed.ResourceSpec{
ExpectedMaxCPU: 32,
ExpectedMaxRAM: 100,
},
},
{
"SAPM-gzip",
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), "gzip"),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), "gzip"),
testbed.ResourceSpec{
ExpectedMaxCPU: 35,
ExpectedMaxRAM: 110,
},
},
{
"SAPM-zstd",
datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), "zstd"),
datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), "zstd"),
testbed.ResourceSpec{
ExpectedMaxCPU: 32,
ExpectedMaxRAM: 300,
},
},
{
"Zipkin",
datasenders.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
Expand Down

0 comments on commit 2332f4d

Please sign in to comment.