diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..7646f4ec03a --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,33 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental API is introduced in exporter/exporterhelper package: + - `RequestMarshaler`: a new interface for marshaling client-provided requests. + - `RequestUnmarshaler`: a new interface for unmarshaling client-provided requests. + - `WithMemoryQueue`: a new exporter helper option for using a memory queue. + - `WithPersistentQueue`: a new exporter helper option for using a persistent queue. + - `QueueConfig`: a configuration for queueing requests used by WithMemoryQueue option. + - `NewDefaultQueueConfig`: a function for creating a default QueueConfig. + - `PersistentQueueConfig`: a configuration for queueing requests in persistent storage used by WithPersistentQueue option. + - `NewDefaultPersistentQueueConfig`: a function for creating a default PersistentQueueConfig. + All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index f6277404619..6d0c6ad4963 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -112,7 +112,8 @@ func WithRetry(retrySettings RetrySettings) Option { func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("this option is not available for the new request exporters, " + + "use WithMemoryQueue or WithPersistentQueue instead") } var queue internal.ProducerConsumerQueue if config.Enabled { @@ -128,6 +129,52 @@ func WithQueue(config QueueSettings) Option { } } +// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithMemoryQueue(config QueueConfig) Option { + return func(o *baseExporter) { + var queue internal.ProducerConsumerQueue + if config.Enabled { + queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) + } + qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + o.queueSender = qs + o.setOnTemporaryFailure(qs.onTemporaryFailure) + } +} + +// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option { + return func(o *baseExporter) { + var queue internal.ProducerConsumerQueue + if config.Enabled { + queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, + func(req internal.Request) ([]byte, error) { + return marshaler(req.(*request).Request) + }, + func(data []byte) (internal.Request, error) { + req, err := unmarshaler(data) + if err != nil { + return nil, err + } + return &request{ + Request: req, + baseRequest: baseRequest{ctx: context.Background()}, + }, nil + }, + ) + } + qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + o.queueSender = qs + o.setOnTemporaryFailure(qs.onTemporaryFailure) + } +} + // WithCapabilities overrides the default Capabilities() function for a Consumer. // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors. diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 3a6973b6f6a..5d31c37f286 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -143,12 +144,12 @@ func TestLogsRequestExporter_Default_ConvertError(t *testing.T) { func TestLogsRequestExporter_Default_ExportError(t *testing.T) { ld := plog.NewLogs() - want := errors.New("export_error") + wantErr := errors.New("export_error") le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), - &fakeRequestConverter{requestError: want}) + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, le) - require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) + require.Equal(t, wantErr, le.ConsumeLogs(context.Background(), ld)) } func TestLogsExporter_WithPersistentQueue(t *testing.T) { @@ -175,6 +176,34 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestLogsRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_logs_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewLogsRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(1))) + require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(2))) + require.Eventually(t, func() bool { + return acc.Load() == 3 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -213,17 +242,17 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { - want := errors.New("export_error") + wantErr := errors.New("export_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), - &fakeRequestConverter{requestError: want}) + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.Nil(t, err) require.NotNil(t, le) - checkRecordedMetricsForLogsExporter(t, tt, le, want) + checkRecordedMetricsForLogsExporter(t, tt, le, wantErr) } func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,11 +327,12 @@ func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("my_error") - le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + wantErr := errors.New("my_error") + le, err := NewLogsRequestExporter(context.Background(), set, + &fakeRequestConverter{exportCallback: func(Request) error { return wantErr }}) require.Nil(t, err) require.NotNil(t, le) - checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, wantErr, 1) } func TestLogsExporter_WithShutdown(t *testing.T) { diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 63280c118d1..0db59a42955 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -144,12 +145,12 @@ func TestMetricsRequestExporter_Default_ConvertError(t *testing.T) { func TestMetricsRequestExporter_Default_ExportError(t *testing.T) { md := pmetric.NewMetrics() - want := errors.New("export_error") + wantErr := errors.New("export_error") me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), - fakeRequestConverter{requestError: want}) + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) + require.Equal(t, wantErr, me.ConsumeMetrics(context.Background(), md)) } func TestMetricsExporter_WithPersistentQueue(t *testing.T) { @@ -176,6 +177,34 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestMetricsRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_metrics_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewMetricsRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1))) // 2 data points + require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(2))) // 4 data points + require.Eventually(t, func() bool { + return acc.Load() == 6 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -214,16 +243,17 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { - want := errors.New("my_error") + wantErr := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want}) + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - checkRecordedMetricsForMetricsExporter(t, tt, me, want) + checkRecordedMetricsForMetricsExporter(t, tt, me, wantErr) } func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,11 +328,12 @@ func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("my_error") - me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want}) + wantErr := errors.New("my_error") + me, err := NewMetricsRequestExporter(context.Background(), set, + fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, me) - checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, wantErr, 2) } func TestMetricsExporter_WithShutdown(t *testing.T) { diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index d3da29500fe..fab2dbb46e2 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -62,6 +62,64 @@ func (qCfg *QueueSettings) Validate() error { return nil } +// QueueConfig defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type QueueConfig struct { + // Enabled indicates whether to not enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in queue at a given time. + // This field is left for backward compatibility with QueueSettings. + // Later, it will be replaced with size fields specified explicitly in terms of items or batches. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultQueueConfig returns the default QueueConfig. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultQueueConfig() QueueConfig { + return QueueConfig{ + Enabled: true, + NumConsumers: 10, + QueueSize: defaultQueueSize, + } +} + +// PersistentQueueConfig defines configuration for queueing requests before exporting using a persistent storage. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace +// QueueSettings in the future. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueConfig struct { + QueueConfig `mapstructure:",squash"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} + +// NewDefaultPersistentQueueConfig returns the default PersistentQueueConfig. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultPersistentQueueConfig() PersistentQueueConfig { + return PersistentQueueConfig{ + QueueConfig: NewDefaultQueueConfig(), + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *QueueConfig) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} + type queueSender struct { baseRequestSender fullName string diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index d5c7f00de6b..d040b58dc43 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -177,6 +177,18 @@ func TestQueueSettings_Validate(t *testing.T) { assert.NoError(t, qCfg.Validate()) } +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultQueueConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.QueueSize = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} + // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() @@ -249,6 +261,34 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } +func TestMemoryQueue(t *testing.T) { + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, true, nil, nil, newNoopObsrepSender, WithMemoryQueue(NewDefaultQueueConfig())) + require.NotNil(t, be.queueSender.(*queueSender).queue) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestMemoryQueueDisabled(t *testing.T) { + qs := NewDefaultQueueConfig() + qs.Enabled = false + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, true, nil, nil, newNoopObsrepSender, WithMemoryQueue(qs)) + require.Nil(t, be.queueSender.(*queueSender).queue) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestPersistentQueueDisabled(t *testing.T) { + qs := NewDefaultPersistentQueueConfig() + qs.Enabled = false + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, true, nil, nil, newNoopObsrepSender, WithPersistentQueue(qs, nil, nil)) + require.Nil(t, be.queueSender.(*queueSender).queue) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) +} + func TestQueuedRetryPersistenceEnabled(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(defaultID) require.NoError(t, err) @@ -295,6 +335,59 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { require.Error(t, be.Start(context.Background(), host), "could not get storage client") } +func TestPersistentQueueRetryStorageError(t *testing.T) { + storageError := errors.New("could not get storage client") + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := NewDefaultRetrySettings() + set := tt.ToExporterCreateSettings() + rc := fakeRequestConverter{} + be, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender, WithRetry(rCfg), + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(storageError), + } + host := &mockHost{ext: extensions} + + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") +} + +func TestPersistentQueueRetryUnmarshalError(t *testing.T) { + cfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + cfg.StorageID = &storageID // enable persistence + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_request", "with_persistent_queue") + unmarshalCalled := &atomic.Bool{} + rc := fakeRequestConverter{} + unmarshaler := func(bytes []byte) (Request, error) { + unmarshalCalled.Store(true) + return nil, errors.New("unmarshal error") + } + be, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender, WithPersistentQueue(cfg, rc.requestMarshalerFunc(), unmarshaler)) + require.NoError(t, err) + + require.Nil(t, be.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), + }})) + + require.NoError(t, be.send(&request{ + baseRequest: baseRequest{ctx: context.Background()}, + Request: rc.fakeRequest(1), + })) + require.Eventually(t, func() bool { return unmarshalCalled.Load() }, 100*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) +} + func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { produceCounter := &atomic.Uint32{} diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index ef05aa6395d..734445bffa6 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -59,3 +59,13 @@ func (req *request) Count() int { } return 0 } + +// RequestMarshaler is a function that can marshal a Request into bytes. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestMarshaler func(req Request) ([]byte, error) + +// RequestUnmarshaler is a function that can unmarshal bytes into a Request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestUnmarshaler func(data []byte) (Request, error) diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..954847932b0 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,6 +5,8 @@ package exporterhelper import ( "context" + "encoding/json" + "errors" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -12,33 +14,61 @@ import ( ) type fakeRequest struct { - items int - err error + Items int + exportCallback func(req Request) error } func (r fakeRequest) Export(_ context.Context) error { - return r.err + if r.exportCallback == nil { + return nil + } + return r.exportCallback(r) } func (r fakeRequest) ItemsCount() int { - return r.items + return r.Items } type fakeRequestConverter struct { - metricsError error - tracesError error - logsError error - requestError error + metricsError error + tracesError error + logsError error + exportCallback func(req Request) error } func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError + return c.fakeRequest(md.DataPointCount()), c.metricsError } func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { - return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError + return c.fakeRequest(td.SpanCount()), c.tracesError } func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { - return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError + return c.fakeRequest(ld.LogRecordCount()), c.logsError +} + +func (c fakeRequestConverter) fakeRequest(items int) Request { + return fakeRequest{Items: items, exportCallback: c.exportCallback} +} + +func (c fakeRequestConverter) requestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + r, ok := req.(fakeRequest) + if !ok { + return nil, errors.New("invalid request type") + } + return json.Marshal(r) + } +} + +func (c fakeRequestConverter) requestUnmarshalerFunc() RequestUnmarshaler { + return func(bytes []byte) (Request, error) { + var r fakeRequest + if err := json.Unmarshal(bytes, &r); err != nil { + return nil, err + } + r.exportCallback = c.exportCallback + return r, nil + } } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 214587fb2e1..14580d2e29e 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -142,11 +143,12 @@ func TestTracesRequestExporter_Default_ConvertError(t *testing.T) { func TestTracesRequestExporter_Default_ExportError(t *testing.T) { td := ptrace.NewTraces() - want := errors.New("export_error") - te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{requestError: want}) + wantErr := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, te) - require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) + require.Equal(t, wantErr, te.ConsumeTraces(context.Background(), td)) } func TestTracesExporter_WithPersistentQueue(t *testing.T) { @@ -173,6 +175,34 @@ func TestTracesExporter_WithPersistentQueue(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond) } +func TestTracesRequestExporter_WithPersistentQueue(t *testing.T) { + qCfg := NewDefaultPersistentQueueConfig() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + acc := &atomic.Uint32{} + set := exportertest.NewNopCreateSettings() + set.ID = component.NewIDWithName("test_traces_request", "with_persistent_queue") + rc := &fakeRequestConverter{exportCallback: func(req Request) error { + acc.Add(uint32(req.(RequestItemsCounter).ItemsCount())) + return nil + }} + te, err := NewTracesRequestExporter(context.Background(), set, rc, + WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc())) + require.NoError(t, err) + + host := &mockHost{ext: map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + require.NoError(t, te.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) + require.NoError(t, te.ConsumeTraces(context.Background(), testdata.GenerateTraces(2))) + require.Eventually(t, func() bool { + return acc.Load() == 3 + }, 500*time.Millisecond, 10*time.Millisecond) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -211,16 +241,17 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestTracesRequestExporter_WithRecordMetrics_RequestSenderError(t *testing.T) { - want := errors.New("export_error") + wantErr := errors.New("export_error") tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{requestError: want}) + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, te) - checkRecordedMetricsForTracesExporter(t, tt, te, want) + checkRecordedMetricsForTracesExporter(t, tt, te, wantErr) } func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { @@ -298,12 +329,13 @@ func TestTracesRequestExporter_WithSpan_ExportError(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - want := errors.New("export_error") - te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + wantErr := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), set, + &fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }}) require.NoError(t, err) require.NotNil(t, te) - checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, wantErr, 1) } func TestTracesExporter_WithShutdown(t *testing.T) {