[exporterhelper] Cleanup logging for export failures (open-telemetry#9282)

This change makes the logging of export failures cleaner.
1. Ensure an error message is logged every time, and only once, when data
is dropped or rejected due to an export failure.
2. Update the wording. Specifically, don't use the "dropped" term when an
error is reported back to the pipeline. If no queue is configured, the
exporter doesn't drop data by itself but rather rejects it. Keep the
"dropped" wording for failures that happen after the enabled queue.
3. Properly report any error returned by the queue. For example, a
persistent storage error must be reported as a storage error, not as
"queue overflow".

Fixes open-telemetry#9219
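
A minimal sketch of the resulting behavior, using hypothetical stand-in types (sketchExporter, sketchRequest) rather than the real exporterhelper API: one error log per failed request, worded "Rejecting data" when the error is returned to the pipeline and "Dropping data" when the request already passed the enabled queue, with the configuration hints from WithRetry/WithQueue appended when those options are disabled.

// Hypothetical, simplified model of the logging flow introduced by this change;
// the names here are illustrative and not part of the collector API.
package main

import (
	"errors"
	"log"
)

type sketchRequest struct{ items int }

type sketchExporter struct {
	// Hints appended by WithRetry/WithQueue when those options are disabled,
	// e.g. " Try enabling sending_queue to survive temporary failures."
	exportFailureMessage string
}

// send models baseExporter.send: any error that comes back here is returned to
// the pipeline, so it is logged exactly once with the "Rejecting data" wording.
func (e *sketchExporter) send(req sketchRequest, export func(sketchRequest) error) error {
	err := export(req)
	if err != nil {
		log.Printf("Exporting failed. Rejecting data.%s error=%v rejected_items=%d",
			e.exportFailureMessage, err, req.items)
	}
	return err
}

// consume models the queue consumer callback: a failure after the enabled queue
// means the data is lost, so it is logged once with the "Dropping data" wording.
func (e *sketchExporter) consume(req sketchRequest, export func(sketchRequest) error) {
	if err := export(req); err != nil {
		log.Printf("Exporting failed. Dropping data.%s error=%v dropped_items=%d",
			e.exportFailureMessage, err, req.items)
	}
}

func main() {
	failing := func(sketchRequest) error { return errors.New("some error") }

	noQueue := &sketchExporter{exportFailureMessage: " Try enabling sending_queue to survive temporary failures."}
	_ = noQueue.send(sketchRequest{items: 2}, failing) // logged as rejected

	queued := &sketchExporter{}
	queued.consume(sketchRequest{items: 2}, failing) // logged as dropped
}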
dmitryax committed Jan 13, 2024
1 parent 72ba57a commit 55a8c6b
Showing 6 changed files with 97 additions and 59 deletions.
27 changes: 27 additions & 0 deletions .chloggen/exporter-helper-cleanup-error-logs.yaml
@@ -0,0 +1,27 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporters

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Cleanup log messages for export failures

# One or more tracking issues or pull requests related to the change
issues: [9219]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  1. Ensure an error message is logged every time, and only once, when data is dropped or rejected due to an export failure.
  2. Update the wording. Specifically, don't use the "dropped" term when an error is reported back to the pipeline.
     Keep the "dropped" wording for failures that happen after the enabled queue.
  3. Properly report any error returned by the queue. For example, a persistent storage error must be reported as a storage error, not as "queue overflow".
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
40 changes: 16 additions & 24 deletions exporter/exporterhelper/common.go
@@ -38,20 +38,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type errorLoggingRequestSender struct {
baseRequestSender
logger *zap.Logger
message string
}

func (l *errorLoggingRequestSender) send(ctx context.Context, req Request) error {
err := l.baseRequestSender.send(ctx, req)
if err != nil {
l.logger.Error(l.message, zap.Int("dropped_items", req.ItemsCount()), zap.Error(err))
}
return err
}

type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// Option apply changes to baseExporter.
@@ -86,10 +72,7 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option {
func WithRetry(config configretry.BackOffConfig) Option {
return func(o *baseExporter) {
if !config.Enabled {
o.retrySender = &errorLoggingRequestSender{
logger: o.set.Logger,
message: "Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
}
o.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return
}
o.retrySender = newRetrySender(config, o.set)
@@ -105,13 +88,14 @@ func WithQueue(config QueueSettings) Option {
panic("queueing is not available for the new request exporters yet")
}
if !config.Enabled {
o.queueSender = &errorLoggingRequestSender{
logger: o.set.Logger,
message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
}
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
}
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
consumeErrHandler := func(err error, req Request) {
o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler)
}
}

@@ -137,6 +121,9 @@ type baseExporter struct {
set exporter.CreateSettings
obsrep *ObsReport

// Message for the user to be added with an export failure message.
exportFailureMessage string

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
@@ -182,7 +169,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(ctx context.Context, req Request) error {
return be.queueSender.send(ctx, req)
err := be.queueSender.send(ctx, req)
if err != nil {
be.set.Logger.Error("Exporting failed. Rejecting data."+be.exportFailureMessage,
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
}
return err
}

// connectSenders connects the senders in the predefined order.
31 changes: 10 additions & 21 deletions exporter/exporterhelper/queue_sender.go
@@ -17,7 +17,6 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
@@ -86,7 +85,7 @@ type queueSender struct {
}

func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender {
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue[Request]
@@ -114,21 +113,15 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
}

// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
if err != nil && !consumererror.IsPermanent(err) {
qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
if err != nil {
consumeErrHandler(err, req)
}
return err
}
return err
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc)
return qs
}

// Start is invoked during service startup.
@@ -210,11 +203,7 @@ func (qs *queueSender) send(ctx context.Context, req Request) error {

span := trace.SpanFromContext(c)
if err := qs.queue.Offer(c, req); err != nil {
qs.logger.Error(
"Dropping data because sending_queue is full. Try increasing queue_size.",
zap.Int("dropped_items", req.ItemsCount()),
)
span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute))
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
return err
}

38 changes: 33 additions & 5 deletions exporter/exporterhelper/queue_sender_test.go
@@ -11,6 +11,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
@@ -81,17 +83,23 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
require.Zero(t, be.queueSender.(*queueSender).queue.Size())
}

func TestQueuedRetry_DropOnFull(t *testing.T) {
func TestQueuedRetry_RejectOnFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.QueueSize = 0
qCfg.NumConsumers = 0
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg))
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
require.Error(t, be.send(context.Background(), newMockRequest(2, nil)))
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message)
assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"])
}

func TestQueuedRetryHappyPath(t *testing.T) {
@@ -223,22 +231,42 @@ func TestQueueSettings_Validate(t *testing.T) {
func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs := NewDefaultQueueSettings()
qs.Enabled = false
be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs))
require.IsType(t, &errorLoggingRequestSender{}, be.queueSender)
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender,
WithQueue(qs))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.obsrepSender.(*observabilityConsumerSender)
mockR := newMockRequest(2, errors.New("some error"))
ocs.run(func() {
require.Error(t, be.send(context.Background(), mockR))
})
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
ocs.awaitAsyncProcessing()
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
require.NoError(t, be.Shutdown(context.Background()))
}

func TestQueueFailedRequestDropped(t *testing.T) {
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueSettings()))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
mockR := newMockRequest(2, errors.New("some error"))
require.NoError(t, be.send(context.Background(), mockR))
require.NoError(t, be.Shutdown(context.Background()))
mockR.checkNumRequests(t, 1)
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
}

func TestQueuedRetryPersistenceEnabled(t *testing.T) {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)
@@ -331,7 +359,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
}

func TestQueueSenderNoStartShutdown(t *testing.T) {
qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil)
qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil, nil)
assert.NoError(t, qs.Shutdown(context.Background()))
}

9 changes: 2 additions & 7 deletions exporter/exporterhelper/retry_sender.go
@@ -93,19 +93,14 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {

// Immediately drop data on permanent errors.
if consumererror.IsPermanent(err) {
rs.logger.Error(
"Exporting failed. The error is not retryable. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return err
return fmt.Errorf("not retryable error: %w", err)
}

req = extractPartialRequest(req, err)

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
return fmt.Errorf("max elapsed time expired %w", err)
return fmt.Errorf("no more retries left: %w", err)
}

throttleErr := throttleRetry{}
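
The diff above replaces the retry sender's own log statements with wrapped errors. A small, self-contained Go example (not from the collector codebase) of why fmt.Errorf with %w is used here: the retry context is prepended to the message while the original cause stays inspectable by callers, and the whole chain ends up in the single log line emitted by baseExporter.send.

package main

import (
	"errors"
	"fmt"
)

// errExport stands in for whatever error the downstream exporter returned.
var errExport = errors.New("some export error")

func main() {
	// Wrapping with %w, as retry_sender.go now does, adds retry context to the
	// message while keeping the original error reachable for callers.
	wrapped := fmt.Errorf("no more retries left: %w", errExport)

	fmt.Println(wrapped)                       // no more retries left: some export error
	fmt.Println(errors.Is(wrapped, errExport)) // true: the cause is still detectable
}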
11 changes: 9 additions & 2 deletions exporter/exporterhelper/retry_sender_test.go
@@ -17,6 +17,8 @@ import (
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/tag"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
@@ -239,15 +241,20 @@ func TestQueueRetryWithNoQueue(t *testing.T) {
func TestQueueRetryWithDisabledRetires(t *testing.T) {
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg))
require.IsType(t, &errorLoggingRequestSender{}, be.retrySender)
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.obsrepSender.(*observabilityConsumerSender)
mockR := newMockRequest(2, errors.New("some error"))
ocs.run(func() {
require.Error(t, be.send(context.Background(), mockR))
})
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data. "+
"Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message)
ocs.awaitAsyncProcessing()
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)