Skip to content

Commit

Permalink
create new top level function tests to avoid indentation changes
Browse files Browse the repository at this point in the history
  • Loading branch information
paivagustavo committed Nov 16, 2022
1 parent 8aca1eb commit 5961f60
Showing 1 changed file with 175 additions and 168 deletions.
343 changes: 175 additions & 168 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,100 +123,103 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
}

func TestBatchProcessorSentBySize(t *testing.T) {
telemetryTest(t, testBatchProcessorSentBySize)
}

func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sizer := &ptrace.ProtoMarshaler{}
telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 100
spansPerRequest := 5

start := time.Now()
sizeSum := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
sizeSum += sizer.TracesSize(td)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 100
spansPerRequest := 5

start := time.Now()
sizeSum := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
sizeSum += sizer.TracesSize(td)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}

require.NoError(t, batcher.Shutdown(context.Background()))
require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize
expectedBatchingFactor := sendBatchSize / spansPerRequest
expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize
expectedBatchingFactor := sendBatchSize / spansPerRequest

require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
for _, td := range receivedTraces {
rss := td.ResourceSpans()
require.Equal(t, expectedBatchingFactor, rss.Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, spansPerRequest, rss.At(i).ScopeSpans().At(0).Spans().Len())
}
require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
for _, td := range receivedTraces {
rss := td.ResourceSpans()
require.Equal(t, expectedBatchingFactor, rss.Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, spansPerRequest, rss.At(i).ScopeSpans().At(0).Spans().Len())
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sendSizeBytesSum: float64(sizeSum),
sizeTrigger: float64(expectedBatchesNum),
})
tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sendSizeBytesSum: float64(sizeSum),
sizeTrigger: float64(expectedBatchesNum),
})

}

func TestBatchProcessorSentBySize_withMaxSize(t *testing.T) {
telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
sendBatchMaxSize := 37
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 1
spansPerRequest := 500
totalSpans := requestCount * spansPerRequest

start := time.Now()
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}
telemetryTest(t, testBatchProcessorSentBySize_withMaxSize)
}

require.NoError(t, batcher.Shutdown(context.Background()))
func testBatchProcessorSentBySize_withMaxSize(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
sendBatchMaxSize := 37
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
requestCount := 1
spansPerRequest := 500
totalSpans := requestCount * spansPerRequest

// The max batch size is not a divisor of the total number of spans
expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize)))
start := time.Now()
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}

require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

require.Equal(t, totalSpans, sink.SpanCount())
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
// The max batch size is not a divisor of the total number of spans
expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize)))

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)),
timeoutTrigger: 1,
})
require.Equal(t, totalSpans, sink.SpanCount())
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)),
timeoutTrigger: 1,
})
}

Expand Down Expand Up @@ -346,59 +349,61 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
}

func TestBatchMetricProcessor_BatchSize(t *testing.T) {
telemetryTest(t, testBatchMetricProcessor_BatchSize)
}

func testBatchMetricProcessor_BatchSize(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sizer := &pmetric.ProtoMarshaler{}
telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {

// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
metricsPerRequest := 5
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)

creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
md := testdata.GenerateMetrics(metricsPerRequest)
size += sizer.MetricsSize(md)
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
}
require.NoError(t, batcher.Shutdown(context.Background()))
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
requestCount := 100
metricsPerRequest := 5
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)

expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount())
receivedMds := sink.AllMetrics()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, md := range receivedMds {
require.Equal(t, expectedBatchingFactor, md.ResourceMetrics().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, metricsPerRequest, md.ResourceMetrics().At(i).ScopeMetrics().At(0).Metrics().Len())
}
start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
md := testdata.GenerateMetrics(metricsPerRequest)
size += sizer.MetricsSize(md)
assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md))
}
require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest

require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount())
receivedMds := sink.AllMetrics()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, md := range receivedMds {
require.Equal(t, expectedBatchingFactor, md.ResourceMetrics().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, metricsPerRequest, md.ResourceMetrics().At(i).ScopeMetrics().At(0).Metrics().Len())
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.DataPointCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: 20,
})
tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.DataPointCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: 20,
})
}

Expand Down Expand Up @@ -654,57 +659,59 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
}

func TestBatchLogProcessor_BatchSize(t *testing.T) {
telemetryTest(t, testBatchLogProcessor_BatchSize)
}

func testBatchLogProcessor_BatchSize(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {
sizer := &plog.ProtoMarshaler{}
telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) {

// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
logsPerRequest := 5
sink := new(consumertest.LogsSink)

creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogs(logsPerRequest)
size += sizer.LogsSize(ld)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}
require.NoError(t, batcher.Shutdown(context.Background()))
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
requestCount := 100
logsPerRequest := 5
sink := new(consumertest.LogsSink)

expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest
creationSet := tel.NewProcessorCreateSettings()
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().Len())
}
start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogs(logsPerRequest)
size += sizer.LogsSize(ld)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}
require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().Len())
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.LogRecordCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: float64(expectedBatchesNum),
})
tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.LogRecordCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: float64(expectedBatchesNum),
})
}

Expand Down

0 comments on commit 5961f60

Please sign in to comment.