From ef4a5be399b10bfc70749a3616c20751897db76a Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Thu, 14 Dec 2023 20:41:02 -0800 Subject: [PATCH] [exporterhelper] Do not re-enqueue failed requests (#9090) The current re-enqueuing behavior is not obvious and cannot be configured. It takes place only for persistent queue and only if `retry_on_failure::enabled=true` even if `retry_on_failure` is a setting for a different backoff retry strategy. This change removes the re-enqueuing behavior in favor of the `retry_on_failure` option. Consider increasing `retry_on_failure::max_elapsed_time` to reduce chances of data loss. Resolves https://github.com/open-telemetry/opentelemetry-collector/issues/8382 --- .chloggen/disable-requeuing.yaml | 29 ++++ exporter/exporterhelper/common.go | 11 +- .../internal/bounded_memory_queue.go | 5 +- .../internal/bounded_memory_queue_test.go | 12 +- exporter/exporterhelper/internal/consumers.go | 4 +- exporter/exporterhelper/internal/err.go | 20 +++ .../internal/persistent_queue.go | 18 ++- .../internal/persistent_queue_test.go | 22 +-- exporter/exporterhelper/internal/queue.go | 3 +- exporter/exporterhelper/queue_sender.go | 51 ++----- exporter/exporterhelper/queue_sender_test.go | 126 +----------------- exporter/exporterhelper/retry_sender.go | 5 +- 12 files changed, 97 insertions(+), 209 deletions(-) create mode 100644 .chloggen/disable-requeuing.yaml create mode 100644 exporter/exporterhelper/internal/err.go diff --git a/.chloggen/disable-requeuing.yaml b/.chloggen/disable-requeuing.yaml new file mode 100644 index 00000000000..903d8ca8650 --- /dev/null +++ b/.chloggen/disable-requeuing.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporters/sending_queue + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Do not re-enqueue failed batches, rely on the retry_on_failure strategy instead. + +# One or more tracking issues or pull requests related to the change +issues: [8382] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The current re-enqueuing behavior is not obvious and cannot be configured. It takes place only for persistent queue + and only if `retry_on_failure::enabled=true` even if `retry_on_failure` is a setting for a different backoff retry + strategy. This change removes the re-enqueuing behavior. Consider increasing `retry_on_failure::max_elapsed_time` + to reduce chances of data loss or set it to 0 to keep retrying until requests succeed. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 69fb3771d85..bc840d754c6 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req } be.connectSenders() - // If retry sender is disabled then disable requeuing in the queue sender. - // TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender. - if qs, ok := be.queueSender.(*queueSender); ok { - // if it's not retrySender, then it is disabled. - if _, ok = be.retrySender.(*retrySender); !ok { - qs.requeuingEnabled = false - } - } - return be, nil } @@ -212,7 +203,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { func (be *baseExporter) Shutdown(ctx context.Context) error { return multierr.Combine( - // First shutdown the retry sender, so it can push any pending requests to back the queue. + // First shutdown the retry sender, so the queue sender can flush the queue without retries. be.retrySender.Shutdown(ctx), // Then shutdown the queue sender. be.queueSender.Shutdown(ctx), diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index a643f8e6c61..86af52896b0 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -40,12 +40,13 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. 
-func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool { +func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { item, ok := <-q.items if !ok { return false } - consumeFunc(item.ctx, item.req) + // the memory queue doesn't handle consume errors + _ = consumeFunc(item.ctx, item.req) return true } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index ef2cc4aa876..b9b689d704e 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -30,10 +30,10 @@ func TestBoundedQueue(t *testing.T) { consumerState := newConsumerState(t) - consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) bool { + consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) error { consumerState.record(item) <-waitCh - return true + return nil }) assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost())) @@ -89,10 +89,10 @@ func TestShutdownWhileNotEmpty(t *testing.T) { consumerState := newConsumerState(t) waitChan := make(chan struct{}) - consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) bool { + consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) error { <-waitChan consumerState.record(item) - return true + return nil }) assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost())) @@ -176,9 +176,9 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) b.ReportAllocs() for i := 0; i < b.N; i++ { q := NewBoundedMemoryQueue[string](capacity) - consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) bool { + consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error { time.Sleep(1 * time.Millisecond) - return true + return nil }) require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost())) for j := 0; j < numberOfItems; j++ { diff --git a/exporter/exporterhelper/internal/consumers.go b/exporter/exporterhelper/internal/consumers.go index 91f6a244115..3a5f42a63c0 100644 --- a/exporter/exporterhelper/internal/consumers.go +++ b/exporter/exporterhelper/internal/consumers.go @@ -13,11 +13,11 @@ import ( type QueueConsumers[T any] struct { queue Queue[T] numConsumers int - consumeFunc func(context.Context, T) bool + consumeFunc func(context.Context, T) error stopWG sync.WaitGroup } -func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) bool) *QueueConsumers[T] { +func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T] { return &QueueConsumers[T]{ queue: q, numConsumers: numConsumers, diff --git a/exporter/exporterhelper/internal/err.go b/exporter/exporterhelper/internal/err.go new file mode 100644 index 00000000000..c93bd92f556 --- /dev/null +++ b/exporter/exporterhelper/internal/err.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +type shutdownErr struct { + err error +} + +func NewShutdownErr(err error) error { + return shutdownErr{err: err} +} + +func (s shutdownErr) Error() string { + return "interrupted due to shutdown: " + s.err.Error() +} + +func (s shutdownErr) Unwrap() error { + return s.err +} 
diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index f58016b5853..c635f7fa052 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -141,10 +141,10 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. -func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool { +func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { var ( req T - onProcessingFinished func() + onProcessingFinished func(error) consumed bool ) @@ -157,9 +157,7 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool) } if consumed { - if ok := consumeFunc(context.Background(), req); ok { - onProcessingFinished() - } + onProcessingFinished(consumeFunc(context.Background(), req)) return true } } @@ -241,7 +239,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { // getNextItem pulls the next available item from the persistent storage along with a callback function that should be // called after the item is processed to clean up the storage. If no new item is available, returns false. -func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool) { +func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) { pq.mu.Lock() defer pq.mu.Unlock() @@ -282,7 +280,13 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool) // Increase the reference count, so the client is not closed while the request is being processed. pq.refClient++ - return request, func() { + return request, func(consumeErr error) { + if errors.As(consumeErr, &shutdownErr{}) { + // The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart. + // TODO: Handle partially delivered requests by updating their values in the storage. + return + } + // Delete the item from the persistent storage after it was processed. pq.mu.Lock() defer pq.mu.Unlock() diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 75d2a8f7ad7..6b0d4710f60 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -39,7 +39,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { } // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. 
-func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) bool) Queue[ptrace.Traces] { +func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) error) Queue[ptrace.Traces] { pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces, unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings()) host := &mockHost{ext: map[component.ID]component.Component{ @@ -73,10 +73,10 @@ func createTestPersistentQueue(client storage.Client) *persistentQueue[ptrace.Tr func TestPersistentQueue_FullCapacity(t *testing.T) { start := make(chan struct{}) done := make(chan struct{}) - pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) bool { + pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) error { <-start <-done - return true + return nil }) assert.Equal(t, 0, pq.Size()) @@ -100,7 +100,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { } func TestPersistentQueue_Shutdown(t *testing.T) { - pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) bool { return true }) + pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) error { return nil }) req := newTraces(1, 10) for i := 0; i < 1000; i++ { @@ -140,9 +140,9 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newTraces(1, 10) numMessagesConsumed := &atomic.Int32{} - pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) bool { + pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) error { numMessagesConsumed.Add(int32(1)) - return true + return nil }) for i := 0; i < c.numMessagesProduced; i++ { @@ -401,7 +401,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1}) // Lets mark item 1 as finished, it will remove it from the currently dispatched items list. - onProcessingFinished() + onProcessingFinished(nil) requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0}) // Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end. 
@@ -415,7 +415,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { r, onProcessingFinished, found := newPs.getNextItem(context.Background()) require.True(t, found) assert.Equal(t, req, r) - onProcessingFinished() + onProcessingFinished(nil) } // The queue should be now empty @@ -524,7 +524,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { } for i := 0; i < bb.N; i++ { - require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) bool { return true })) + require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) error { return nil })) } require.NoError(b, ext.Shutdown(context.Background())) }) @@ -603,7 +603,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { assert.False(t, client.(*mockStorageClient).isClosed()) assert.NoError(t, ps.Shutdown(context.Background())) assert.False(t, client.(*mockStorageClient).isClosed()) - onProcessingFinished() + onProcessingFinished(nil) assert.True(t, client.(*mockStorageClient).isClosed()) } @@ -643,7 +643,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Subsequent items succeed, as deleting the first item frees enough space for the state update reqCount-- for i := reqCount; i > 0; i-- { - require.True(t, ps.Consume(func(context.Context, ptrace.Traces) bool { return true })) + require.True(t, ps.Consume(func(context.Context, ptrace.Traces) error { return nil })) } // We should be able to put a new item in diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go index 535db30eff2..dab175f3658 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/exporterhelper/internal/queue.go @@ -30,8 +30,7 @@ type Queue[T any] interface { // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. - // The provided callback function returns true if the item was consumed or false if the consumer is stopped. - Consume(func(ctx context.Context, item T) bool) bool + Consume(func(ctx context.Context, item T) error) bool // Size returns the current Size of the queue Size() int // Capacity returns the capacity of the queue. 
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 865ff730253..9f5794fc2f1 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "sync/atomic" "time" "go.opencensus.io/metric/metricdata" @@ -75,14 +74,12 @@ func (qCfg *QueueSettings) Validate() error { type queueSender struct { baseRequestSender - fullName string - queue internal.Queue[Request] - traceAttribute attribute.KeyValue - logger *zap.Logger - meter otelmetric.Meter - consumers *internal.QueueConsumers[Request] - requeuingEnabled bool - stopped *atomic.Bool + fullName string + queue internal.Queue[Request] + traceAttribute attribute.KeyValue + logger *zap.Logger + meter otelmetric.Meter + consumers *internal.QueueConsumers[Request] metricCapacity otelmetric.Int64ObservableGauge metricSize otelmetric.Int64ObservableGauge @@ -105,50 +102,22 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), - // TODO: this can be further exposed as a config param rather than relying on a type of queue - requeuingEnabled: isPersistent, - stopped: &atomic.Bool{}, } qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) return qs } // consume is the function that is executed by the queue consumers to send the data to the next consumerSender. -func (qs *queueSender) consume(ctx context.Context, req Request) bool { +func (qs *queueSender) consume(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) - - // Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender. - if err == nil || consumererror.IsPermanent(err) { - return true - } - - // Do not requeue if the queue sender is stopped. - if qs.stopped.Load() { - return false - } - - if !qs.requeuingEnabled { + if err != nil && !consumererror.IsPermanent(err) { qs.logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), zap.Int("dropped_items", req.ItemsCount()), ) - return true - } - - if qs.queue.Offer(ctx, extractPartialRequest(req, err)) == nil { - qs.logger.Error( - "Exporting failed. Putting back to the end of the queue.", - zap.Error(err), - ) - } else { - qs.logger.Error( - "Exporting failed. Queue did not accept requeuing request. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.ItemsCount()), - ) } - return true + return err } // Start is invoked during service startup. @@ -212,8 +181,6 @@ func (qs *queueSender) recordWithOC() error { // Shutdown is invoked during service shutdown. 
func (qs *queueSender) Shutdown(ctx context.Context) error { - qs.stopped.Store(true) - // Cleanup queue metrics reporting _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 3fb8e47af33..cdac341ec1f 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -11,8 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -222,112 +220,6 @@ func TestQueueSettings_Validate(t *testing.T) { assert.NoError(t, qCfg.Validate()) } -// if requeueing is enabled, we eventually retry even if we failed at first -func TestQueuedRetry_RequeuingEnabled(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - rCfg := NewDefaultRetrySettings() - rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - be.queueSender.(*queueSender).requeuingEnabled = true - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - mockR := newMockRequest(4, errors.New("transient error")) - ocs.run(func() { - ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - // ensure that only 1 item was sent which correspond to items count in the error returned by mockRequest.OnError() - ocs.checkSendItemsCount(t, 1) - ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here -} - -// disabling retry sender should disable requeuing. -func TestQueuedRetry_RequeuingDisabled(t *testing.T) { - mockR := newMockRequest(2, errors.New("transient error")) - - // use persistent storage as it expected to be used with requeuing unless the retry sender is disabled - qCfg := NewDefaultQueueSettings() - storageID := component.NewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := NewDefaultRetrySettings() - rCfg.Enabled = false - - be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - - var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), - } - host := &mockHost{ext: extensions} - require.NoError(t, be.Start(context.Background(), host)) - - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // one failed request, no retries, two items dropped. 
- mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) -} - -// if requeueing is enabled, but the queue is full, we get an error -func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 1 - rCfg := NewDefaultRetrySettings() - rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - - set := exportertest.NewNopCreateSettings() - logger, observedLogs := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - be.queueSender.(*queueSender).requeuingEnabled = true - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - // send a request that will fail after waitReq1 is unblocked - waitReq1 := make(chan struct{}) - req1 := newMockExportRequest(func(ctx context.Context) error { - waitReq1 <- struct{}{} - return errors.New("some error") - }) - require.NoError(t, be.queueSender.send(context.Background(), req1)) - - // send another request to fill the queue - req2 := newMockRequest(1, nil) - require.NoError(t, be.queueSender.send(context.Background(), req2)) - - <-waitReq1 - - // req1 cannot be put back to the queue and should be dropped, check the log message - assert.Eventually(t, func() bool { - return observedLogs.FilterMessageSnippet("Queue did not accept requeuing request. Dropping data.").Len() == 1 - }, time.Second, 1*time.Millisecond) - - // req2 should be sent out after that - req2.checkNumRequests(t, 1) -} - func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false @@ -393,7 +285,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { require.Error(t, be.Start(context.Background(), host), "could not get storage client") } -func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { +func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 storageID := component.NewIDWithName("file_storage", "storage") @@ -450,19 +342,3 @@ type mockHost struct { func (nh *mockHost) GetExtensions() map[component.ID]component.Component { return nh.ext } - -type mockExportRequest struct { - exportFunc func(context.Context) error -} - -func newMockExportRequest(exportFunc func(context.Context) error) *mockExportRequest { - return &mockExportRequest{exportFunc: exportFunc} -} - -func (m *mockExportRequest) ItemsCount() int { - return 1 -} - -func (m *mockExportRequest) Export(ctx context.Context) error { - return m.exportFunc(ctx) -} diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index e437d50df0b..6ce76ccb8de 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -35,7 +36,7 @@ type RetrySettings struct { // consecutive retries will always be `MaxInterval`. 
MaxInterval time.Duration `mapstructure:"max_interval"` // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. - // Once this value is reached, the data is discarded. + // Once this value is reached, the data is discarded. If set to 0, the retries are never stopped. MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"` } @@ -185,7 +186,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error { case <-ctx.Done(): return fmt.Errorf("request is cancelled or timed out %w", err) case <-rs.stopCh: - return fmt.Errorf("interrupted due to shutdown %w", err) + return internal.NewShutdownErr(err) case <-time.After(backoffDelay): } }
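The patch threads an error-based contract through the queue: consumer callbacks now return an error instead of a bool, and the persistent queue deletes an item from storage only when the completion callback sees something other than a shutdown interruption (a `shutdownErr` produced by `NewShutdownErr` in the retry sender). A minimal, self-contained sketch of that flow, assuming simplified types rather than the actual exporterhelper API; only the names mirror the patch:

// Sketch only: simplified stand-ins for the internal exporterhelper types.
package main

import (
	"context"
	"errors"
	"fmt"
)

// shutdownErr mirrors the wrapper added in internal/err.go: it tags an export
// error as "failed because the collector is shutting down".
type shutdownErr struct{ err error }

func newShutdownErr(err error) error { return shutdownErr{err: err} }

func (s shutdownErr) Error() string { return "interrupted due to shutdown: " + s.err.Error() }
func (s shutdownErr) Unwrap() error { return s.err }

// onProcessingFinished sketches the persistent queue completion callback:
// on a shutdown error the item stays in storage so it is redelivered after
// restart; otherwise (success or a genuine failure) it is deleted.
func onProcessingFinished(consumeErr error) {
	if errors.As(consumeErr, &shutdownErr{}) {
		fmt.Println("shutdown in progress: keep the item in storage for the next run")
		return
	}
	fmt.Println("finished: delete the item from storage")
}

func main() {
	// Consumer callbacks now have the signature func(context.Context, T) error.
	consume := func(_ context.Context, item string) error {
		// Pretend the retry sender was interrupted by shutdown while sending.
		return newShutdownErr(fmt.Errorf("sending %q: connection closed", item))
	}

	onProcessingFinished(consume(context.Background(), "batch-1")) // kept in storage
	onProcessingFinished(nil)                                      // delivered, deleted
}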
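With re-enqueuing gone, durability after a failed export depends entirely on `retry_on_failure`, as the changelog note advises: raise `max_elapsed_time` to keep retrying longer before data is dropped, or set it to 0 to retry until the request succeeds. A configuration sketch of that recommendation, assuming an OTLP exporter and the contrib `file_storage` extension; the endpoint, directory, and timeout values are illustrative only and not part of this change:

extensions:
  file_storage:
    directory: /var/lib/otelcol/queue

exporters:
  otlp:
    endpoint: backend:4317
    sending_queue:
      enabled: true
      storage: file_storage        # persistent queue; failed batches are no longer re-enqueued
    retry_on_failure:
      enabled: true
      # Raise this to keep retrying longer before data is dropped,
      # or set it to 0 to retry until the request succeeds.
      max_elapsed_time: 30m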