diff --git a/testbed/datareceivers/sapm.go b/testbed/datareceivers/sapm.go index 5c69c2cac7c6..140ea0d7489d 100644 --- a/testbed/datareceivers/sapm.go +++ b/testbed/datareceivers/sapm.go @@ -21,12 +21,13 @@ import ( // SapmDataReceiver implements Sapm format receiver. type SapmDataReceiver struct { testbed.DataReceiverBase - receiver receiver.Traces + receiver receiver.Traces + compression string } // NewSapmDataReceiver creates a new SapmDataReceiver. -func NewSapmDataReceiver(port int) *SapmDataReceiver { - return &SapmDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}} +func NewSapmDataReceiver(port int, compression string) *SapmDataReceiver { + return &SapmDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}, compression: compression} } // Start the receiver. @@ -57,12 +58,17 @@ func (sr *SapmDataReceiver) Stop() error { // GenConfigYAMLStr returns exporter config for the agent. func (sr *SapmDataReceiver) GenConfigYAMLStr() string { + disableCompression := false + if sr.compression == "" { + disableCompression = true + } // Note that this generates an exporter config for agent. return fmt.Sprintf(` sapm: endpoint: "http://127.0.0.1:%d/v2/trace" - disable_compression: true - access_token_passthrough: true`, sr.Port) + disable_compression: %v + compression: %s + access_token_passthrough: true`, sr.Port, disableCompression, sr.compression) } // ProtocolName returns protocol name as it is specified in Collector config. diff --git a/testbed/datasenders/sapm.go b/testbed/datasenders/sapm.go index 04ddfc8a7623..cbc318677399 100644 --- a/testbed/datasenders/sapm.go +++ b/testbed/datasenders/sapm.go @@ -19,6 +19,7 @@ import ( type SapmDataSender struct { testbed.DataSenderBase consumer.Traces + compression string } // Ensure SapmDataSender implements TraceDataSenderOld. @@ -26,12 +27,13 @@ var _ testbed.TraceDataSender = (*SapmDataSender)(nil) // NewSapmDataSender creates a new Sapm protocol sender that will send // to the specified port after Start is called. -func NewSapmDataSender(port int) *SapmDataSender { +func NewSapmDataSender(port int, compression string) *SapmDataSender { return &SapmDataSender{ DataSenderBase: testbed.DataSenderBase{ Port: port, Host: testbed.DefaultHost, }, + compression: compression, } } @@ -39,9 +41,12 @@ func NewSapmDataSender(port int) *SapmDataSender { func (je *SapmDataSender) Start() error { factory := sapmexporter.NewFactory() cfg := &sapmexporter.Config{ - Endpoint: fmt.Sprintf("http://%s/v2/trace", je.GetEndpoint()), - DisableCompression: true, - AccessToken: "MyToken", + Endpoint: fmt.Sprintf("http://%s/v2/trace", je.GetEndpoint()), + Compression: je.compression, + AccessToken: "MyToken", + } + if je.compression == "" { + cfg.DisableCompression = true } params := exportertest.NewNopCreateSettings() params.Logger = zap.L() diff --git a/testbed/stabilitytests/trace_test.go b/testbed/stabilitytests/trace_test.go index 33cf67614011..30699b485763 100644 --- a/testbed/stabilitytests/trace_test.go +++ b/testbed/stabilitytests/trace_test.go @@ -54,8 +54,8 @@ func TestStabilityTracesOpenCensus(t *testing.T) { func TestStabilityTracesSAPM(t *testing.T) { scenarios.Scenario10kItemsPerSecond( t, - datasenders.NewSapmDataSender(testbed.GetAvailablePort(t)), - datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t)), + datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), ""), + datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), ""), testbed.ResourceSpec{ ExpectedMaxCPU: 40, ExpectedMaxRAM: 100, diff --git a/testbed/testbed/data_providers.go b/testbed/testbed/data_providers.go index eef844ad97bf..1134ae46b736 100644 --- a/testbed/testbed/data_providers.go +++ b/testbed/testbed/data_providers.go @@ -61,7 +61,7 @@ func (dp *perfTestDataProvider) GenerateTraces() (ptrace.Traces, bool) { traceID := dp.traceIDSequence.Add(1) for i := 0; i < dp.options.ItemsPerBatch; i++ { - startTime := time.Now() + startTime := time.Now().Add(time.Duration(i+int(traceID)*1000) * time.Second) endTime := startTime.Add(time.Millisecond) spanID := dp.dataItemsGenerated.Add(1) @@ -71,7 +71,7 @@ func (dp *perfTestDataProvider) GenerateTraces() (ptrace.Traces, bool) { // Create a span. span.SetTraceID(idutils.UInt64ToTraceID(0, traceID)) span.SetSpanID(idutils.UInt64ToSpanID(spanID)) - span.SetName("load-generator-span") + span.SetName("load-generator-span" + strconv.FormatUint(spanID+traceID*1000, 10)) span.SetKind(ptrace.SpanKindClient) attrs := span.Attributes() attrs.PutInt("load_generator.span_seq_num", int64(spanID)) diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 5b03ca8ceec6..161dc4631c12 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -93,13 +93,31 @@ func TestTrace10kSPS(t *testing.T) { }, { "SAPM", - datasenders.NewSapmDataSender(testbed.GetAvailablePort(t)), - datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t)), + datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), ""), + datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), ""), testbed.ResourceSpec{ ExpectedMaxCPU: 32, ExpectedMaxRAM: 100, }, }, + { + "SAPM-gzip", + datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), "gzip"), + datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), "gzip"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 35, + ExpectedMaxRAM: 110, + }, + }, + { + "SAPM-zstd", + datasenders.NewSapmDataSender(testbed.GetAvailablePort(t), "zstd"), + datareceivers.NewSapmDataReceiver(testbed.GetAvailablePort(t), "zstd"), + testbed.ResourceSpec{ + ExpectedMaxCPU: 32, + ExpectedMaxRAM: 300, + }, + }, { "Zipkin", datasenders.NewZipkinDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),