From 3c572da960e3c2bd4f9764e10c87f6527699c022 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Mon, 12 Jun 2023 00:06:10 -0700 Subject: [PATCH] [exporterhelper] New exporter helper with custom requests Introduce a new exporter helper that operates over client-provided requests instead of pdata. It opens the door to moving batching into the exporter, where batches will be built from the client's data format instead of pdata. The batches can be properly sized by a custom request size, which can be different from OTLP. The same custom request sizing will be applied to the sending queue. It will also improve the performance of sending queue retries for non-OTLP exporters, since they no longer need to translate pdata on every retry. This is an experimental API; once stabilized, it is intended to replace the existing helpers. --- exporter/exporterhelper/common.go | 51 +++++++++-- exporter/exporterhelper/common_test.go | 8 +- exporter/exporterhelper/constants.go | 8 ++ .../internal/persistent_queue.go | 21 ++++- .../internal/persistent_queue_test.go | 10 ++- .../internal/persistent_storage.go | 16 ++-- .../internal/persistent_storage_batch.go | 6 +- .../internal/persistent_storage_test.go | 15 +++- exporter/exporterhelper/internal/request.go | 10 +-- exporter/exporterhelper/logs.go | 75 +++++++++++++++- exporter/exporterhelper/metrics.go | 75 +++++++++++++++- exporter/exporterhelper/metrics_test.go | 56 +++++++++++- exporter/exporterhelper/queued_retry.go | 77 ++++++++-------- exporter/exporterhelper/queued_retry_test.go | 53 ++++++----- exporter/exporterhelper/request.go | 88 +++++++++++++++++++ exporter/exporterhelper/request_test.go | 40 +++++++++ exporter/exporterhelper/traces.go | 75 +++++++++++++++- 17 files changed, 584 insertions(+), 100 deletions(-) create mode 100644 exporter/exporterhelper/request.go create mode 100644 exporter/exporterhelper/request_test.go diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41e..b9a4cbe2b1d8 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,14 +56,25 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + RequestSender } // fromOptions returns the internal options starting from the default and applying all configured options. @@ -72,7 +83,9 @@ func fromOptions(options ...Option) *baseSettings { opts := &baseSettings{ TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } @@ -121,9 +134,35 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing.
-func WithQueue(queueSettings QueueSettings) Option { +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + o.queueSettings.config = config + } +} + +// WithPersistentQueue overrides the default QueueSettings for an exporter with a persistent queue. +// Takes effect only for the new-style helpers: NewTracesExporterV2, NewMetricsExporterV2, NewLogsExporterV2. +func WithPersistentQueue(config QueueSettings, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option { + return func(o *baseSettings) { + o.queueSettings = queueSettings{ + config: config, + marshaler: func(req internal.Request) ([]byte, error) { + return marshaler(req.(*request).Request) + }, + unmarshaler: func(buf []byte) (internal.Request, error) { + req, err := unmarshaler(buf) + if err != nil { + return nil, err + } + return &request{ + Request: req, + baseRequest: baseRequest{ + ctx: context.Background(), + }, + sender: o.RequestSender, + }, nil + }, + } } } @@ -145,7 +184,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +193,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter.
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c392..e69be5ad7674 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -35,7 +34,7 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -50,7 +49,6 @@ func TestBaseExporterWithOptions(t *testing.T) { WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -71,7 +69,3 @@ func nopTracePusher() consumer.ConsumeTracesFunc { return nil } } - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd62..53c5a920d5ac 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,12 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilRequestFromTracesConverter is returned when a nil RequestFromTracesConverter is given. + errNilRequestFromTracesConverter = errors.New("nil RequestFromTracesConverter") + // errNilRequestFromMetricsConverter is returned when a nil RequestFromMetricsConverter is given. + errNilRequestFromMetricsConverter = errors.New("nil RequestFromMetricsConverter") + // errNilRequestFromLogsConverter is returned when a nil RequestFromLogsConverter is given. + errNilRequestFromLogsConverter = errors.New("nil RequestFromLogsConverter") + // errNilRequestSender is returned when a nil RequestSender is given. 
+ errNilRequestSender = errors.New("nil RequestSender") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f5698..d73063fbdcaf 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,28 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + // Name of the queue + Name string + // Signal type of the queue + Signal component.DataType + // Capacity of the queue + Capacity uint64 + // Logger to be used by the queue + Logger *zap.Logger + // Client to be used by the queue + Client storage.Client + // Unmarshaler to be used by the queue + Unmarshaler RequestUnmarshaler + // Marshaler to be used by the queue + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 135054405803..8b1ffb7674e5 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a1246..5519acc9a3da 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -12,7 +12,7 @@ import ( "sync/atomic" "go.uber.org/zap" - + "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. 
-func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e96..a80ba93c5c39 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b64..5d33fc110642 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bdd..3120d7db1752 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -19,11 +19,8 @@ type Request interface { // 
Otherwise, it should return the original Request. OnError(error) Request - // Count returns the count of spans/metric points or log records. - Count() int - - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) + // ItemsCount returns the number of basic items in the request (spans, data points or log records for OTLP) + ItemsCount() int // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb5..59c915ae325a 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -42,6 +42,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -58,7 +62,7 @@ func (req *logsRequest) Marshal() ([]byte, error) { return logsMarshaler.MarshalLogs(req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -88,7 +92,9 @@ func NewLogsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -103,7 +109,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -114,6 +120,67 @@ func NewLogsExporter( }, err } +type RequestFromLogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(ld plog.Logs) (Request, error) +} + +// NewLogsExporterV2 creates a new logs exporter based on a custom RequestFromLogsConverter and RequestSender. +func NewLogsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromLogsConverter, + sender RequestSender, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromLogsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...)
+ bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ld) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordLogsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -122,6 +189,6 @@ type logsExporterWithObservability struct { func (lewo *logsExporterWithObservability) send(req internal.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) - lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) + lewo.obsrep.EndLogsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab7..fb8709e0e286 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -42,6 +42,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -59,7 +63,7 @@ func (req *metricsRequest) Marshal() ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -89,7 +93,9 @@ func NewMetricsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -104,7 +110,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +121,67 @@ func NewMetricsExporter( }, err } +type RequestFromMetricsConverter interface { + // RequestFromMetrics converts pmetric.Metrics into a request. + RequestFromMetrics(ctx context.Context, md pmetric.Metrics) (Request, error) +} + +// NewMetricsExporterV2 creates a new metrics exporter based on a custom RequestFromMetricsConverter and RequestSender.
+func NewMetricsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromMetricsConverter, + sender RequestSender, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromMetricsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordMetricsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -123,6 +190,6 @@ type metricsSenderWithObservability struct { func (mewo *metricsSenderWithObservability) send(req internal.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) - mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) + mewo.obsrep.EndMetricsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65f..fa4b076729a2 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -47,7 +47,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +59,33 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsExporterV2_NilLogger(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsExporterV2_NilRequestConverter(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilRequestFromMetricsConverter, err) +} + +func TestMetricsExporterV2_NilRequestSender(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + nil) + require.Nil(t, me) + require.Equal(t, errNilRequestSender, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := 
NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +98,19 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +120,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +139,16 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsExporterV2_Default_ReturnError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("my_error") + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46f..e21d7a580a2b 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, 
logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -165,7 +172,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } @@ -179,7 +186,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. Queue did not accept requeuing request. 
Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,12 +294,12 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -306,7 +313,7 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { if !qrs.queue.Produce(req) { qrs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) return errSendingQueueIsFull @@ -391,7 +398,7 @@ func (rs *retrySender) send(req internal.Request) error { rs.logger.Error( "Exporting failed. The error is not retryable. 
Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64c..97c4253a5a66 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 
time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func 
TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -633,7 +646,7 @@ func (mer *mockErrorRequest) Marshal() ([]byte, error) { return nil, nil } -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } @@ -684,7 +697,7 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } @@ -716,9 +729,9 @@ func newObservabilityConsumerSender(nextSender requestSender) *observabilityCons func (ocs *observabilityConsumerSender) send(req internal.Request) error { err := ocs.nextSender.send(req) if err != nil { - ocs.droppedItemsCount.Add(int64(req.Count())) + ocs.droppedItemsCount.Add(int64(req.ItemsCount())) } else { - ocs.sentItemsCount.Add(int64(req.Count())) + ocs.sentItemsCount.Add(int64(req.ItemsCount())) } ocs.waitGroup.Done() return err diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 000000000000..ce1a3ae86fe5 --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +type requestError struct { + error + req Request +} + +// Unwrap returns the wrapped 
error for functions Is and As in standard package errors. +func (err requestError) Unwrap() error { + return err.error +} + +// Request returns the request that failed to be sent and can be retried. +func (err requestError) Request() Request { + return err.req +} + +// NewRequestError creates a requestError that can encapsulate received data that failed to be processed or sent. +// It should be used for partial failures where some data was successfully sent and some was not. +func NewRequestError(err error, req Request) error { + return requestError{ + error: err, + req: req, + } +} + +// Request represents a single request that can be sent to the endpoint. +type Request interface { + // ItemsCount returns the number of basic items in the request, the smallest pieces of data that can be sent to the endpoint. + // For example, for the OTLP exporter, this value represents the number of spans, metric data points or log records. + ItemsCount() int +} + +// RequestMarshaler is a function serializing a Request into a byte stream. +type RequestMarshaler func(Request) ([]byte, error) + +// RequestUnmarshaler is a function deserializing a byte stream into a Request. +type RequestUnmarshaler func([]byte) (Request, error) + +// RequestSender is a helper function that sends a request. +type RequestSender func(ctx context.Context, req Request) error + +type request struct { + Request + baseRequest + sender RequestSender +} + +var _ internal.Request = (*request)(nil) + +func (req *request) Export(ctx context.Context) error { + return req.sender(ctx, req.Request) +} + +func (req *request) OnError(err error) internal.Request { + var reqError requestError + if errors.As(err, &reqError) { + return &request{ + baseRequest: req.baseRequest, + Request: reqError.Request(), + sender: req.sender, + } + } + return req +} + +func newRequestUnmarshaler(sender RequestSender, unmarshaler RequestUnmarshaler) internal.RequestUnmarshaler { + return func(bytes []byte) (internal.Request, error) { + req, err := unmarshaler(bytes) + if err != nil { + return nil, err + } + return &request{ + baseRequest: baseRequest{ctx: context.Background()}, + Request: req, + sender: sender, + }, nil + } +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 000000000000..70d9d2706ea2 --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct{} + +func (fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount()}, nil +} + +func (fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount()}, nil +} + +func (fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount()}, nil +} + +func newFakeRequestSender(err error) RequestSender { + return func(_ context.Context, _ Request) error { + return err + } +} diff --git a/exporter/exporterhelper/traces.go
b/exporter/exporterhelper/traces.go index 978ece2201cf..c20a8ee06529 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -42,6 +42,10 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) +} + // Marshal provides serialization capabilities required by persistent queue func (req *tracesRequest) Marshal() ([]byte, error) { return tracesMarshaler.MarshalTraces(req.td) @@ -59,7 +63,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -89,7 +93,9 @@ func NewTracesExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -104,7 +110,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +121,67 @@ func NewTracesExporter( }, err } +type RequestFromTracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(ptrace.Traces) (Request, error) +} + +// NewTracesExporterV2 creates a new traces exporter based on a custom RequestFromTracesConverter and RequestSender. +func NewTracesExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromTracesConverter, + sender RequestSender, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromTracesConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(td) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordTracesEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -124,6 +191,6 @@ func (tewo *tracesExporterWithObservability) send(req internal.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). 
err := tewo.nextSender.send(req) - tewo.obsrep.EndTracesOp(req.Context(), req.Count(), err) + tewo.obsrep.EndTracesOp(req.Context(), req.ItemsCount(), err) return err }
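
Usage sketch: the snippet below shows how a vendor exporter could wire up the new V2 helper introduced by this patch. Only the exporterhelper symbols (Request, RequestSender, RequestMarshaler, RequestUnmarshaler, RequestFromTracesConverter, NewTracesExporterV2, WithRetry, WithPersistentQueue, NewDefaultQueueSettings, NewDefaultRetrySettings) come from this change; the myexporter package, the myRequest payload, myConverter, the JSON encoding, and the https://example.com endpoint are hypothetical, made up purely for illustration.

package myexporter

import (
	"bytes"
	"context"
	"encoding/json"
	"net/http"

	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// myRequest is a hypothetical vendor-format payload. ItemsCount implements
// exporterhelper.Request, so the queue and retry layers can report item counts.
type myRequest struct {
	Spans []string `json:"spans"`
}

func (r *myRequest) ItemsCount() int { return len(r.Spans) }

// myConverter implements RequestFromTracesConverter: pdata is translated once,
// before the request enters the sending queue.
type myConverter struct{}

func (myConverter) RequestFromTraces(td ptrace.Traces) (exporterhelper.Request, error) {
	req := &myRequest{}
	rss := td.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		sss := rss.At(i).ScopeSpans()
		for j := 0; j < sss.Len(); j++ {
			spans := sss.At(j).Spans()
			for k := 0; k < spans.Len(); k++ {
				// A real exporter would render the full span into its own schema here.
				req.Spans = append(req.Spans, spans.At(k).Name())
			}
		}
	}
	return req, nil
}

// send is the RequestSender: it posts the already-converted payload, so retries
// never touch pdata again. The endpoint URL is a placeholder.
func send(ctx context.Context, req exporterhelper.Request) error {
	body, err := json.Marshal(req.(*myRequest))
	if err != nil {
		return err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://example.com/v1/spans", bytes.NewReader(body))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

// marshal and unmarshal let WithPersistentQueue persist the vendor payload directly.
func marshal(req exporterhelper.Request) ([]byte, error) {
	return json.Marshal(req.(*myRequest))
}

func unmarshal(buf []byte) (exporterhelper.Request, error) {
	req := &myRequest{}
	return req, json.Unmarshal(buf, req)
}

func createTracesExporter(ctx context.Context, set exporter.CreateSettings) (exporter.Traces, error) {
	qCfg := exporterhelper.NewDefaultQueueSettings()
	// qCfg.StorageID must point at a storage extension for persistence to take
	// effect; with it unset the exporter falls back to the in-memory queue.
	return exporterhelper.NewTracesExporterV2(ctx, set, myConverter{}, send,
		exporterhelper.WithRetry(exporterhelper.NewDefaultRetrySettings()),
		exporterhelper.WithPersistentQueue(qCfg, marshal, unmarshal),
	)
}

Because the converter runs once on the consume path, retries and items replayed from the persistent queue re-send the already-converted payload, which is the performance benefit for non-OTLP exporters described in the commit message.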