Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporterhelper] Make the re-enqueue behavior configurable #8993

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .chloggen/introduce-reenque-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the re-enqueue behavior configurable.

# One or more tracking issues or pull requests related to the change
issues: [8987]

subtext: |
Instead of relying on the enabled `retry_on_failure` option, we now have a new option
`sending_queue::requeue_on_failure` to control the requeuing independently of the retry sender. This can be useful
for users who don't want the blocking exponential retry and just want to put the failed request back in the queue.
This option can also be enabled with the memory queue now, which means that the data will never be dropped after
getting to the queue as long as the collector is up and running.

IMPORTANT: Make sure to set `sending_queue::requeue_on_failure` to `true` if you use persistent queue with
`retry_on_failure` enabled to preserve the same behavior.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 0 additions & 9 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}

return be, nil
}

Expand Down
24 changes: 15 additions & 9 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ type QueueSettings struct {
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
// RequeueOnFailure indicates whether requests are requeued or dropped on non-permanent failures.
// Do not confuse this option with the retry_on_failure which consumes
// requests from the queue and apply blocking exponential backoff retry mechanism.
// This option is applied after all the retries configured with retry_on_failure are exhausted.
// If true, a request failed with a non-permanent error will be put back in the queue and
// retried after the current queue is drained.
// If false (default), all failed requests will be dropped.
RequeueOnFailure bool `mapstructure:"requeue_on_failure"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RequeueOnTransientFailure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requeue_on_transient_failure seems like a lot of typing. Also we have retry_on_failure, so we probably should stay consistent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reasons we want requeue_on_failure rather than drop_data_on_failure?

  • By default, users don't like data loss. If they miss out on this configuration they would have data loss unexpectedly.
  • Having drop_data_on_failure with default to false would make this change backward compatible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's the case for the persistent queue only. Memory queue has the opposite behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not even always applicable to the persistent queue. Only if retry_on_failure::enabled=true

Copy link
Member Author

@dmitryax dmitryax Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably do some transition period when this option is preset if persistent queue + retry_on_failure are enabled with a warning asking users to set this explicitly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm i see. so this is also aligning the behavior between memory/persistent queue. yeah having some WARN logs would probably be helpful in this transition time.

}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
Expand Down Expand Up @@ -89,22 +97,20 @@ type queueSender struct {
func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue[Request]
if isPersistent {
if config.StorageID != nil {
queue = internal.NewPersistentQueue[Request](
config.QueueSize, signal, *config.StorageID, marshaler, unmarshaler, set)
} else {
queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
}
qs := &queueSender{
fullName: set.ID.String(),
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
fullName: set.ID.String(),
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
requeuingEnabled: config.RequeueOnFailure,
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
Expand Down
12 changes: 7 additions & 5 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,13 @@ func TestQueueSettings_Validate(t *testing.T) {
// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = true
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -253,16 +253,17 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
}

// disabling retry sender should disable requeuing.
func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
mockR := newMockRequest(2, errors.New("transient error"))

// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
// use persistent storage as it expected to be used with requeuing unless it's disabled
qCfg := NewDefaultQueueSettings()
qCfg.RequeueOnFailure = false

storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry

be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
Expand Down Expand Up @@ -291,6 +292,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
qCfg.RequeueOnFailure = true
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead

Expand All @@ -300,7 +302,6 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
Expand Down Expand Up @@ -397,6 +398,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.RequeueOnFailure = true
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown

Expand Down
Loading