diff --git a/.chloggen/introduce-reenque-option.yaml b/.chloggen/introduce-reenque-option.yaml new file mode 100755 index 00000000000..77966cba9e0 --- /dev/null +++ b/.chloggen/introduce-reenque-option.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make the re-enqueue behavior configurable. + +# One or more tracking issues or pull requests related to the change +issues: [8987] + +subtext: | + Instead of relying on the enabled `retry_on_failure` option, we now have a new option + `sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful + for users who don't want the blocking exponential retry and just want to put the failed request back in the queue. + This option can also be enabled with the memory queue now, which means that the data will never be dropped after + getting to the queue as long as the collector is up and running. + + IMPORTANT: Make sure to set `sending_queue::requeue_on_failure` to `true` if you use persistent queue with + `retry_on_failure` enabled to preserve the same behavior. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 69fb3771d85..0d46945ce6f 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req } be.connectSenders() - // If retry sender is disabled then disable requeuing in the queue sender. - // TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender. - if qs, ok := be.queueSender.(*queueSender); ok { - // if it's not retrySender, then it is disabled. - if _, ok = be.retrySender.(*retrySender); !ok { - qs.requeuingEnabled = false - } - } - return be, nil } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index c8e554fce56..d9224fc7cec 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -41,6 +41,14 @@ type QueueSettings struct { // StorageID if not empty, enables the persistent storage and uses the component specified // as a storage extension for the persistent queue StorageID *component.ID `mapstructure:"storage"` + // RequeueOnFailure indicates whether requests are requeued or dropped on non-permanent failures. + // Do not confuse this option with the retry_on_failure which consumes + // requests from the queue and apply blocking exponential backoff retry mechanism. + // This option is applied after all the retries configured with retry_on_failure are exhausted. + // If true, a request failed with a non-permanent error will be put back in the queue and + // retried after the current queue is drained. + // If false (default), all failed requests will be dropped. + RequeueOnFailure bool `mapstructure:"requeue_on_failure"` } // NewDefaultQueueSettings returns the default settings for QueueSettings. @@ -89,22 +97,20 @@ type queueSender struct { func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender { - isPersistent := config.StorageID != nil var queue internal.Queue[Request] - if isPersistent { + if config.StorageID != nil { queue = internal.NewPersistentQueue[Request]( config.QueueSize, signal, *config.StorageID, marshaler, unmarshaler, set) } else { queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize) } qs := &queueSender{ - fullName: set.ID.String(), - queue: queue, - traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), - logger: set.TelemetrySettings.Logger, - meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), - // TODO: this can be further exposed as a config param rather than relying on a type of queue - requeuingEnabled: isPersistent, + fullName: set.ID.String(), + queue: queue, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + logger: set.TelemetrySettings.Logger, + meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), + requeuingEnabled: config.RequeueOnFailure, } qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) return qs diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 75a30226e25..ac07e2f6023 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -226,13 +226,13 @@ func TestQueueSettings_Validate(t *testing.T) { // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() + qCfg.RequeueOnFailure = true qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) - be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -253,16 +253,17 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here } -// disabling retry sender should disable requeuing. func TestQueuedRetry_RequeuingDisabled(t *testing.T) { mockR := newMockRequest(2, errors.New("transient error")) - // use persistent storage as it expected to be used with requeuing unless the retry sender is disabled + // use persistent storage as it expected to be used with requeuing unless it's disabled qCfg := NewDefaultQueueSettings() + qCfg.RequeueOnFailure = false + storageID := component.NewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() - rCfg.Enabled = false + rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -291,6 +292,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 qCfg.QueueSize = 1 + qCfg.RequeueOnFailure = true rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead @@ -300,7 +302,6 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -397,6 +398,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 + qCfg.RequeueOnFailure = true storageID := component.NewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown