diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41e..b9a4cbe2b1d8 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,14 +56,25 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + RequestSender } // fromOptions returns the internal options starting from the default and applying all configured options. @@ -72,7 +83,9 @@ func fromOptions(options ...Option) *baseSettings { opts := &baseSettings{ TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } @@ -121,9 +134,35 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) Option { +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + o.queueSettings.config = config + } +} + +// WithPersistentQueue overrides the default QueueSettings for an exporter with persistent queue. +// Takes effect only for the new style helpers: NewTracesExporterV2, NewMetricsExporterV2, NewLogsExporterV2. +func WithPersistentQueue(config QueueSettings, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option { + return func(o *baseSettings) { + o.queueSettings = queueSettings{ + config: config, + marshaler: func(req internal.Request) ([]byte, error) { + return marshaler(req.(*request).Request) + }, + unmarshaler: func(buf []byte) (internal.Request, error) { + req, err := unmarshaler(buf) + if err != nil { + return nil, err + } + return &request{ + Request: req, + baseRequest: baseRequest{ + ctx: context.Background(), + }, + sender: o.RequestSender, + }, nil + }, + } } } @@ -145,7 +184,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +193,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c392..e69be5ad7674 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -35,7 +34,7 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -50,7 +49,6 @@ func TestBaseExporterWithOptions(t *testing.T) { WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -71,7 +69,3 @@ func nopTracePusher() consumer.ConsumeTracesFunc { return nil } } - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd62..53c5a920d5ac 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,12 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilRequestFromTracesConverter is returned when a nil RequestFromTracesConverter is given. + errNilRequestFromTracesConverter = errors.New("nil RequestFromTracesConverter") + // errNilRequestFromMetricsConverter is returned when a nil RequestFromMetricsConverter is given. + errNilRequestFromMetricsConverter = errors.New("nil RequestFromMetricsConverter") + // errNilRequestFromLogsConverter is returned when a nil RequestFromLogsConverter is given. + errNilRequestFromLogsConverter = errors.New("nil RequestFromLogsConverter") + // errNilRequestSender is returned when a nil RequestSender is given. + errNilRequestSender = errors.New("nil RequestSender") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f5698..d73063fbdcaf 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,28 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + // Name of the queue + Name string + // Signal type of the queue + Signal component.DataType + // Capacity of the queue + Capacity uint64 + // Logger to be used by the queue + Logger *zap.Logger + // Client to be used by the queue + Client storage.Client + // Unmarshaler to be used by the queue + Unmarshaler RequestUnmarshaler + // Marshaler to be used by the queue + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 135054405803..8b1ffb7674e5 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a1246..5519acc9a3da 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -12,7 +12,7 @@ import ( "sync/atomic" "go.uber.org/zap" - + "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e96..a80ba93c5c39 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b64..5d33fc110642 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bdd..3120d7db1752 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -19,11 +19,8 @@ type Request interface { // Otherwise, it should return the original Request. OnError(error) Request - // Count returns the count of spans/metric points or log records. - Count() int - - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) + // ItemsCount returns the number of basic items in the request (spans, date points or log records for OTLP) + ItemsCount() int // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb5..59c915ae325a 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -42,6 +42,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -58,7 +62,7 @@ func (req *logsRequest) Marshal() ([]byte, error) { return logsMarshaler.MarshalLogs(req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -88,7 +92,9 @@ func NewLogsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -103,7 +109,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -114,6 +120,67 @@ func NewLogsExporter( }, err } +type RequestFromLogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(ld plog.Logs) (Request, error) +} + +// NewLogsExporterV2 creates new logs exporter based on custom RequestFromLogsConverter and RequestSender. +func NewLogsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromLogsConverter, + sender RequestSender, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromLogsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ld) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordLogsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -122,6 +189,6 @@ type logsExporterWithObservability struct { func (lewo *logsExporterWithObservability) send(req internal.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) - lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) + lewo.obsrep.EndLogsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab7..fb8709e0e286 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -42,6 +42,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -59,7 +63,7 @@ func (req *metricsRequest) Marshal() ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -89,7 +93,9 @@ func NewMetricsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -104,7 +110,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +121,67 @@ func NewMetricsExporter( }, err } +type RequestFromMetricsConverter interface { + // RequestFromMetrics converts pdata.Metrics into a request. + RequestFromMetrics(ctx context.Context, md pmetric.Metrics) (Request, error) +} + +// NewMetricsExporterV2 creates a new metrics exporter based on a custom RequestFromTracesConverter and RequestSender. +func NewMetricsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromMetricsConverter, + sender RequestSender, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromMetricsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordMetricsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -123,6 +190,6 @@ type metricsSenderWithObservability struct { func (mewo *metricsSenderWithObservability) send(req internal.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) - mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) + mewo.obsrep.EndMetricsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65f..fa4b076729a2 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -47,7 +47,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +59,33 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsExporterV2_NilLogger(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsExporterV2_NilRequestConverter(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilRequestFromMetricsConverter, err) +} + +func TestMetricsExporterV2_NilRequestSender(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + nil) + require.Nil(t, me) + require.Equal(t, errNilRequestSender, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +98,19 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +120,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +139,16 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsExporterV2_Default_ReturnError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("my_error") + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46f..e21d7a580a2b 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -165,7 +172,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } @@ -179,7 +186,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. Queue did not accept requeuing request. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,12 +294,12 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -306,7 +313,7 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { if !qrs.queue.Produce(req) { qrs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) return errSendingQueueIsFull @@ -391,7 +398,7 @@ func (rs *retrySender) send(req internal.Request) error { rs.logger.Error( "Exporting failed. The error is not retryable. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64c..97c4253a5a66 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -633,7 +646,7 @@ func (mer *mockErrorRequest) Marshal() ([]byte, error) { return nil, nil } -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } @@ -684,7 +697,7 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } @@ -716,9 +729,9 @@ func newObservabilityConsumerSender(nextSender requestSender) *observabilityCons func (ocs *observabilityConsumerSender) send(req internal.Request) error { err := ocs.nextSender.send(req) if err != nil { - ocs.droppedItemsCount.Add(int64(req.Count())) + ocs.droppedItemsCount.Add(int64(req.ItemsCount())) } else { - ocs.sentItemsCount.Add(int64(req.Count())) + ocs.sentItemsCount.Add(int64(req.ItemsCount())) } ocs.waitGroup.Done() return err diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 000000000000..ce1a3ae86fe5 --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +type requestError struct { + error + req Request +} + +// Unwrap returns the wrapped error for functions Is and As in standard package errors. +func (err requestError) Unwrap() error { + return err.error +} + +// Request returns the request that failed to be sent and can be retried. +func (err requestError) Request() Request { + return err.req +} + +// NewRequestError creates a requestError that can encapsulate received data that failed to be processed or sent. +// It should be used for partial failures where some data was successfully sent and some was not. +func NewRequestError(err error, req Request) error { + return requestError{ + error: err, + req: req, + } +} + +// Request represents a single request that can be sent to the endpoint. +type Request interface { + // ItemsCount returns the count basic item in the request, the smallest peaces of data that can be sent to the endpoint. + // For example, for OTLP exporter, this value represents the number of spans, metric data points or log records. + ItemsCount() int +} + +// RequestMarshaler is a function serializing a Request into a byte stream. +type RequestMarshaler func(Request) ([]byte, error) + +// RequestUnmarshaler is a function deserializing a byte stream into a Request. +type RequestUnmarshaler func([]byte) (Request, error) + +// RequestSender is a helper function that sends a request. +type RequestSender func(ctx context.Context, req Request) error + +type request struct { + Request + baseRequest + sender RequestSender +} + +var _ internal.Request = (*request)(nil) + +func (req *request) Export(ctx context.Context) error { + return req.sender(ctx, req.Request) +} + +func (req *request) OnError(err error) internal.Request { + var reqError requestError + if errors.As(err, &reqError) { + return &request{ + baseRequest: req.baseRequest, + Request: reqError.Request(), + sender: req.sender, + } + } + return req +} + +func newRequestUnmarshaler(sender RequestSender, unmarshaler RequestUnmarshaler) internal.RequestUnmarshaler { + return func(bytes []byte) (internal.Request, error) { + req, err := unmarshaler(bytes) + if err != nil { + return nil, err + } + return &request{ + baseRequest: baseRequest{ctx: context.Background()}, + Request: req, + sender: sender, + }, nil + } +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 000000000000..70d9d2706ea2 --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct{} + +func (fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount()}, nil +} + +func (fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount()}, nil +} + +func (fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount()}, nil +} + +func newFakeRequestSender(err error) RequestSender { + return func(_ context.Context, _ Request) error { + return err + } +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 978ece2201cf..c20a8ee06529 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -42,6 +42,10 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) +} + // Marshal provides serialization capabilities required by persistent queue func (req *tracesRequest) Marshal() ([]byte, error) { return tracesMarshaler.MarshalTraces(req.td) @@ -59,7 +63,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -89,7 +93,9 @@ func NewTracesExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -104,7 +110,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +121,67 @@ func NewTracesExporter( }, err } +type RequestFromTracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(ptrace.Traces) (Request, error) +} + +// NewTracesExporterV2 creates a new traces exporter based on a custom RequestFromTracesConverter and RequestSender. +func NewTracesExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter RequestFromTracesConverter, + sender RequestSender, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilRequestFromTracesConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(td) + if cErr != nil { + return cErr + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordTracesEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -124,6 +191,6 @@ func (tewo *tracesExporterWithObservability) send(req internal.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(req) - tewo.obsrep.EndTracesOp(req.Context(), req.Count(), err) + tewo.obsrep.EndTracesOp(req.Context(), req.ItemsCount(), err) return err }