From 0f4b3ca4d6debc0818895318cb79580d36de2f66 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 23 Oct 2024 15:28:56 -0700 Subject: [PATCH 1/6] Batcher just the frame --- exporter/internal/queue/batcher.go | 141 ++++++++++++++++++++++++ exporter/internal/queue/batcher_test.go | 60 ++++++++++ exporter/internal/queue/fake_request.go | 63 +++++++++++ 3 files changed, 264 insertions(+) create mode 100644 exporter/internal/queue/batcher.go create mode 100644 exporter/internal/queue/batcher_test.go create mode 100644 exporter/internal/queue/fake_request.go diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go new file mode 100644 index 00000000000..1d87002e7ae --- /dev/null +++ b/exporter/internal/queue/batcher.go @@ -0,0 +1,141 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "math" + "sync" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +type batch struct { + ctx context.Context + req internal.Request + idxList []uint64 +} + +type Batcher struct { + batchCfg exporterbatcher.Config + + queue Queue[internal.Request] + maxWorkers int + + exportFunc func(context.Context, internal.Request) error + + readingBatch *batch + timer *time.Timer + shutdownCh chan bool + + stopWG sync.WaitGroup +} + +func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], + maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher { + return &Batcher{ + batchCfg: batchCfg, + queue: queue, + maxWorkers: maxWorkers, + exportFunc: exportFunc, + stopWG: sync.WaitGroup{}, + shutdownCh: make(chan bool, 1), + } +} + +// If preconditions pass, flush() take an item from the head of batch list and exports it. +func (qb *Batcher) flush(batchToFlush batch) { + err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) + for _, idx := range batchToFlush.idxList { + qb.queue.OnProcessingFinished(idx, err) + } +} + +// allocateFlusher() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available. +func (qb *Batcher) allocateFlusher(batchToFlush batch) { + // maxWorker = 0 means we don't limit the number of flushers. + if qb.maxWorkers == 0 { + qb.stopWG.Add(1) + go func() { + qb.flush(batchToFlush) + qb.stopWG.Done() + }() + return + } + panic("not implemented") +} + +// Start ensures that queue and all consumers are started. +func (qb *Batcher) Start(ctx context.Context, host component.Host) error { + if err := qb.queue.Start(ctx, host); err != nil { + return err + } + + if qb.batchCfg.Enabled { + panic("not implemented") + } + + // Timer doesn't do anything yet, but adding it so compiler won't complain about qb.timer.C + qb.timer = time.NewTimer(math.MaxInt) + qb.timer.Stop() + + if qb.maxWorkers != 0 { + panic("not implemented") + } + + // This goroutine keeps reading until flush is triggered because of request size. 
+ qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + for { + idx, _, req, ok := qb.queue.Read(context.Background()) + + if !ok { + qb.shutdownCh <- true + if qb.readingBatch != nil { + panic("batching is supported yet so reading batch should always be nil") + } + + return + } + if !qb.batchCfg.Enabled { + qb.readingBatch = &batch{ + req: req, + ctx: context.Background(), + idxList: []uint64{idx}} + qb.allocateFlusher(*qb.readingBatch) + qb.readingBatch = nil + } else { + panic("not implemented") + } + } + }() + + qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + for { + select { + case <-qb.shutdownCh: + return + case <-qb.timer.C: + panic("batching is not yet implemented. Having timer here just so compiler won't complain") + } + + } + }() + return nil +} + +// Shutdown ensures that queue and all Batcher are stopped. +func (qb *Batcher) Shutdown(ctx context.Context) error { + if err := qb.queue.Shutdown(ctx); err != nil { + return err + } + qb.stopWG.Wait() + return nil +} diff --git a/exporter/internal/queue/batcher_test.go b/exporter/internal/queue/batcher_test.go new file mode 100644 index 00000000000..0b1379938dc --- /dev/null +++ b/exporter/internal/queue/batcher_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +func testExportFunc(ctx context.Context, req internal.Request) error { + return req.Export(ctx) +} + +func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + maxWorkers := 0 + ba := NewBatcher(cfg, q, maxWorkers, testExportFunc) + + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 20*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25 + }, 20*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38 + }, 20*time.Millisecond, 10*time.Millisecond) +} diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request.go new file mode 100644 index 00000000000..a0983db6d35 --- /dev/null +++ b/exporter/internal/queue/fake_request.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import 
"go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +type fakeRequestSink struct { + requestsCount *atomic.Int64 + itemsCount *atomic.Int64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: new(atomic.Int64), + itemsCount: new(atomic.Int64), + } +} + +type fakeRequest struct { + items int + exportErr error + delay time.Duration + sink *fakeRequestSink +} + +func (r *fakeRequest) Export(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.delay): + } + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(int64(r.items)) + } + return nil +} + +func (r *fakeRequest) ItemsCount() int { + return r.items +} + +func (r *fakeRequest) Merge(_ context.Context, + _ internal.Request) (internal.Request, error) { + return nil, errors.New("not implemented") +} + +func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, + _ internal.Request) ([]internal.Request, error) { + return nil, errors.New("not implemented") +} From 095533404f324875ece8470c1eb808da9e95a38d Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 23 Oct 2024 22:06:22 -0700 Subject: [PATCH 2/6] Edits according to Dmitrii's comment --- exporter/internal/queue/batcher.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 1d87002e7ae..7aa0871c618 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -28,7 +28,7 @@ type Batcher struct { exportFunc func(context.Context, internal.Request) error - readingBatch *batch + currentBatch *batch // the batch that is being built timer *time.Timer shutdownCh chan bool @@ -55,8 +55,8 @@ func (qb *Batcher) flush(batchToFlush batch) { } } -// allocateFlusher() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available. -func (qb *Batcher) allocateFlusher(batchToFlush batch) { +// flushAsync() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available. +func (qb *Batcher) flushAsync(batchToFlush batch) { // maxWorker = 0 means we don't limit the number of flushers. if qb.maxWorkers == 0 { qb.stopWG.Add(1) @@ -87,7 +87,10 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error { panic("not implemented") } - // This goroutine keeps reading until flush is triggered because of request size. + // This goroutine reads and then flushes if request reaches size limit. There are two operations in this + // goroutine that could be blocking: + // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. + // 2. flushAsync() blocks until there are idle workers in the worker pool. 
 qb.stopWG.Add(1)
 go func() {
 defer qb.stopWG.Done()
 for {
@@ -96,25 +99,26 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
 
 if !ok {
 qb.shutdownCh <- true
- if qb.readingBatch != nil {
- panic("batching is supported yet so reading batch should always be nil")
+ if qb.currentBatch != nil {
+ panic("batching is not supported yet so reading batch should always be nil")
 }
-
 return
 }
 if !qb.batchCfg.Enabled {
- qb.readingBatch = &batch{
+ qb.currentBatch = &batch{
 req: req,
 ctx: context.Background(),
 idxList: []uint64{idx}}
- qb.allocateFlusher(*qb.readingBatch)
- qb.readingBatch = nil
+ qb.flushAsync(*qb.currentBatch)
+ qb.currentBatch = nil
 } else {
 panic("not implemented")
 }
 }
 }()
 
+ // The following goroutine is in charge of listening to timer and shutdown signal. This is a seperate goroutine
+ // from the reading-flushing, because we want to keep timer seperate blocking operations.
 qb.stopWG.Add(1)
 go func() {
 defer qb.stopWG.Done()
 for {

From 3bbfd6a29597a8d6226f7552491b7f287f78952c Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 24 Oct 2024 12:53:43 -0700
Subject: [PATCH 3/6] Addressing Bogdan's comments

---
 exporter/internal/queue/batcher.go | 25 ++++++++++++++-----------
 exporter/internal/queue/batcher_test.go | 6 +-----
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
index 7aa0871c618..549f4ebd4d5 100644
--- a/exporter/internal/queue/batcher.go
+++ b/exporter/internal/queue/batcher.go
@@ -20,14 +20,13 @@ type batch struct {
 idxList []uint64
 }
 
+// TODO
 type Batcher struct {
 batchCfg exporterbatcher.Config
 
 queue Queue[internal.Request]
 maxWorkers int
- exportFunc func(context.Context, internal.Request) error
 
 currentBatch *batch // the batch that is being built
 timer *time.Timer
 shutdownCh chan bool
@@ -35,34 +34,32 @@ type Batcher struct {
 stopWG sync.WaitGroup
 }
 
-func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
- maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
+func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) *Batcher {
 return &Batcher{
 batchCfg: batchCfg,
 queue: queue,
 maxWorkers: maxWorkers,
- exportFunc: exportFunc,
 stopWG: sync.WaitGroup{},
 shutdownCh: make(chan bool, 1),
 }
 }
 
-// If preconditions pass, flush() take an item from the head of batch list and exports it.
+// flush takes an item from the head of batch list and exports it.
 func (qb *Batcher) flush(batchToFlush batch) {
- err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
+ err := batchToFlush.req.Export(batchToFlush.ctx)
 for _, idx := range batchToFlush.idxList {
 qb.queue.OnProcessingFinished(idx, err)
 }
 }
 
-// flushAsync() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available.
+// flushAsync starts a goroutine that calls flush. It blocks until a worker is available.
 func (qb *Batcher) flushAsync(batchToFlush batch) {
 // maxWorker = 0 means we don't limit the number of flushers.
 if qb.maxWorkers == 0 {
 qb.stopWG.Add(1)
 go func() {
+ defer qb.stopWG.Done()
 qb.flush(batchToFlush)
- qb.stopWG.Done()
 }()
 return
 }
@@ -71,6 +68,9 @@ func (qb *Batcher) flushAsync(batchToFlush batch) {
 
 // Start ensures that queue and all consumers are started.
 func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
+ // TODO: queue start is done here to keep the behavior similar to queue consumer. 
+ // However batcher should not be responsible for starting the queue. Move this up to
+ // queue sender once queue consumer is cleaned up.
 if err := qb.queue.Start(ctx, host); err != nil {
 return err
 }
@@ -117,8 +117,8 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
 }
 }()
- // The following goroutine is in charge of listening to timer and shutdown signal. This is a seperate goroutine
- // from the reading-flushing, because we want to keep timer seperate blocking operations.
+ // The following goroutine is in charge of listening to timer and shutdown signal. This is a separate goroutine
+ // from the reading-flushing, because we want to keep the timer separate from blocking operations.
 qb.stopWG.Add(1)
 go func() {
 defer qb.stopWG.Done()
@@ -137,6 +137,9 @@ func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
 
 // Shutdown ensures that queue and all Batcher are stopped.
 func (qb *Batcher) Shutdown(ctx context.Context) error {
+ // TODO: queue shutdown is done here to keep the behavior similar to queue consumer.
+ // However batcher should not be responsible for shutting down the queue. Move this up to
+ // queue sender once queue consumer is cleaned up.
 if err := qb.queue.Shutdown(ctx); err != nil {
 return err
 }
diff --git a/exporter/internal/queue/batcher_test.go b/exporter/internal/queue/batcher_test.go
index 0b1379938dc..dbb99d325b8 100644
--- a/exporter/internal/queue/batcher_test.go
+++ b/exporter/internal/queue/batcher_test.go
@@ -17,10 +17,6 @@ import (
 "go.opentelemetry.io/collector/exporter/internal"
 )
 
-func testExportFunc(ctx context.Context, req internal.Request) error {
- return req.Export(ctx)
-}
-
 func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
 cfg := exporterbatcher.NewDefaultConfig()
 cfg.Enabled = false
@@ -32,7 +28,7 @@ func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
 })
 
 maxWorkers := 0
- ba := NewBatcher(cfg, q, maxWorkers, testExportFunc)
+ ba := NewBatcher(cfg, q, maxWorkers)
 
 require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 t.Cleanup(func() {

From 50caf39ff318caff547544fd9e962c7fbe4659ca Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 24 Oct 2024 13:35:31 -0700
Subject: [PATCH 4/6] Implemented DisabledBatcher for the case where batching
 is disabled

---
 exporter/internal/queue/batcher.go | 116 +++++---------------
 exporter/internal/queue/disabled_batcher.go | 45 ++++++++
 2 files changed, 71 insertions(+), 90 deletions(-)
 create mode 100644 exporter/internal/queue/disabled_batcher.go

diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
index 549f4ebd4d5..de7796abe5b 100644
--- a/exporter/internal/queue/batcher.go
+++ b/exporter/internal/queue/batcher.go
@@ -5,9 +5,7 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 
 import (
 "context"
- "math"
 "sync"
- "time"
 
 "go.opentelemetry.io/collector/component"
 "go.opentelemetry.io/collector/exporter/exporterbatcher"
 "go.opentelemetry.io/collector/exporter/internal"
@@ -20,32 +18,39 @@ type batch struct {
 idxList []uint64
 }
 
-// TODO
-type Batcher struct {
- batchCfg exporterbatcher.Config
+// Batcher is in charge of reading items from the queue and sending them out asynchronously. 
+type Batcher interface {
+ component.Component
+}
+
+type BaseBatcher struct {
+ batchCfg exporterbatcher.Config
 queue Queue[internal.Request]
 maxWorkers int
+ stopWG sync.WaitGroup
+}
 
- currentBatch *batch // the batch that is being built
- timer *time.Timer
- shutdownCh chan bool
+func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) Batcher {
+ if maxWorkers != 0 {
+ panic("not implemented")
+ }
 
- stopWG sync.WaitGroup
-}
+ if batchCfg.Enabled {
+ panic("not implemented")
+ }
 
-func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) *Batcher {
- return &Batcher{
- batchCfg: batchCfg,
- queue: queue,
- maxWorkers: maxWorkers,
- stopWG: sync.WaitGroup{},
- shutdownCh: make(chan bool, 1),
+ return &DisabledBatcher{
+ BaseBatcher{
+ batchCfg: batchCfg,
+ queue: queue,
+ maxWorkers: maxWorkers,
+ stopWG: sync.WaitGroup{},
+ },
 }
 }
 
-// flush takes an item from the head of batch list and exports it.
-func (qb *Batcher) flush(batchToFlush batch) {
+// flush exports the incoming batch synchronously.
+func (qb *BaseBatcher) flush(batchToFlush batch) {
 err := batchToFlush.req.Export(batchToFlush.ctx)
 for _, idx := range batchToFlush.idxList {
 qb.queue.OnProcessingFinished(idx, err)
@@ -53,7 +58,7 @@ func (qb *BaseBatcher) flush(batchToFlush batch) {
 }
 
 // flushAsync starts a goroutine that calls flush. It blocks until a worker is available.
-func (qb *Batcher) flushAsync(batchToFlush batch) {
+func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
 // maxWorker = 0 means we don't limit the number of flushers.
 if qb.maxWorkers == 0 {
 qb.stopWG.Add(1)
@@ -66,77 +71,8 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
 panic("not implemented")
 }
 
-// Start ensures that queue and all consumers are started.
-func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
- // TODO: queue start is done here to keep the behavior similar to queue consumer.
- // However batcher should not be responsible for starting the queue. Move this up to
- // queue sender once queue consumer is cleaned up.
- if err := qb.queue.Start(ctx, host); err != nil {
- return err
- }
-
- if qb.batchCfg.Enabled {
- panic("not implemented")
- }
-
- // Timer doesn't do anything yet, but adding it so compiler won't complain about qb.timer.C
- qb.timer = time.NewTimer(math.MaxInt)
- qb.timer.Stop()
-
- if qb.maxWorkers != 0 {
- panic("not implemented")
- }
-
- // This goroutine reads and then flushes if request reaches size limit. There are two operations in this
- // goroutine that could be blocking:
- // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
- // 2. flushAsync() blocks until there are idle workers in the worker pool.
- qb.stopWG.Add(1)
- go func() {
- defer qb.stopWG.Done()
- for {
- idx, _, req, ok := qb.queue.Read(context.Background())
-
- if !ok {
- qb.shutdownCh <- true
- if qb.currentBatch != nil {
- panic("batching is not supported yet so reading batch should always be nil")
- }
- return
- }
- if !qb.batchCfg.Enabled {
- qb.currentBatch = &batch{
- req: req,
- ctx: context.Background(),
- idxList: []uint64{idx}}
- qb.flushAsync(*qb.currentBatch)
- qb.currentBatch = nil
- } else {
- panic("not implemented")
- }
- }
- }()
-
- // The following goroutine is in charge of listening to timer and shutdown signal. This is a separate goroutine
- // from the reading-flushing, because we want to keep the timer separate from blocking operations. 
- qb.stopWG.Add(1)
- go func() {
- defer qb.stopWG.Done()
- for {
- select {
- case <-qb.shutdownCh:
- return
- case <-qb.timer.C:
- panic("batching is not yet implemented. Having timer here just so compiler won't complain")
- }
-
- }
- }()
- return nil
-}
-
 // Shutdown ensures that queue and all Batcher are stopped.
-func (qb *Batcher) Shutdown(ctx context.Context) error {
+func (qb *BaseBatcher) Shutdown(ctx context.Context) error {
 // TODO: queue shutdown is done here to keep the behavior similar to queue consumer.
 // However batcher should not be responsible for shutting down the queue. Move this up to
 // queue sender once queue consumer is cleaned up.
 if err := qb.queue.Shutdown(ctx); err != nil {
 return err
 }
diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go
new file mode 100644
index 00000000000..78fb9f8c19e
--- /dev/null
+++ b/exporter/internal/queue/disabled_batcher.go
@@ -0,0 +1,45 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+ "context"
+
+ "go.opentelemetry.io/collector/component"
+)
+
+// DisabledBatcher is a special case of Batcher that has no size limit for sending. Any items read from the queue will
+// be sent out (asynchronously) immediately regardless of the size.
+type DisabledBatcher struct {
+ BaseBatcher
+}
+
+// Start starts the goroutine that reads from the queue and flushes asynchronously.
+func (qb *DisabledBatcher) Start(ctx context.Context, host component.Host) error {
+ // TODO: queue start is done here to keep the behavior similar to queue consumer.
+ // However batcher should not be responsible for starting or shutting down the queue. Move this up to
+ // queue sender once queue consumer is cleaned up.
+ if err := qb.queue.Start(ctx, host); err != nil {
+ return err
+ }
+
+ // This goroutine reads and then flushes.
+ // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
+ // 2. flushAsync() blocks until there are idle workers in the worker pool. 
+ qb.stopWG.Add(1)
+ go func() {
+ defer qb.stopWG.Done()
+ for {
+ idx, _, req, ok := qb.queue.Read(context.Background())
+ if !ok {
+ return
+ }
+ qb.flushAsync(batch{
+ req: req,
+ ctx: context.Background(),
+ idxList: []uint64{idx}})
+ }
+ }()
+ return nil
+}

From e42ea72e882ebc74f0683925df2dbf65e7f72645 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 24 Oct 2024 14:28:03 -0700
Subject: [PATCH 5/6] Renamed batcher_test to disabled_batcher_test

---
 .../queue/{batcher_test.go => disabled_batcher_test.go} | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename exporter/internal/queue/{batcher_test.go => disabled_batcher_test.go} (96%)

diff --git a/exporter/internal/queue/batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go
similarity index 96%
rename from exporter/internal/queue/batcher_test.go
rename to exporter/internal/queue/disabled_batcher_test.go
index dbb99d325b8..a90cfbbe5b2 100644
--- a/exporter/internal/queue/batcher_test.go
+++ b/exporter/internal/queue/disabled_batcher_test.go
@@ -17,7 +17,7 @@ import (
 "go.opentelemetry.io/collector/exporter/internal"
 )
 
-func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
+func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
 cfg := exporterbatcher.NewDefaultConfig()
 cfg.Enabled = false

From 1d5a4f37b02dbca71e263012d5a60b24031f0bff Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 24 Oct 2024 19:13:26 -0700
Subject: [PATCH 6/6] Cleaned up queue.Start and queue.Shutdown from batcher

---
 exporter/internal/queue/batcher.go | 6 ------
 exporter/internal/queue/disabled_batcher.go | 9 +--------
 exporter/internal/queue/disabled_batcher_test.go | 2 ++
 3 files changed, 3 insertions(+), 14 deletions(-)

diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
index de7796abe5b..9c06323eba5 100644
--- a/exporter/internal/queue/batcher.go
+++ b/exporter/internal/queue/batcher.go
@@ -73,12 +73,6 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
 
 // Shutdown ensures that queue and all Batcher are stopped.
 func (qb *BaseBatcher) Shutdown(ctx context.Context) error {
- // TODO: queue shutdown is done here to keep the behavior similar to queue consumer.
- // However batcher should not be responsible for shutting down the queue. Move this up to
- // queue sender once queue consumer is cleaned up.
- if err := qb.queue.Shutdown(ctx); err != nil {
- return err
- }
 qb.stopWG.Wait()
 return nil
 }
diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go
index 78fb9f8c19e..c5078885538 100644
--- a/exporter/internal/queue/disabled_batcher.go
+++ b/exporter/internal/queue/disabled_batcher.go
@@ -16,14 +16,7 @@ type DisabledBatcher struct {
 }
 
 // Start starts the goroutine that reads from the queue and flushes asynchronously.
-func (qb *DisabledBatcher) Start(ctx context.Context, host component.Host) error {
- // TODO: queue start is done here to keep the behavior similar to queue consumer.
- // However batcher should not be responsible for starting or shutting down the queue. Move this up to
- // queue sender once queue consumer is cleaned up.
- if err := qb.queue.Start(ctx, host); err != nil {
- return err
- }
-
+func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
 // This goroutine reads and then flushes.
 // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
 // 2. flushAsync() blocks until there are idle workers in the worker pool. 
diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index a90cfbbe5b2..2f1318b2c72 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -30,8 +30,10 @@ func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) { maxWorkers := 0 ba := NewBatcher(cfg, q, maxWorkers) + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) require.NoError(t, ba.Shutdown(context.Background())) })