diff --git a/pkg/stanza/adapter/converter.go b/pkg/stanza/adapter/converter.go index efe7d489f680..b10502207a58 100644 --- a/pkg/stanza/adapter/converter.go +++ b/pkg/stanza/adapter/converter.go @@ -74,8 +74,24 @@ type Converter struct { logger *zap.Logger } -func NewConverter(logger *zap.Logger) *Converter { - return &Converter{ +type converterOption interface { + apply(*Converter) +} + +func withWorkerCount(workerCount int) converterOption { + return workerCountOption{workerCount} +} + +type workerCountOption struct { + workerCount int +} + +func (o workerCountOption) apply(c *Converter) { + c.workerCount = o.workerCount +} + +func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter { + c := &Converter{ workerChan: make(chan []*entry.Entry), workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), pLogsChan: make(chan plog.Logs), @@ -83,6 +99,10 @@ func NewConverter(logger *zap.Logger) *Converter { flushChan: make(chan plog.Logs), logger: logger, } + for _, opt := range opts { + opt.apply(c) + } + return c } func (c *Converter) Start() { diff --git a/pkg/stanza/adapter/converter_test.go b/pkg/stanza/adapter/converter_test.go index b1c3cc822e93..52a198af51dd 100644 --- a/pkg/stanza/adapter/converter_test.go +++ b/pkg/stanza/adapter/converter_test.go @@ -822,12 +822,11 @@ func BenchmarkConverter(b *testing.B) { b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { for i := 0; i < b.N; i++ { - converter := NewConverter(zap.NewNop()) + converter := NewConverter(zap.NewNop(), withWorkerCount(wc)) converter.Start() defer converter.Stop() b.ReportAllocs() - b.ResetTimer() go func() { for from := 0; from < entryCount; from += int(batchSize) {