Skip to content

Commit

Permalink
Fix start/stop logic for batch processor (#7708)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 22, 2023
1 parent 38615b2 commit bf6ca2b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
11 changes: 11 additions & 0 deletions .chloggen/fixbatcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: batchprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix start/stop logic for batch processor

# One or more tracking issues or pull requests related to the change
issues: [7708]
4 changes: 0 additions & 4 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,11 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities {

// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
bp.goroutines.Add(1)
return nil
}

// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
// Done corresponds with the initial Add(1) in Start.
bp.goroutines.Done()

close(bp.shutdownC)

// Wait until all goroutines are done.
Expand Down
51 changes: 51 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,57 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

func TestProcessorShutdown(t *testing.T) {
factory := NewFactory()

ctx := context.Background()
processorCreationSet := processortest.NewNopCreateSettings()

for i := 0; i < 5; i++ {
require.NotPanics(t, func() {
tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
_ = tProc.Shutdown(ctx)
})

require.NotPanics(t, func() {
mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
_ = mProc.Shutdown(ctx)
})

require.NotPanics(t, func() {
lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
_ = lProc.Shutdown(ctx)
})
}
}

func TestProcessorLifecycle(t *testing.T) {
factory := NewFactory()

ctx := context.Background()
processorCreationSet := processortest.NewNopCreateSettings()

for i := 0; i < 5; i++ {
tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
require.NoError(t, tProc.Start(ctx, componenttest.NewNopHost()))
require.NoError(t, tProc.Shutdown(ctx))

mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
require.NoError(t, mProc.Start(ctx, componenttest.NewNopHost()))
require.NoError(t, mProc.Shutdown(ctx))

lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop())
require.NoError(t, err)
require.NoError(t, lProc.Start(ctx, componenttest.NewNopHost()))
require.NoError(t, lProc.Shutdown(ctx))
}
}

func TestBatchProcessorSpansDelivered(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
Expand Down

0 comments on commit bf6ca2b

Please sign in to comment.