From 4658926d0c065141b8749e9e366833ab4b6fd980 Mon Sep 17 00:00:00 2001
From: Dmitry Anoshin
Date: Sun, 26 Nov 2023 18:52:05 -0800
Subject: [PATCH] [exporterhelper] Make the re-enqueue behavior configurable

Instead of relying on the enabled `retry_on_failure` option, we now have a new
option `sending_queue::requeue_on_failure` to control the requeuing
independently of the retry sender. This can be useful for users who don't want
the blocking exponential retry and just want to put the failed request back in
the queue. This option can also be enabled with the memory queue now, which
means that the data will never be dropped after getting to the queue as long
as the collector is up and running.
---
 .chloggen/introduce-reenque-option.yaml      | 30 ++++++++++++++++++++
 exporter/exporterhelper/common.go            |  9 ------
 exporter/exporterhelper/queue_sender.go      | 28 +++++++++++-------
 exporter/exporterhelper/queue_sender_test.go | 12 ++++----
 4 files changed, 54 insertions(+), 25 deletions(-)
 create mode 100755 .chloggen/introduce-reenque-option.yaml

diff --git a/.chloggen/introduce-reenque-option.yaml b/.chloggen/introduce-reenque-option.yaml
new file mode 100755
index 00000000000..42f775cc2ed
--- /dev/null
+++ b/.chloggen/introduce-reenque-option.yaml
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Make the re-enqueue behavior configurable.
+
+# One or more tracking issues or pull requests related to the change
+issues: [8987]
+
+subtext: |
+  Instead of relying on the enabled `retry_on_failure` option, we now have a new option
+  `sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful
+  for users who don't want the blocking exponential retry and just want to put the failed request back in the queue.
+  This option can also be enabled with the memory queue now, which means that the data will never be dropped after
+  getting to the queue as long as the collector is up and running.
+
+  IMPORTANT: Make sure to set `sending_queue::requeue_on_failure` to `true` if you use the persistent queue with
+  `retry_on_failure` enabled to preserve the same behavior.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
\ No newline at end of file
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 69fb3771d85..0d46945ce6f 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 	}
 	be.connectSenders()
 
-	// If retry sender is disabled then disable requeuing in the queue sender.
-	// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
-	if qs, ok := be.queueSender.(*queueSender); ok {
-		// if it's not retrySender, then it is disabled.
-		if _, ok = be.retrySender.(*retrySender); !ok {
-			qs.requeuingEnabled = false
-		}
-	}
-
 	return be, nil
 }
 
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index 2f86c475611..29119aa9260 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -42,6 +42,14 @@ type QueueSettings struct {
 	// StorageID if not empty, enables the persistent storage and uses the component specified
 	// as a storage extension for the persistent queue
 	StorageID *component.ID `mapstructure:"storage"`
+	// RequeueOnFailure indicates whether requests are requeued or dropped on non-permanent failures.
+	// Do not confuse this option with retry_on_failure, which consumes
+	// requests from the queue and applies a blocking exponential backoff retry mechanism.
+	// This option is applied after all the retries configured with retry_on_failure are exhausted.
+	// If true, a request that failed with a non-permanent error will be put back in the queue and
+	// retried after the current queue is drained.
+	// If false (default), all failed requests will be dropped.
+	RequeueOnFailure bool `mapstructure:"requeue_on_failure"`
 }
 
 // NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -92,24 +100,22 @@ type queueSender struct {
 
 func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
 	marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender {
-	isPersistent := config.StorageID != nil
 	var queue internal.Queue[Request]
-	if isPersistent {
+	if config.StorageID != nil {
 		queue = internal.NewPersistentQueue[Request](
 			config.QueueSize, signal, *config.StorageID, marshaler, unmarshaler, set)
 	} else {
 		queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
 	}
 	qs := &queueSender{
-		fullName:         set.ID.String(),
-		signal:           signal,
-		queue:            queue,
-		traceAttribute:   attribute.String(obsmetrics.ExporterKey, set.ID.String()),
-		logger:           set.TelemetrySettings.Logger,
-		meter:            set.TelemetrySettings.MeterProvider.Meter(scopeName),
-		stopWG:           sync.WaitGroup{},
-		// TODO: this can be further exposed as a config param rather than relying on a type of queue
-		requeuingEnabled: isPersistent,
+		fullName:         set.ID.String(),
+		signal:           signal,
+		queue:            queue,
+		traceAttribute:   attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+		logger:           set.TelemetrySettings.Logger,
+		meter:            set.TelemetrySettings.MeterProvider.Meter(scopeName),
+		stopWG:           sync.WaitGroup{},
+		requeuingEnabled: config.RequeueOnFailure,
 	}
 	qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
 	return qs
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index 75a30226e25..ac07e2f6023 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -226,13 +226,13 @@ func TestQueueSettings_Validate(t *testing.T) {
 // if requeueing is enabled, we eventually retry even if we failed at first
 func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
+	qCfg.RequeueOnFailure = true
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
 	rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
 	be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
-	be.queueSender.(*queueSender).requeuingEnabled = true
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
 		assert.NoError(t, be.Shutdown(context.Background()))
@@ -253,16 +253,17 @@
 	ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
 }
 
-// disabling retry sender should disable requeuing.
 func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
 	mockR := newMockRequest(2, errors.New("transient error"))
-	// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
+	// use persistent storage as it is expected to be used with requeuing unless it's disabled
 	qCfg := NewDefaultQueueSettings()
+	qCfg.RequeueOnFailure = false
+
 	storageID := component.NewIDWithName("file_storage", "storage")
 	qCfg.StorageID = &storageID // enable persistence
 	rCfg := NewDefaultRetrySettings()
-	rCfg.Enabled = false
+	rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry
 	be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
@@ -291,6 +292,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 1
 	qCfg.QueueSize = 1
+	qCfg.RequeueOnFailure = true
 	rCfg := NewDefaultRetrySettings()
 	rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
 
@@ -300,7 +302,6 @@
 	be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
-	be.queueSender.(*queueSender).requeuingEnabled = true
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
 		assert.NoError(t, be.Shutdown(context.Background()))
@@ -397,6 +398,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
 
 func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 1
+	qCfg.RequeueOnFailure = true
 	storageID := component.NewIDWithName("file_storage", "storage")
 	qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown
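
Example configuration (illustrative sketch, not part of the patch): the snippet below shows how the new `sending_queue::requeue_on_failure` option introduced here could be combined with the existing `retry_on_failure` and `sending_queue` groups. The `otlp` exporter name, its endpoint, and the `file_storage` extension are placeholder assumptions for whatever exporter and storage extension are actually configured; only `requeue_on_failure` is added by this change.

exporters:
  otlp:
    endpoint: example-backend:4317    # placeholder endpoint
    retry_on_failure:
      enabled: false                  # skip the blocking exponential backoff retries
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 1000
      storage: file_storage           # optional; omit to use the in-memory queue
      requeue_on_failure: true        # new option: put failed requests back in the queue

With this setup, a request that fails with a non-permanent error is put back in the queue instead of being retried in place or dropped, and per the change note this now works with the in-memory queue as well as the persistent one.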