Skip to content

Commit

Permalink
[exporterhelper] Make the re-enqueue behavior configurable
Browse files Browse the repository at this point in the history
Instead of relying or enabled `retry_on_failure` option, we now have a new option `sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful for users who don't want the blocking exponential retry, just want to put the failed request back to the queue. This option can also be enabled with memory queue now, which means that the data will never be dropped after getting to the queue as long as the collector is up and running.
  • Loading branch information
dmitryax committed Nov 27, 2023
1 parent 0ae738f commit 5fe217b
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 24 deletions.
30 changes: 30 additions & 0 deletions .chloggen/introduce-reenque-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the re-enqueue behavior configurable.

# One or more tracking issues or pull requests related to the change
issues: [8987]

subtext: |
Instead of relying or enabled `retry_on_failure` option, we now have a new option `sending_queue::requeue_on_failure`
to control the requeuing independently of the retry sender. This can be useful for users who don't want the blocking
exponential retry, just want to put the failed request back to the queue. This option can also be enabled with memory
queue now, which means that the data will never be dropped after getting to the queue as long as the collector is up
and running.
IMPORTANT: Make sure to set `sending_queue::requeue_on_failure` to `true` if you use persistent queue with
`retry_on_failure` enabled to preserve the same behavior.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 0 additions & 9 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}

return be, nil
}

Expand Down
28 changes: 18 additions & 10 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ type QueueSettings struct {
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
// RequeueOnFailure indicates whether requests are requeued or dropped on non-permanent failures.
// If true, a request failed with a non-permanent error will be put back to the queue and
// retried after the current queue is drained.
// If false (default), request will be dropped after any failed send operation.
// Setting this option to true means that the request will never leave the exporter queue until it's
// successfully exported or failed with permanent error.
// Do not confuse this option with the `retry_on_failure, which consumes
// requests from the queue and apply blocking exponential backoff retry mechanism.
RequeueOnFailure bool `mapstructure:"requeue_on_failure"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
Expand Down Expand Up @@ -102,16 +111,15 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
}
return &queueSender{
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
numConsumers: config.NumConsumers,
stopWG: sync.WaitGroup{},
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
numConsumers: config.NumConsumers,
stopWG: sync.WaitGroup{},
requeuingEnabled: config.RequeueOnFailure,
}
}

Expand Down
12 changes: 7 additions & 5 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,13 @@ func TestQueueSettings_Validate(t *testing.T) {
// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = true
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -253,16 +253,17 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
}

// disabling retry sender should disable requeuing.
func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
mockR := newMockRequest(2, errors.New("transient error"))

// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
// use persistent storage as it expected to be used with requeuing unless it's disabled
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = false

storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry

be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
Expand Down Expand Up @@ -291,6 +292,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
qCfg.RequeueOnFailure = true
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead

Expand All @@ -300,7 +302,6 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand Down Expand Up @@ -397,6 +398,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.RequeueOnFailure = true
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown

Expand Down

0 comments on commit 5fe217b

Please sign in to comment.