From 55a8c6b6519c876674212eda53a09a10def40e03 Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Fri, 12 Jan 2024 18:18:19 -0800
Subject: [PATCH] [exporterhelper] Clean up logging for export failures (#9282)

This change makes the logging of export failures cleaner.

1. Ensure an error message is logged every time and only once when data is dropped/rejected due to an export failure.
2. Update the wording. Specifically, don't use the "dropped" term when an error is reported back to the pipeline. If there is no queue configured, the exporter doesn't drop data by itself but rather rejects it. Keep the "dropped" wording for failures that happen after the enabled queue.
3. Properly report any error returned by a queue. For example, a persistent storage error must be reported as a storage error, not as "queue overflow".

Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/9219
---
 .../exporter-helper-cleanup-error-logs.yaml  | 27 +++++++++++++
 exporter/exporterhelper/common.go            | 40 ++++++++-----------
 exporter/exporterhelper/queue_sender.go      | 31 +++++---------
 exporter/exporterhelper/queue_sender_test.go | 38 +++++++++++++++---
 exporter/exporterhelper/retry_sender.go      |  9 +----
 exporter/exporterhelper/retry_sender_test.go | 11 ++++-
 6 files changed, 97 insertions(+), 59 deletions(-)
 create mode 100755 .chloggen/exporter-helper-cleanup-error-logs.yaml

diff --git a/.chloggen/exporter-helper-cleanup-error-logs.yaml b/.chloggen/exporter-helper-cleanup-error-logs.yaml
new file mode 100755
index 00000000000..28ed9ea5bc0
--- /dev/null
+++ b/.chloggen/exporter-helper-cleanup-error-logs.yaml
@@ -0,0 +1,27 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporters
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Clean up log messages for export failures
+
+# One or more tracking issues or pull requests related to the change
+issues: [9219]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  1. Ensure an error message is logged every time and only once when data is dropped/rejected due to an export failure.
+  2. Update the wording. Specifically, don't use the "dropped" term when an error is reported back to the pipeline.
+  Keep the "dropped" wording for failures that happen after the enabled queue.
+  3. Properly report any error returned by a queue. For example, a persistent storage error must be reported as a storage error, not as "queue overflow".
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 2c5a4e96692..b5e7aa39a33 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -38,20 +38,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
 	b.nextSender = nextSender
 }
 
-type errorLoggingRequestSender struct {
-	baseRequestSender
-	logger  *zap.Logger
-	message string
-}
-
-func (l *errorLoggingRequestSender) send(ctx context.Context, req Request) error {
-	err := l.baseRequestSender.send(ctx, req)
-	if err != nil {
-		l.logger.Error(l.message, zap.Int("dropped_items", req.ItemsCount()), zap.Error(err))
-	}
-	return err
-}
-
 type obsrepSenderFactory func(obsrep *ObsReport) requestSender
 
 // Option apply changes to baseExporter.
@@ -86,10 +72,7 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option {
 func WithRetry(config configretry.BackOffConfig) Option {
 	return func(o *baseExporter) {
 		if !config.Enabled {
-			o.retrySender = &errorLoggingRequestSender{
-				logger:  o.set.Logger,
-				message: "Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
-			}
+			o.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
 			return
 		}
 		o.retrySender = newRetrySender(config, o.set)
@@ -105,13 +88,14 @@ func WithQueue(config QueueSettings) Option {
 			panic("queueing is not available for the new request exporters yet")
 		}
 		if !config.Enabled {
-			o.queueSender = &errorLoggingRequestSender{
-				logger:  o.set.Logger,
-				message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
-			}
+			o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
 			return
 		}
-		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
+		consumeErrHandler := func(err error, req Request) {
+			o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage,
+				zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
+		}
+		o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler)
 	}
 }
 
@@ -137,6 +121,9 @@ type baseExporter struct {
 	set    exporter.CreateSettings
 	obsrep *ObsReport
 
+	// User-facing hint appended to the export failure log message.
+	exportFailureMessage string
+
 	// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
 	// The data is handled by each sender in the respective order starting from the queueSender.
 	// Most of the senders are optional, and initialized with a no-op path-through sender.
@@ -182,7 +169,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 
 // send sends the request using the first sender in the chain.
 func (be *baseExporter) send(ctx context.Context, req Request) error {
-	return be.queueSender.send(ctx, req)
+	err := be.queueSender.send(ctx, req)
+	if err != nil {
+		be.set.Logger.Error("Exporting failed. Rejecting data."+be.exportFailureMessage,
+			zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
+	}
+	return err
 }
 
 // connectSenders connects the senders in the predefined order.
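
The common.go diff above is the heart of the change: when retry or queueing is disabled, the corresponding Option now only appends a user-facing hint to exportFailureMessage, and the failure is logged exactly once, either by the queue consumer ("Dropping data.") or by the top-level send ("Rejecting data."). A minimal, self-contained Go sketch of that accumulate-hints-log-once pattern (hintedExporter and everything in main are invented for illustration; only the message strings come from the patch):

    package main

    import (
    	"errors"
    	"log"
    )

    // hintedExporter mirrors the baseExporter idea: options for disabled
    // features append a hint to exportFailureMessage, and send logs the
    // failure exactly once.
    type hintedExporter struct {
    	exportFailureMessage string
    }

    func (e *hintedExporter) send() error {
    	err := errors.New("connection refused") // stand-in for a failed export
    	if err != nil {
    		// The single log point at the top of the sender chain.
    		log.Printf("Exporting failed. Rejecting data.%s error=%v",
    			e.exportFailureMessage, err)
    	}
    	return err
    }

    func main() {
    	e := &hintedExporter{}
    	// What WithRetry/WithQueue do when their features are disabled.
    	e.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
    	e.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
    	_ = e.send()
    }
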
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 423b7657e10..1ee3c1ad5d1 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -17,7 +17,6 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/internal/obsreportconfig" @@ -86,7 +85,7 @@ type queueSender struct { } func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, - marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender { + marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender { isPersistent := config.StorageID != nil var queue internal.Queue[Request] @@ -114,21 +113,15 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), } - qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) - return qs -} - -// consume is the function that is executed by the queue consumers to send the data to the next consumerSender. -func (qs *queueSender) consume(ctx context.Context, req Request) error { - err := qs.nextSender.send(ctx, req) - if err != nil && !consumererror.IsPermanent(err) { - qs.logger.Error( - "Exporting failed. No more retries left. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.ItemsCount()), - ) + consumeFunc := func(ctx context.Context, req Request) error { + err := qs.nextSender.send(ctx, req) + if err != nil { + consumeErrHandler(err, req) + } + return err } - return err + qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc) + return qs } // Start is invoked during service startup. @@ -210,11 +203,7 @@ func (qs *queueSender) send(ctx context.Context, req Request) error { span := trace.SpanFromContext(c) if err := qs.queue.Offer(c, req); err != nil { - qs.logger.Error( - "Dropping data because sending_queue is full. 
Try increasing queue_size.", - zap.Int("dropped_items", req.ItemsCount()), - ) - span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) + span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute)) return err } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 831098c3308..3f3b2fc8105 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -81,17 +83,23 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } -func TestQueuedRetry_DropOnFull(t *testing.T) { +func TestQueuedRetry_RejectOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 qCfg.NumConsumers = 0 - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg)) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) require.Error(t, be.send(context.Background(), newMockRequest(2, nil))) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message) + assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -223,8 +231,11 @@ func TestQueueSettings_Validate(t *testing.T) { func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) - require.IsType(t, &errorLoggingRequestSender{}, be.queueSender) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, + WithQueue(qs)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -232,6 +243,8 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { ocs.run(func() { require.Error(t, be.send(context.Background(), mockR)) }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. 
Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) @@ -239,6 +252,21 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } +func TestQueueFailedRequestDropped(t *testing.T) { + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueSettings())) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + mockR := newMockRequest(2, errors.New("some error")) + require.NoError(t, be.send(context.Background(), mockR)) + require.NoError(t, be.Shutdown(context.Background())) + mockR.checkNumRequests(t, 1) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message) +} + func TestQueuedRetryPersistenceEnabled(t *testing.T) { tt, err := componenttest.SetupTelemetry(defaultID) require.NoError(t, err) @@ -331,7 +359,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { } func TestQueueSenderNoStartShutdown(t *testing.T) { - qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil) + qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil, nil) assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 1de3a23c587..1bf24157898 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -93,19 +93,14 @@ func (rs *retrySender) send(ctx context.Context, req Request) error { // Immediately drop data on permanent errors. if consumererror.IsPermanent(err) { - rs.logger.Error( - "Exporting failed. The error is not retryable. 
Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.ItemsCount()), - ) - return err + return fmt.Errorf("not retryable error: %w", err) } req = extractPartialRequest(req, err) backoffDelay := expBackoff.NextBackOff() if backoffDelay == backoff.Stop { - return fmt.Errorf("max elapsed time expired %w", err) + return fmt.Errorf("no more retries left: %w", err) } throttleErr := throttleRetry{} diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 99efd945794..06154d5fbc2 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -17,6 +17,8 @@ import ( "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/tag" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -239,8 +241,10 @@ func TestQueueRetryWithNoQueue(t *testing.T) { func TestQueueRetryWithDisabledRetires(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) - require.IsType(t, &errorLoggingRequestSender{}, be.retrySender) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -248,6 +252,9 @@ func TestQueueRetryWithDisabledRetires(t *testing.T) { ocs.run(func() { require.Error(t, be.send(context.Background(), mockR)) }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. "+ + "Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0)
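
The updated tests all rely on the same log-assertion mechanism: replace the nop logger in the exporter settings with one backed by zap's in-memory observer core, then assert on the captured entries. A self-contained sketch of just that mechanism (the message and fields mirror the exporter helper's output above; the rest is plain zap API, nothing collector-specific):

    package main

    import (
    	"errors"
    	"fmt"

    	"go.uber.org/zap"
    	"go.uber.org/zap/zaptest/observer"
    )

    func main() {
    	// observer.New returns an in-memory zapcore.Core plus a handle to the
    	// entries it captures; zap.ErrorLevel acts as the LevelEnabler.
    	core, observed := observer.New(zap.ErrorLevel)
    	logger := zap.New(core)

    	logger.Error("Exporting failed. Rejecting data.",
    		zap.Error(errors.New("sending queue is full")),
    		zap.Int("rejected_items", 2))

    	entry := observed.All()[0]
    	fmt.Println(entry.Message)               // Exporting failed. Rejecting data.
    	fmt.Println(entry.ContextMap()["error"]) // sending queue is full
    }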