Skip to content

Commit

Permalink
[exporterhelper] Make the re-enqueue behavior configurable
Browse files Browse the repository at this point in the history
Instead of relying or enabled `retry_on_failure` option, we now have a new option `sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful for users who don't want the blocking exponential retry, just want to put the failed request back to the queue. This option can also be enabled with memory queue now, which means that the data will never be dropped after getting to the queue as long as the collector is up and running.
  • Loading branch information
dmitryax committed Nov 27, 2023
1 parent 575c5f5 commit 4658926
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 25 deletions.
30 changes: 30 additions & 0 deletions .chloggen/introduce-reenque-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the re-enqueue behavior configurable.

# One or more tracking issues or pull requests related to the change
issues: [8987]

subtext: |
Instead of relying on the enabled `retry_on_failure` option, we now have a new option
`sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful
for users who don't want the blocking exponential retry and just want to put the failed request back in the queue.
This option can also be enabled with the memory queue now, which means that the data will never be dropped after
getting to the queue as long as the collector is up and running.
IMPORTANT: Make sure to set `sending_queue::requeue_on_failure` to `true` if you use persistent queue with
`retry_on_failure` enabled to preserve the same behavior.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 0 additions & 9 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}

return be, nil
}

Expand Down
28 changes: 17 additions & 11 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ type QueueSettings struct {
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
// RequeueOnFailure indicates whether requests are requeued or dropped on non-permanent failures.
// Do not confuse this option with the retry_on_failure which consumes
// requests from the queue and apply blocking exponential backoff retry mechanism.
// This option is applied after all the retries configured with retry_on_failure are exhausted.
// If true, a request failed with a non-permanent error will be put back in the queue and
// retried after the current queue is drained.
// If false (default), all failed requests will be dropped.
RequeueOnFailure bool `mapstructure:"requeue_on_failure"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
Expand Down Expand Up @@ -92,24 +100,22 @@ type queueSender struct {
func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue[Request]
if isPersistent {
if config.StorageID != nil {
queue = internal.NewPersistentQueue[Request](
config.QueueSize, signal, *config.StorageID, marshaler, unmarshaler, set)
} else {
queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
}
qs := &queueSender{
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
stopWG: sync.WaitGroup{},
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
stopWG: sync.WaitGroup{},
requeuingEnabled: config.RequeueOnFailure,
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
Expand Down
12 changes: 7 additions & 5 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,13 @@ func TestQueueSettings_Validate(t *testing.T) {
// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = true
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -253,16 +253,17 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
}

// disabling retry sender should disable requeuing.
func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
mockR := newMockRequest(2, errors.New("transient error"))

// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
// use persistent storage as it expected to be used with requeuing unless it's disabled
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = false

storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry

be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
Expand Down Expand Up @@ -291,6 +292,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
qCfg.RequeueOnFailure = true
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead

Expand All @@ -300,7 +302,6 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand Down Expand Up @@ -397,6 +398,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.RequeueOnFailure = true
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown

Expand Down

0 comments on commit 4658926

Please sign in to comment.