diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..a1a0468de39 --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,35 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | +  The following experimental API is introduced in the exporter/exporterhelper package: +  - `WithRequestQueue`: a new exporter helper option for using a queue. +  - `queue.Queue`: an interface for queue implementations. +  - `queue.Factory`: a queue factory interface; implementations of this interface are intended to be used with the `WithRequestQueue` option. +  - `queue.Settings`: queue factory settings. +  - `queue.Config`: common configuration for queue implementations. +  - `queue.NewDefaultConfig`: a function for creating a default queue configuration. +  - `queue/memoryqueue.NewFactory`: a new factory for creating a memory queue. +  - `queue/memoryqueue.Config`: a configuration for the memory queue factory. +  - `queue/memoryqueue.NewDefaultConfig`: a function for creating a default memory queue configuration. +  All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata. + + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 8a68efcc617..40584b417c9 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -9,14 +9,17 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" ) // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface { - start(ctx context.Context, host component.Host, set exporter.CreateSettings) error + start(ctx context.Context, host component.Host) error shutdown() - send(req internal.Request) error + send(req *intrequest.Request) error setNextSender(nextSender requestSender) } @@ -26,13 +29,13 @@ type baseRequestSender struct { var _ requestSender = (*baseRequestSender)(nil) -func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { +func (b *baseRequestSender) start(context.Context, component.Host) error { return nil } func (b *baseRequestSender) shutdown() {} -func (b *baseRequestSender) send(req internal.Request) error { +func (b *baseRequestSender) send(req *intrequest.Request) error { return b.nextSender.send(req) } @@ -42,30 +45,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) { type obsrepSenderFactory func(obsrep *obsExporter) requestSender -// baseRequest is a base implementation for the internal.Request. -type baseRequest struct { - ctx context.Context - processingFinishedCallback func() -} - -func (req *baseRequest) Context() context.Context { - return req.ctx -} - -func (req *baseRequest) SetContext(ctx context.Context) { - req.ctx = ctx -} - -func (req *baseRequest) SetOnProcessingFinished(callback func()) { - req.processingFinishedCallback = callback -} - -func (req *baseRequest) OnProcessingFinished() { - if req.processingFinishedCallback != nil { - req.processingFinishedCallback() - } -} - // Option apply changes to baseExporter. type Option func(*baseExporter) @@ -107,19 +86,34 @@ func WithRetry(retrySettings RetrySettings) Option { func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") - } - var queue internal.ProducerConsumerQueue - if config.Enabled { - if config.StorageID == nil { - queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) - } else { - queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) - } + panic("this option is not available for the new request exporters, " + + "use WithRequestQueue instead") } - qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger) + factory := persistentqueue.NewFactory(persistentqueue.Config{ + Config: queue.Config{ + Enabled: config.Enabled, + NumConsumers: config.NumConsumers, + QueueSize: config.QueueSize, + }, + StorageID: config.StorageID, + }, o.marshaler, o.unmarshaler) + qs := newQueueSender(o.set.ID, o.signal, o.set.Logger) + qs.queue = factory.Create(queue.Settings{CreateSettings: o.set, DataType: o.signal}) + o.setOnTemporaryFailure(qs.onTemporaryFailure) o.queueSender = qs + } } + +// WithRequestQueue enables queueing for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
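Editor's note: as a usage sketch for the option documented above (defined just below), an exporter built with the new request-based helpers hands a `queue.Factory` to `WithRequestQueue`. Everything here other than the `WithRequestQueue`/`memoryqueue` calls is a hypothetical stand-in for the calling exporter's own code, and the exact `NewLogsRequestExporter` signature is assumed from this diff's context:

```go
package myexporter // hypothetical package, for illustration only

import (
	"context"

	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue"
)

// createLogsExporter wires a memory-backed queue into a request-based logs
// exporter. conv is whatever LogsConverter the exporter already implements.
func createLogsExporter(ctx context.Context, set exporter.CreateSettings, conv exporterhelper.LogsConverter) (exporter.Logs, error) {
	queueFactory := memoryqueue.NewFactory(memoryqueue.NewDefaultConfig())
	return exporterhelper.NewLogsRequestExporter(ctx, set, conv,
		exporterhelper.WithRequestQueue(queueFactory))
}
```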
+func WithRequestQueue(queueFactory queue.Factory) Option { + return func(o *baseExporter) { + qs := newQueueSender(o.set.ID, o.signal, o.set.Logger) + qs.queue = queueFactory.Create(queue.Settings{CreateSettings: o.set, DataType: o.signal}) o.setOnTemporaryFailure(qs.onTemporaryFailure) + o.queueSender = qs } } @@ -138,8 +132,8 @@ type baseExporter struct { component.ShutdownFunc requestExporter bool - marshaler internal.RequestMarshaler - unmarshaler internal.RequestUnmarshaler + marshaler request.Marshaler + unmarshaler request.Unmarshaler signal component.DataType set exporter.CreateSettings @@ -160,8 +154,8 @@ type baseExporter struct { } // TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. -func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, - unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler request.Marshaler, + unmarshaler request.Unmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) if err != nil { @@ -192,7 +186,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req } // send sends the request using the first sender in the chain. -func (be *baseExporter) send(req internal.Request) error { +func (be *baseExporter) send(req *intrequest.Request) error { return be.queueSender.send(req) } @@ -210,7 +204,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queueSender. - return be.queueSender.start(ctx, host, be.set) + return be.queueSender.start(ctx, host) } func (be *baseExporter) Shutdown(ctx context.Context) error { diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index c7f8655338a..471f768a127 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -11,6 +11,8 @@ import ( "sync/atomic" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" ) // boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue, @@ -20,26 +22,28 @@ type boundedMemoryQueue struct { stopWG sync.WaitGroup size *atomic.Uint32 stopped *atomic.Bool - items chan Request + items chan *intrequest.Request capacity uint32 numConsumers int + callback func(item *intrequest.Request) } // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). 
-func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue { +func NewBoundedMemoryQueue(capacity int, numConsumers int) queue.Queue { return &boundedMemoryQueue{ - items: make(chan Request, capacity), + items: make(chan *intrequest.Request, capacity), stopped: &atomic.Bool{}, size: &atomic.Uint32{}, capacity: uint32(capacity), numConsumers: numConsumers, + callback: func(item *intrequest.Request) {}, } } // StartConsumers starts a given number of goroutines consuming items from the queue // and passing them into the consumer callback. -func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error { +func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host) error { var startWG sync.WaitGroup for i := 0; i < q.numConsumers; i++ { q.stopWG.Add(1) @@ -49,7 +53,7 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu defer q.stopWG.Done() for item := range q.items { q.size.Add(^uint32(0)) - set.Callback(item) + q.callback(item) } }() } @@ -57,8 +61,13 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu return nil } +// SetCallback sets the callback function to be called by the consumer when an item is consumed from the queue. +func (q *boundedMemoryQueue) SetCallback(callback func(*intrequest.Request)) { + q.callback = callback +} + // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. -func (q *boundedMemoryQueue) Produce(item Request) bool { +func (q *boundedMemoryQueue) Produce(item *intrequest.Request) bool { if q.stopped.Load() { return false } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 9fe809cf2a2..5246f8c7af4 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -18,39 +18,40 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/exporter/exportertest" ) -func newNopQueueSettings(callback func(item Request)) QueueSettings { - return QueueSettings{ +func newNopQueueSettings() queue.Settings { + return queue.Settings{ CreateSettings: exportertest.NewNopCreateSettings(), DataType: component.DataTypeMetrics, - Callback: callback, } } type stringRequest struct { - Request + request.Request str string } -func newStringRequest(str string) Request { +func newStringRequest(str string) request.Request { return stringRequest{str: str} } // In this test we run a queue with capacity 1 and a single consumer. // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. 
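Editor's note: before the overflow test that the comment above introduces, here is a sketch of the full lifecycle these changes create — construct, `SetCallback`, `Start`, `Produce`, `Stop`. It exercises the internal bounded queue directly, as the tests below do; `someRequest` is a placeholder `request.Request`:

```go
// Sketch of the consumer-callback contract: SetCallback must be invoked
// before Start, and Produce reports acceptance rather than returning an error.
q := NewBoundedMemoryQueue(100 /* capacity */, 1 /* consumers */)
q.SetCallback(func(req *intrequest.Request) {
	// Consume the wrapped request; errors would normally flow through the
	// exporter's retry/observability senders rather than being handled here.
	_ = req.Export(req.Context())
	req.OnProcessingFinished()
})
if err := q.Start(context.Background(), componenttest.NewNopHost()); err != nil {
	panic(err) // placeholder error handling
}
accepted := q.Produce(intrequest.New(context.Background(), someRequest))
_ = accepted // false once the queue is full or stopped
q.Stop()
```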
-func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) { - q := NewBoundedMemoryQueue(1, 1) - +func helper(t *testing.T, startConsumers func(q queue.Queue)) { var startLock sync.Mutex startLock.Lock() // block consumers consumerState := newConsumerState(t) - startConsumers(q, func(item Request) { - consumerState.record(item.(stringRequest).str) + q := NewBoundedMemoryQueue(1, 1) + q.SetCallback(func(item *intrequest.Request) { + consumerState.record(item.Request.(stringRequest).str) // block further processing until startLock is released startLock.Lock() @@ -58,7 +59,9 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF startLock.Unlock() }) - assert.True(t, q.Produce(newStringRequest("a"))) + startConsumers(q) + + assert.True(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) // at this point "a" may or may not have been received by the consumer go-routine // so let's make sure it has been @@ -71,10 +74,10 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF }) // produce two more items. The first one should be accepted, but not consumed. - assert.True(t, q.Produce(newStringRequest("b"))) + assert.True(t, q.Produce(intrequest.New(context.Background(), newStringRequest("b")))) assert.Equal(t, 1, q.Size()) // the second should be rejected since the queue is full - assert.False(t, q.Produce(newStringRequest("c"))) + assert.False(t, q.Produce(intrequest.New(context.Background(), newStringRequest("c")))) assert.Equal(t, 1, q.Size()) startLock.Unlock() // unblock consumer @@ -90,18 +93,19 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF "b": true, } for _, item := range []string{"d", "e", "f"} { - assert.True(t, q.Produce(newStringRequest(item))) + assert.True(t, q.Produce(intrequest.New(context.Background(), newStringRequest(item)))) expected[item] = true consumerState.assertConsumed(expected) } q.Stop() - assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue") + assert.False(t, q.Produce(intrequest.New(context.Background(), newStringRequest("x"))), + "cannot push to closed queue") } func TestBoundedQueue(t *testing.T) { - helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) { - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn))) + helper(t, func(q queue.Queue) { + assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) }) } @@ -112,29 +116,30 @@ func TestBoundedQueue(t *testing.T) { // only after Stop will mean the consumers are still locked while // trying to perform the final consumptions. 
func TestShutdownWhileNotEmpty(t *testing.T) { + consumerState := newConsumerState(t) q := NewBoundedMemoryQueue(10, 1) + q.SetCallback(func(req *intrequest.Request) { + consumerState.record(req.Request.(stringRequest).str) + time.Sleep(1 * time.Second) + }) - consumerState := newConsumerState(t) + assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) { - consumerState.record(item.(stringRequest).str) - time.Sleep(1 * time.Second) - }))) - - q.Produce(newStringRequest("a")) - q.Produce(newStringRequest("b")) - q.Produce(newStringRequest("c")) - q.Produce(newStringRequest("d")) - q.Produce(newStringRequest("e")) - q.Produce(newStringRequest("f")) - q.Produce(newStringRequest("g")) - q.Produce(newStringRequest("h")) - q.Produce(newStringRequest("i")) - q.Produce(newStringRequest("j")) + q.Produce(intrequest.New(context.Background(), newStringRequest("a"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("b"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("c"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("d"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("e"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("f"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("g"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("h"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("i"))) + q.Produce(intrequest.New(context.Background(), newStringRequest("j"))) q.Stop() - assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue") + assert.False(t, q.Produce(intrequest.New(context.Background(), newStringRequest("x"))), + "cannot push to closed queue") consumerState.assertConsumed(map[string]bool{ "a": true, "b": true, @@ -198,8 +203,8 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { func TestZeroSize(t *testing.T) { q := NewBoundedMemoryQueue(0, 1) - err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) + err := q.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) - assert.False(t, q.Produce(newStringRequest("a"))) // in process + assert.False(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) // in process } diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index ba1dcc67230..6c2d28d32b5 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -10,6 +10,10 @@ import ( "sync" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -31,8 +35,11 @@ type persistentQueue struct { storage *persistentContiguousStorage capacity uint64 numConsumers int - marshaler RequestMarshaler - unmarshaler RequestUnmarshaler + marshaler request.Marshaler + unmarshaler request.Unmarshaler + set exporter.CreateSettings + dataType component.DataType + callback func(*intrequest.Request) } // buildPersistentStorageName returns a name that is 
constructed out of queue name and signal type. This is done @@ -42,8 +49,8 @@ func buildPersistentStorageName(name string, signal component.DataType) string { } // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler, - unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, set queue.Settings, marshaler request.Marshaler, + unmarshaler request.Unmarshaler) queue.Queue { return &persistentQueue{ capacity: uint64(capacity), numConsumers: numConsumers, @@ -51,17 +58,20 @@ func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler: marshaler, unmarshaler: unmarshaler, stopChan: make(chan struct{}), + set: set.CreateSettings, + dataType: set.DataType, + callback: func(req *intrequest.Request) {}, } } // Start starts the persistentQueue with the given number of consumers. -func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error { - storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType) +func (pq *persistentQueue) Start(ctx context.Context, host component.Host) error { + storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType) if err != nil { return err } - storageName := buildPersistentStorageName(set.ID.Name(), set.DataType) - pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler) + storageName := buildPersistentStorageName(pq.set.ID.Name(), pq.dataType) + pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, pq.set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler) for i := 0; i < pq.numConsumers; i++ { pq.stopWG.Add(1) go func() { @@ -69,7 +79,7 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q for { select { case req := <-pq.storage.get(): - set.Callback(req) + pq.callback(req) case <-pq.stopChan: return } @@ -80,8 +90,8 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q } // Produce adds an item to the queue and returns true if it was accepted -func (pq *persistentQueue) Produce(item Request) bool { - err := pq.storage.put(item) +func (pq *persistentQueue) Produce(req *intrequest.Request) bool { + err := pq.storage.put(req) return err == nil } @@ -108,6 +118,10 @@ func (pq *persistentQueue) IsPersistent() bool { return true } +func (pq *persistentQueue) SetCallback(callback func(*intrequest.Request)) { + pq.callback = callback +} + func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) { extension, err := getStorageExtension(host.GetExtensions(), storageID) if err != nil { diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 08570e92342..217aade1075 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -16,6 +16,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" 
"go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -32,13 +34,14 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { } // createTestQueue creates and starts a fake queue with the given capacity and number of consumers. -func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) ProducerConsumerQueue { - pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) +func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item *intrequest.Request)) queue.Queue { + pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newNopQueueSettings(), + newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) + pq.SetCallback(callback) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} - err := pq.Start(context.Background(), host, newNopQueueSettings(callback)) + err := pq.Start(context.Background(), host) require.NoError(t, err) t.Cleanup(pq.Stop) return pq @@ -46,12 +49,12 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(ite func TestPersistentQueue_Capacity(t *testing.T) { for i := 0; i < 100; i++ { - pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) + pq := NewPersistentQueue(5, 1, component.ID{}, newNopQueueSettings(), + newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} - err := pq.Start(context.Background(), host, newNopQueueSettings(func(req Request) {})) + err := pq.Start(context.Background(), host) require.NoError(t, err) // Stop consumer to imitate queue overflow @@ -64,7 +67,7 @@ func TestPersistentQueue_Capacity(t *testing.T) { req := newFakeTracesRequest(traces) for i := 0; i < 10; i++ { - result := pq.Produce(req) + result := pq.Produce(intrequest.New(context.Background(), req)) if i < 6 { assert.True(t, result) } else { @@ -85,12 +88,12 @@ func TestPersistentQueue_Capacity(t *testing.T) { } func TestPersistentQueue_Close(t *testing.T) { - wq := createTestQueue(t, 1001, 100, func(item Request) {}) + wq := createTestQueue(t, 1001, 100, func(item *intrequest.Request) {}) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) for i := 0; i < 1000; i++ { - wq.Produce(req) + wq.Produce(intrequest.New(context.Background(), req)) } // This will close the queue very quickly, consumers might not be able to consume anything and should finish gracefully assert.NotPanics(t, func() { @@ -104,14 +107,14 @@ func TestPersistentQueue_Close(t *testing.T) { // Verify storage closes after queue consumers. 
If not in this order, successfully consumed items won't be updated in storage func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { - wq := createTestQueue(t, 1001, 1, func(item Request) {}) + wq := createTestQueue(t, 1001, 1, func(item *intrequest.Request) {}) traces := newTraces(1, 10) lastRequestProcessedTime := time.Now() - req := newFakeTracesRequest(traces) - req.processingFinishedCallback = func() { + req := intrequest.New(context.Background(), newFakeTracesRequest(traces)) + req.SetOnProcessingFinished(func() { lastRequestProcessedTime = time.Now() - } + }) fnBefore := stopStorage stopStorageTime := time.Now() @@ -163,14 +166,14 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newFakeTracesRequest(traces) numMessagesConsumed := &atomic.Int32{} - tq := createTestQueue(t, 1000, c.numConsumers, func(item Request) { + tq := createTestQueue(t, 1000, c.numConsumers, func(item *intrequest.Request) { if item != nil { numMessagesConsumed.Add(int32(1)) } }) for i := 0; i < c.numMessagesProduced; i++ { - tq.Produce(req) + tq.Produce(intrequest.New(context.Background(), req)) } assert.Eventually(t, func() bool { diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 49f204bbb0d..4f0ef132f12 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -13,6 +13,8 @@ import ( "go.uber.org/zap" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -42,15 +44,15 @@ type persistentContiguousStorage struct { logger *zap.Logger queueName string client storage.Client - unmarshaler RequestUnmarshaler - marshaler RequestMarshaler + unmarshaler intrequest.Unmarshaler + marshaler intrequest.Marshaler putChan chan struct{} stopChan chan struct{} stopOnce sync.Once capacity uint64 - reqChan chan Request + reqChan chan *intrequest.Request mu sync.Mutex readIndex itemIndex @@ -82,18 +84,26 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. 
func newPersistentContiguousStorage(ctx context.Context, queueName string, client storage.Client, - logger *zap.Logger, capacity uint64, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { + logger *zap.Logger, capacity uint64, marshaler request.Marshaler, unmarshaler request.Unmarshaler) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, - queueName: queueName, - unmarshaler: unmarshaler, - marshaler: marshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), - reqChan: make(chan Request), - stopChan: make(chan struct{}), - itemsCount: &atomic.Uint64{}, + logger: logger, + client: client, + queueName: queueName, + marshaler: func(req *intrequest.Request) ([]byte, error) { + return marshaler(req.Request) + }, + unmarshaler: func(b []byte) (*intrequest.Request, error) { + req, err := unmarshaler(b) + if err != nil { + return nil, err + } + return intrequest.New(context.Background(), req), nil + }, + capacity: capacity, + putChan: make(chan struct{}, capacity), + reqChan: make(chan *intrequest.Request), + stopChan: make(chan struct{}), + itemsCount: &atomic.Uint64{}, } initPersistentContiguousStorage(ctx, pcs) @@ -145,7 +155,7 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex)) } -func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Request) { +func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []*intrequest.Request) { if len(reqs) > 0 { errCount := 0 for _, req := range reqs { @@ -183,7 +193,7 @@ func (pcs *persistentContiguousStorage) loop() { } // get returns the request channel that all the requests will be send on -func (pcs *persistentContiguousStorage) get() <-chan Request { +func (pcs *persistentContiguousStorage) get() <-chan *intrequest.Request { return pcs.reqChan } @@ -203,7 +213,7 @@ func (pcs *persistentContiguousStorage) stop() { } // put marshals the request and puts it into the persistent queue -func (pcs *persistentContiguousStorage) put(req Request) error { +func (pcs *persistentContiguousStorage) put(req *intrequest.Request) error { // Nil requests are ignored if req == nil { return nil @@ -231,7 +241,7 @@ func (pcs *persistentContiguousStorage) put(req Request) error { } // getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false) -func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Request, bool) { +func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (*intrequest.Request, bool) { pcs.mu.Lock() defer pcs.mu.Unlock() @@ -244,7 +254,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques pcs.updateReadIndex(ctx) pcs.itemDispatchingStart(ctx, index) - var req Request + var req *intrequest.Request batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx) if err == nil { req, err = batch.getRequestResult(pcs.itemKey(index)) @@ -278,8 +288,8 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques // retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage // and moves the items back to the queue. The function returns an array which might contain nils // if unmarshalling of the value at a given index was not possible. 
-func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []Request { - var reqs []Request +func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []*intrequest.Request { + var reqs []*intrequest.Request var dispatchedItems []itemIndex pcs.mu.Lock() @@ -302,7 +312,7 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co pcs.logger.Debug("No items left for dispatch by consumers") } - reqs = make([]Request, len(dispatchedItems)) + reqs = make([]*intrequest.Request, len(dispatchedItems)) keys := make([]string, len(dispatchedItems)) retrieveBatch := newBatch(pcs) cleanupBatch := newBatch(pcs) diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index a80ba93c5c3..2fb10e8a80f 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -93,7 +94,7 @@ func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (any, error // getRequestResult returns the result of a Get operation as a request // If the value cannot be retrieved, it returns an error -func (bof *batchStruct) getRequestResult(key string) (Request, error) { +func (bof *batchStruct) getRequestResult(key string) (*intrequest.Request, error) { reqIf, err := bof.getResult(key, bof.bytesToRequest) if err != nil { return nil, err @@ -102,7 +103,7 @@ func (bof *batchStruct) getRequestResult(key string) (Request, error) { return nil, errValueNotSet } - return reqIf.(Request), nil + return reqIf.(*intrequest.Request), nil } // getItemIndexResult returns the result of a Get operation as an itemIndex @@ -136,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) } // setRequest adds Set operation over a given request to the batch -func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { +func (bof *batchStruct) setRequest(key string, value *intrequest.Request) *batchStruct { return bof.set(key, value, bof.requestToBytes) } @@ -207,7 +208,7 @@ func bytesToItemIndexArray(b []byte) (any, error) { } func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { - return bof.pcs.marshaler(req.(Request)) + return bof.pcs.marshaler(req.(*intrequest.Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index d48bbd75285..e703ff1f11c 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -18,6 +18,8 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -41,29 +43,25 @@ func createTestPersistentStorage(client storage.Client) *persistentContiguousSto } type fakeTracesRequest struct { - td ptrace.Traces - processingFinishedCallback func() - Request + td ptrace.Traces } -func newFakeTracesRequest(td ptrace.Traces) 
*fakeTracesRequest { - return &fakeTracesRequest{ - td: td, - } +func (ftr *fakeTracesRequest) Export(context.Context) error { + return nil } -func (fd *fakeTracesRequest) OnProcessingFinished() { - if fd.processingFinishedCallback != nil { - fd.processingFinishedCallback() - } +func (ftr *fakeTracesRequest) ItemsCount() int { + return ftr.td.SpanCount() } -func (fd *fakeTracesRequest) SetOnProcessingFinished(callback func()) { - fd.processingFinishedCallback = callback +func newFakeTracesRequest(td ptrace.Traces) *fakeTracesRequest { + return &fakeTracesRequest{ + td: td, + } } -func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { - return func(bytes []byte) (Request, error) { +func newFakeTracesRequestUnmarshalerFunc() request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { unmarshaler := ptrace.ProtoUnmarshaler{} traces, err := unmarshaler.UnmarshalTraces(bytes) if err != nil { @@ -73,8 +71,8 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } -func newFakeTracesRequestMarshalerFunc() RequestMarshaler { - return func(req Request) ([]byte, error) { +func newFakeTracesRequestMarshalerFunc() request.Marshaler { + return func(req request.Request) ([]byte, error) { marshaler := ptrace.ProtoMarshaler{} return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) } @@ -153,7 +151,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) { // Put some items, make sure they are loaded and shutdown the storage... for i := 0; i < 3; i++ { - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) } require.Eventually(t, func() bool { @@ -203,7 +201,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { ps := createTestPersistentStorage(client) for i := 0; i < 5; i++ { - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) } @@ -212,7 +210,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { // Now, this will take item 0 and pull item 1 into the unbuffered channel readReq := <-ps.get() - assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + assert.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td) requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1}) // This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel @@ -263,7 +261,7 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) { logger := zap.NewNop() traces := newTraces(5, 10) - req := newFakeTracesRequest(traces) + req := intrequest.New(context.Background(), newFakeTracesRequest(traces)) ext := NewMockStorageExtension(nil) client := createTestClient(ext) @@ -307,9 +305,9 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { require.Equal(t, uint64(0), ps.size()) // Put two elements - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) - err = ps.put(req) + err = ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) err = ext.Shutdown(context.Background()) @@ -326,10 +324,10 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // Lets read both of the elements we put readReq := <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td) readReq = <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td) require.Equal(t, uint64(0), ps.size()) 
err = ext.Shutdown(context.Background()) @@ -338,7 +336,7 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // No more items ext := NewMockStorageExtension(nil) - wq := createTestQueue(t, 1000, 1, func(Request) {}) + wq := createTestQueue(t, 1000, 1, func(*intrequest.Request) {}) require.Equal(t, 0, wq.Size()) require.NoError(t, ext.Shutdown(context.Background())) } @@ -390,7 +388,7 @@ func BenchmarkPersistentStorage_TraceSpans(b *testing.B) { bb.ResetTimer() for i := 0; i < bb.N; i++ { - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(bb, err) } @@ -464,7 +462,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { // Put enough items in to fill the underlying storage reqCount := 0 for { - err = ps.put(req) + err = ps.put(intrequest.New(context.Background(), req)) if errors.Is(err, syscall.ENOSPC) { break } @@ -477,7 +475,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { client.SetMaxSizeInBytes(newMaxSize) // Try to put an item in, should fail - err = ps.put(req) + err = ps.put(intrequest.New(context.Background(), req)) require.Error(t, err) // Take out all the items @@ -488,7 +486,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { // We should be able to put a new item in // However, this will fail if deleting items fails with full storage - err = ps.put(req) + err = ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) } diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go deleted file mode 100644 index 454a42782ce..00000000000 --- a/exporter/exporterhelper/internal/request.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - -import "context" - -// Request defines capabilities required for persistent storage of a request -type Request interface { - // Context returns the context.Context of the requests. - Context() context.Context - - // SetContext updates the context.Context of the requests. - SetContext(context.Context) - - Export(ctx context.Context) error - - // OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried. - // Otherwise, it should return the original Request. - OnError(error) Request - - // Count returns the count of spans/metric points or log records. - Count() int - - // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished - OnProcessingFinished() - - // SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. 
remove the item from persistent queue) - SetOnProcessingFinished(callback func()) -} - -// RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request -type RequestUnmarshaler func([]byte) (Request, error) - -// RequestMarshaler defines a function which takes a request and marshals it into a byte slice -type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go new file mode 100644 index 00000000000..23fd77ec156 --- /dev/null +++ b/exporter/exporterhelper/internal/request/request.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/request" +) + +// Request is a wrapper around request.Request which adds context and a callback to be called when the request +// is finished processing. +type Request struct { + request.Request + ctx context.Context + onProcessingFinishedCallback func() +} + +func New(ctx context.Context, req request.Request) *Request { + return &Request{ + Request: req, + ctx: ctx, + } +} + +// Context returns the context.Context of the request. +func (req *Request) Context() context.Context { + return req.ctx +} + +// SetContext updates the context.Context of the request. +func (req *Request) SetContext(ctx context.Context) { + req.ctx = ctx +} + +func (req *Request) OnError(err error) *Request { + if r, ok := req.Request.(request.ErrorHandler); ok { + return New(req.ctx, r.OnError(err)) + } + return req +} + +// Count returns the number of items in the request. If the request does not implement request.ItemsCounter +// then 0 is returned. +func (req *Request) Count() int { + if counter, ok := req.Request.(request.ItemsCounter); ok { + return counter.ItemsCount() + } + return 0 +} + +// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished +func (req *Request) OnProcessingFinished() { + if req.onProcessingFinishedCallback != nil { + req.onProcessingFinishedCallback() + } +} + +// SetOnProcessingFinished allows setting an optional callback function to do the cleanup (e.g.
remove the item from persistent queue) +func (req *Request) SetOnProcessingFinished(callback func()) { + req.onProcessingFinishedCallback = callback +} + +// Unmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request +type Unmarshaler func([]byte) (*Request, error) + +// Marshaler defines a function which takes a request and marshals it into a byte slice +type Marshaler func(*Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b098e722921..e1fbf3048f8 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -6,14 +6,15 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" "errors" - + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/plog" ) @@ -21,37 +22,35 @@ var logsMarshaler = &plog.ProtoMarshaler{} var logsUnmarshaler = &plog.ProtoUnmarshaler{} type logsRequest struct { - baseRequest ld plog.Logs pusher consumer.ConsumeLogsFunc } -func newLogsRequest(ctx context.Context, ld plog.Logs, pusher consumer.ConsumeLogsFunc) internal.Request { +func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) request.Request { return &logsRequest{ - baseRequest: baseRequest{ctx: ctx}, - ld: ld, - pusher: pusher, + ld: ld, + pusher: pusher, } } -func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { return nil, err } - return newLogsRequest(context.Background(), logs, pusher), nil + return newLogsRequest(logs, pusher), nil } } -func logsRequestMarshaler(req internal.Request) ([]byte, error) { +func logsRequestMarshaler(req request.Request) ([]byte, error) { return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) } -func (req *logsRequest) OnError(err error) internal.Request { +func (req *logsRequest) OnError(err error) request.Request { var logError consumererror.Logs if errors.As(err, &logError) { - return newLogsRequest(req.ctx, logError.Data(), req.pusher) + return newLogsRequest(logError.Data(), req.pusher) } return req } @@ -60,7 +59,7 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -96,7 +95,7 @@ func NewLogsExporter( } lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { - req := newLogsRequest(ctx, ld, pusher) + req := intrequest.New(ctx, newLogsRequest(ld, pusher)) serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) @@ -115,7 +114,7 @@ func NewLogsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type LogsConverter interface { // RequestFromLogs converts plog.Logs data into a request. 
- RequestFromLogs(context.Context, plog.Logs) (Request, error) + RequestFromLogs(context.Context, plog.Logs) (request.Request, error) } // NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. @@ -148,7 +147,7 @@ func NewLogsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := intrequest.New(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) @@ -171,7 +170,7 @@ func newLogsExporterWithObservability(obsrep *obsExporter) requestSender { return &logsExporterWithObservability{obsrep: obsrep} } -func (lewo *logsExporterWithObservability) send(req internal.Request) error { +func (lewo *logsExporterWithObservability) send(req *intrequest.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index ad4b130401c..505f62da210 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" @@ -41,12 +42,13 @@ var ( ) func TestLogsRequest(t *testing.T) { - lr := newLogsRequest(context.Background(), testdata.GenerateLogs(1), nil) + ctx := context.Background() + lr := intrequest.New(ctx, newLogsRequest(testdata.GenerateLogs(1), nil)) logErr := consumererror.NewLogs(errors.New("some error"), plog.NewLogs()) assert.EqualValues( t, - newLogsRequest(context.Background(), plog.NewLogs(), nil), + intrequest.New(ctx, newLogsRequest(plog.NewLogs(), nil)), lr.OnError(logErr), ) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3f09f361c7..34d2c9dd92e 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -6,14 +6,14 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" "errors" - "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -21,37 +21,35 @@ var metricsMarshaler = &pmetric.ProtoMarshaler{} var metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} type metricsRequest struct { - baseRequest md pmetric.Metrics pusher consumer.ConsumeMetricsFunc } -func newMetricsRequest(ctx context.Context, md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) internal.Request { +func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) request.Request { return &metricsRequest{ - baseRequest: baseRequest{ctx: ctx}, - md: md, - pusher: pusher, + md: md, + pusher: pusher, } } -func newMetricsRequestUnmarshalerFunc(pusher 
consumer.ConsumeMetricsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { return nil, err } - return newMetricsRequest(context.Background(), metrics, pusher), nil + return newMetricsRequest(metrics, pusher), nil } } -func metricsRequestMarshaler(req internal.Request) ([]byte, error) { +func metricsRequestMarshaler(req request.Request) ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) } -func (req *metricsRequest) OnError(err error) internal.Request { +func (req *metricsRequest) OnError(err error) request.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { - return newMetricsRequest(req.ctx, metricsError.Data(), req.pusher) + return newMetricsRequest(metricsError.Data(), req.pusher) } return req } @@ -60,7 +58,7 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -96,7 +94,7 @@ func NewMetricsExporter( } mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { - req := newMetricsRequest(ctx, md, pusher) + req := intrequest.New(ctx, newMetricsRequest(md, pusher)) serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) @@ -115,7 +113,7 @@ func NewMetricsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type MetricsConverter interface { // RequestFromMetrics converts pdata.Metrics into a request. - RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) + RequestFromMetrics(context.Context, pmetric.Metrics) (request.Request, error) } // NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. 
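Editor's note: to ground the converter flow referenced above, a request-based exporter supplies a `request.Request` implementation plus a converter producing it. A minimal sketch follows, with every concrete name (`batchRequest`, `backendClient`, `converter`) hypothetical; `ItemsCount` and the optional `request.ErrorHandler` are the hooks the helper's internal wrapper consults:

```go
// batchRequest is an illustrative request.Request implementation.
type batchRequest struct {
	md     pmetric.Metrics
	client *backendClient // hypothetical backend client
}

// Export performs the actual delivery; the helper invokes it from the
// queue consumer (and again from the retry sender on retries).
func (r *batchRequest) Export(ctx context.Context) error {
	return r.client.send(ctx, r.md)
}

// ItemsCount makes enqueue-failure and obsreport accounting meaningful;
// without it the internal wrapper's Count() reports 0.
func (r *batchRequest) ItemsCount() int {
	return r.md.DataPointCount()
}

// converter satisfies MetricsConverter.
type converter struct{ client *backendClient }

func (c *converter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (request.Request, error) {
	return &batchRequest{md: md, client: c.client}, nil
}
```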
@@ -148,7 +146,7 @@ zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := intrequest.New(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) @@ -171,7 +169,7 @@ func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender { return &metricsSenderWithObservability{obsrep: obsrep} } -func (mewo *metricsSenderWithObservability) send(req internal.Request) error { +func (mewo *metricsSenderWithObservability) send(req *intrequest.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 934db88cce5..99475ad60f5 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" @@ -41,12 +42,13 @@ var ( ) func TestMetricsRequest(t *testing.T) { - mr := newMetricsRequest(context.Background(), testdata.GenerateMetrics(1), nil) + ctx := context.Background() + mr := intrequest.New(ctx, newMetricsRequest(testdata.GenerateMetrics(1), nil)) metricsErr := consumererror.NewMetrics(errors.New("some error"), pmetric.NewMetrics()) assert.EqualValues( t, - newMetricsRequest(context.Background(), pmetric.NewMetrics(), nil), + intrequest.New(ctx, newMetricsRequest(pmetric.NewMetrics(), nil)), mr.OnError(metricsErr), ) } diff --git a/exporter/exporterhelper/queue/config.go b/exporter/exporterhelper/queue/config.go new file mode 100644 index 00000000000..bdaf9852f04 --- /dev/null +++ b/exporter/exporterhelper/queue/config.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + +import "errors" + +// Config defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // Enabled indicates whether to enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in the queue at a given time. + // This field is left for backward compatibility with QueueSettings. + // Later, it will be replaced with size fields specified explicitly in terms of items or batches. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
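Editor's note: alongside the defaults defined just below, programmatic construction mirrors the YAML path through `mapstructure`. A brief sketch of tweaking and validating the defaults:

```go
// Start from the defaults (enabled, 10 consumers, size 1000) and adjust.
cfg := queue.NewDefaultConfig()
cfg.QueueSize = 5000
if err := cfg.Validate(); err != nil {
	// Validate rejects non-positive sizes/consumer counts while enabled;
	// a disabled config always validates.
	panic(err) // placeholder error handling
}
```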
+func NewDefaultConfig() Config { + return Config{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + } +} + +// Validate checks if the Config is valid +func (qCfg *Config) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.NumConsumers <= 0 { + return errors.New("number of consumers must be positive") + } + if qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} diff --git a/exporter/exporterhelper/queue/factory.go b/exporter/exporterhelper/queue/factory.go new file mode 100644 index 00000000000..82aa4f49440 --- /dev/null +++ b/exporter/exporterhelper/queue/factory.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +// Settings defines parameters for creating a Queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Settings struct { + exporter.CreateSettings + DataType component.DataType +} + +// Factory defines a factory interface for creating a Queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Factory interface { + Create(Settings) Queue +} diff --git a/exporter/exporterhelper/queue/memoryqueue/config.go b/exporter/exporterhelper/queue/memoryqueue/config.go new file mode 100644 index 00000000000..a23a460d546 --- /dev/null +++ b/exporter/exporterhelper/queue/memoryqueue/config.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memoryqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue" + +import ( + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" +) + +// Config defines configuration for queueing requests before exporting using in-memory storage. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + queue.Config `mapstructure:",squash"` +} + +// NewDefaultConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultConfig() Config { + return Config{ + Config: queue.NewDefaultConfig(), + } +} diff --git a/exporter/exporterhelper/queue/memoryqueue/factory.go b/exporter/exporterhelper/queue/memoryqueue/factory.go new file mode 100644 index 00000000000..578ec28d831 --- /dev/null +++ b/exporter/exporterhelper/queue/memoryqueue/factory.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memoryqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue" + +import ( + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" +) + +type factory struct { + cfg Config +} + +// NewFactory returns a factory for the memory queue.
diff --git a/exporter/exporterhelper/queue/memoryqueue/config.go b/exporter/exporterhelper/queue/memoryqueue/config.go
new file mode 100644
index 00000000000..a23a460d546
--- /dev/null
+++ b/exporter/exporterhelper/queue/memoryqueue/config.go
@@ -0,0 +1,24 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package memoryqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue"
+
+import (
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
+)
+
+// Config defines configuration for queueing requests before exporting using in-memory storage.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Config struct {
+	queue.Config `mapstructure:",squash"`
+}
+
+// NewDefaultConfig returns the default Config.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func NewDefaultConfig() Config {
+	return Config{
+		Config: queue.NewDefaultConfig(),
+	}
+}
diff --git a/exporter/exporterhelper/queue/memoryqueue/factory.go b/exporter/exporterhelper/queue/memoryqueue/factory.go
new file mode 100644
index 00000000000..578ec28d831
--- /dev/null
+++ b/exporter/exporterhelper/queue/memoryqueue/factory.go
@@ -0,0 +1,32 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package memoryqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue"
+
+import (
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
+)
+
+type factory struct {
+	cfg Config
+}
+
+// NewFactory returns a factory for the memory queue.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func NewFactory(cfg Config) queue.Factory {
+	return &factory{cfg: cfg}
+}
+
+// Create creates a memory queue based on the given settings.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func (f *factory) Create(_ queue.Settings) queue.Queue {
+	if !f.cfg.Enabled {
+		return nil
+	}
+	return internal.NewBoundedMemoryQueue(f.cfg.QueueSize, f.cfg.NumConsumers)
+}
+
+var _ queue.Factory = (*factory)(nil)
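Taken together with the WithRequestQueue option introduced earlier in this patch, enabling an in-memory queue on a request-based exporter would look roughly like this. A sketch only: it assumes WithRequestQueue accepts a queue.Factory, as the changelog entry suggests, and ctx, set, and myConverter are placeholders:

    // Create a traces exporter that queues client-provided requests in memory.
    exp, err := exporterhelper.NewTracesRequestExporter(ctx, set, myConverter,
    	exporterhelper.WithRequestQueue(memoryqueue.NewFactory(memoryqueue.NewDefaultConfig())))
    if err != nil {
    	return err
    }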
diff --git a/exporter/exporterhelper/queue/persistentqueue/config.go b/exporter/exporterhelper/queue/persistentqueue/config.go
new file mode 100644
index 00000000000..8a02dde2115
--- /dev/null
+++ b/exporter/exporterhelper/queue/persistentqueue/config.go
@@ -0,0 +1,30 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
+
+import (
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
+)
+
+// Config defines configuration for queueing requests before exporting using persistent storage.
+// It is intended to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace
+// QueueSettings in the future.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Config struct {
+	queue.Config `mapstructure:",squash"`
+	// StorageID, if not nil, enables persistent storage and uses the specified component
+	// as a storage extension for the persistent queue.
+	StorageID *component.ID `mapstructure:"storage"`
+}
+
+// NewDefaultConfig returns the default Config.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func NewDefaultConfig() Config {
+	return Config{
+		Config: queue.NewDefaultConfig(),
+	}
+}
diff --git a/exporter/exporterhelper/queue/persistentqueue/factory.go b/exporter/exporterhelper/queue/persistentqueue/factory.go
new file mode 100644
index 00000000000..00e1a99b4f0
--- /dev/null
+++ b/exporter/exporterhelper/queue/persistentqueue/factory.go
@@ -0,0 +1,40 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
+
+import (
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
+)
+
+type factory struct {
+	cfg         Config
+	marshaler   request.Marshaler
+	unmarshaler request.Unmarshaler
+}
+
+// NewFactory returns a factory for the persistent queue.
+// If cfg.StorageID is nil, it falls back to the memory queue.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func NewFactory(cfg Config, marshaler request.Marshaler, unmarshaler request.Unmarshaler) queue.Factory {
+	if cfg.StorageID == nil {
+		return memoryqueue.NewFactory(memoryqueue.Config{Config: cfg.Config})
+	}
+	return &factory{cfg: cfg, marshaler: marshaler, unmarshaler: unmarshaler}
+}
+
+// Create creates a persistent queue based on the given settings.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func (f *factory) Create(set queue.Settings) queue.Queue {
+	if !f.cfg.Enabled {
+		return nil
+	}
+	return internal.NewPersistentQueue(f.cfg.QueueSize, f.cfg.NumConsumers, *f.cfg.StorageID, set, f.marshaler, f.unmarshaler)
+}
+
+var _ queue.Factory = (*factory)(nil)
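Similarly, a persistent queue can be wired up by pointing the configuration at a storage extension. A sketch: myMarshaler and myUnmarshaler are placeholder request.Marshaler/request.Unmarshaler implementations, and a "file_storage" extension is assumed to be configured in the collector:

    // Enable persistence by referencing a storage extension; with a nil
    // StorageID, NewFactory transparently falls back to the memory queue.
    qCfg := persistentqueue.NewDefaultConfig()
    storageID := component.NewIDWithName("file_storage", "exporter_queue")
    qCfg.StorageID = &storageID
    factory := persistentqueue.NewFactory(qCfg, myMarshaler, myUnmarshaler)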
diff --git a/exporter/exporterhelper/internal/producer_consumer_queue.go b/exporter/exporterhelper/queue/queue.go
similarity index 50%
rename from exporter/exporterhelper/internal/producer_consumer_queue.go
rename to exporter/exporterhelper/queue/queue.go
index 7b17106a564..7b24f904321 100644
--- a/exporter/exporterhelper/internal/producer_consumer_queue.go
+++ b/exporter/exporterhelper/queue/queue.go
@@ -1,32 +1,29 @@
 // Copyright The OpenTelemetry Authors
-// Copyright (c) 2019 The Jaeger Authors.
-// Copyright (c) 2017 Uber Technologies, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue"
 
 import (
 	"context"
 
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/exporter"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 )
 
-type QueueSettings struct {
-	exporter.CreateSettings
-	DataType component.DataType
-	Callback func(item Request)
-}
-
-// ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
+// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
 // (boundedMemoryQueue) or via a disk-based queue (persistentQueue)
-type ProducerConsumerQueue interface {
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Queue interface {
 	// Start starts the queue with a given number of goroutines consuming items from the queue
 	// and passing them into the consumer callback.
-	Start(ctx context.Context, host component.Host, set QueueSettings) error
+	Start(ctx context.Context, host component.Host) error
+	// SetCallback sets the callback function to be called by the consumer when an item is consumed from the queue.
+	// Should be called before Start.
+	SetCallback(callback func(*intrequest.Request))
 	// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
 	// to the queue due to queue overflow.
-	Produce(item Request) bool
+	Produce(item *intrequest.Request) bool
 	// Size returns the current Size of the queue
 	Size() int
 	// Stop stops all consumers, as well as the length reporter if started,
@@ -35,6 +32,6 @@ type ProducerConsumerQueue interface {
 	// Capacity returns the capacity of the queue.
 	Capacity() int
 	// IsPersistent returns true if the queue is persistent.
-	// TODO: Do not expose this method if the interface moves to a public package.
+	// TODO: Remove this method once we move it to the config.
 	IsPersistent() bool
 }
diff --git a/exporter/exporterhelper/queue/queue_test.go b/exporter/exporterhelper/queue/queue_test.go
new file mode 100644
index 00000000000..911658f6e50
--- /dev/null
+++ b/exporter/exporterhelper/queue/queue_test.go
@@ -0,0 +1,22 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestQueueConfig_Validate(t *testing.T) {
+	qCfg := NewDefaultConfig()
+	assert.NoError(t, qCfg.Validate())
+
+	qCfg.QueueSize = 0
+	assert.EqualError(t, qCfg.Validate(), "queue size must be positive")
+
+	// Confirm that Validate doesn't return an error for an invalid config when the feature is disabled.
+	qCfg.Enabled = false
+	assert.NoError(t, qCfg.Validate())
+}
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index d3da29500fe..abf94a3f8e6 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"time"
 
 	"go.opencensus.io/metric/metricdata"
@@ -15,8 +16,7 @@ import (
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/exporter"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
 	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
 )
 
@@ -67,26 +67,23 @@ type queueSender struct {
 	fullName         string
 	id               component.ID
 	signal           component.DataType
-	queue            internal.ProducerConsumerQueue
+	queue            queue.Queue
 	traceAttribute   attribute.KeyValue
 	logger           *zap.Logger
 	requeuingEnabled bool
 }
 
-func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender {
+func newQueueSender(id component.ID, signal component.DataType, logger *zap.Logger) *queueSender {
 	return &queueSender{
 		fullName:       id.String(),
 		id:             id,
 		signal:         signal,
-		queue:          queue,
 		traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()),
 		logger:         logger,
-		// TODO: this can be further exposed as a config param rather than relying on a type of queue
-		requeuingEnabled: queue != nil && queue.IsPersistent(),
 	}
 }
 
-func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error {
+func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req *intrequest.Request, err error) error {
 	if !qs.requeuingEnabled || qs.queue == nil {
 		logger.Error(
 			"Exporting failed. No more retries left. Dropping data.",
@@ -112,19 +109,12 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque
 }
 
 // start is invoked during service startup.
-func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error {
+func (qs *queueSender) start(ctx context.Context, host component.Host) error {
 	if qs.queue == nil {
 		return nil
 	}
 
-	err := qs.queue.Start(ctx, host, internal.QueueSettings{
-		CreateSettings: set,
-		DataType:       qs.signal,
-		Callback: func(item internal.Request) {
-			_ = qs.nextSender.send(item)
-			item.OnProcessingFinished()
-		},
-	})
+	err := qs.queue.Start(ctx, host)
 	if err != nil {
 		return err
 	}
@@ -143,6 +133,9 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor
 		return fmt.Errorf("failed to create retry queue capacity metric: %w", err)
 	}
 
+	// TODO: this can be further exposed as a config param rather than relying on the type of queue
+	qs.requeuingEnabled = qs.queue.IsPersistent()
+
 	return nil
 }
 
@@ -161,7 +154,7 @@ func (qs *queueSender) shutdown() {
 }
 
 // send implements the requestSender interface
-func (qs *queueSender) send(req internal.Request) error {
+func (qs *queueSender) send(req *intrequest.Request) error {
 	if qs.queue == nil {
 		err := qs.nextSender.send(req)
 		if err != nil {
@@ -191,6 +184,16 @@ func (qs *queueSender) send(req internal.Request) error {
 	return nil
 }
 
+func (qs *queueSender) setNextSender(nextSender requestSender) {
+	qs.nextSender = nextSender
+	if qs.queue != nil {
+		qs.queue.SetCallback(func(req *intrequest.Request) {
+			_ = nextSender.send(req)
+			req.OnProcessingFinished()
+		})
+	}
+}
+
 type noCancellationContext struct {
 	context.Context
 }
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index 84cccb6635e..6f7af5fea9a 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -18,6 +18,8 @@ import (
 	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/internal/testdata"
 	"go.opentelemetry.io/collector/obsreport/obsreporttest"
@@ -39,10 +41,10 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
 	})
 
 	// Enqueue another request to ensure when calling shutdown we drain the queue.
-	secondMockR := newMockRequest(context.Background(), 3, nil)
+	secondMockR := newMockRequest(3, nil)
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(secondMockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), secondMockR)))
 	})
 
 	require.LessOrEqual(t, 1, be.queueSender.(*queueSender).queue.Size())
@@ -69,10 +71,10 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	cancelFunc()
 
-	mockR := newMockRequest(ctx, 2, nil)
+	mockR := newMockRequest(2, nil)
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(ctx, mockR)))
 	})
 	ocs.awaitAsyncProcessing()
@@ -91,7 +93,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) {
 	t.Cleanup(func() {
 		assert.NoError(t, be.Shutdown(context.Background()))
 	})
-	require.Error(t, be.send(newMockRequest(context.Background(), 2, nil)))
+	require.Error(t, be.send(intrequest.New(context.Background(), newMockRequest(2, nil))))
 }
 
 func TestQueuedRetryHappyPath(t *testing.T) {
@@ -114,9 +116,9 @@ func TestQueuedRetryHappyPath(t *testing.T) {
 	reqs := make([]*mockRequest, 0, 10)
 	for i := 0; i < wantRequests; i++ {
 		ocs.run(func() {
-			req := newMockRequest(context.Background(), 2, nil)
+			req := newMockRequest(2, nil)
 			reqs = append(reqs, req)
-			require.NoError(t, be.send(req))
+			require.NoError(t, be.send(intrequest.New(context.Background(), req)))
 		})
 	}
 
@@ -187,17 +189,17 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
 	be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
-	be.queueSender.(*queueSender).requeuingEnabled = true
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	be.queueSender.(*queueSender).requeuingEnabled = true
 	t.Cleanup(func() {
 		assert.NoError(t, be.Shutdown(context.Background()))
 	})
 
 	traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
-	mockR := newMockRequest(context.Background(), 1, traceErr)
+	mockR := newMockRequest(1, traceErr)
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 		ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing
 	})
 	ocs.awaitAsyncProcessing()
@@ -224,9 +226,9 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
 	})
 
 	traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
-	mockR := newMockRequest(context.Background(), 1, traceErr)
+	mockR := newMockRequest(1, traceErr)
 
-	require.Error(t, be.retrySender.send(mockR), "sending_queue is full")
+	require.Error(t, be.retrySender.send(intrequest.New(context.Background(), mockR)), "sending_queue is full")
 	mockR.checkNumRequests(t, 1)
 }
 
@@ -238,10 +240,10 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
-	mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
+	mockR := newMockRequest(2, errors.New("some error"))
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.Error(t, be.send(mockR))
+		require.Error(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 	mockR.checkNumRequests(t, 1)
@@ -306,7 +308,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	rCfg.InitialInterval = time.Millisecond
 	rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered
 
-	req := newMockRequest(context.Background(), 3, errors.New("some error"))
+	req := newMockRequest(3, errors.New("some error"))
 
 	be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
@@ -315,8 +317,8 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 
 	// wraps original queue so we can count operations
 	be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{
-		ProducerConsumerQueue: be.queueSender.(*queueSender).queue,
-		produceCounter:        produceCounter,
+		Queue:          be.queueSender.(*queueSender).queue,
+		produceCounter: produceCounter,
 	}
 	be.queueSender.(*queueSender).requeuingEnabled = true
@@ -326,7 +328,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	})
 
 	// Invoke queuedRetrySender so the producer will put the item for consumer to poll
-	require.NoError(t, be.send(req))
+	require.NoError(t, be.send(intrequest.New(context.Background(), req)))
 
 	// first wait for the item to be produced to the queue initially
 	assert.Eventually(t, func() bool {
@@ -340,6 +342,30 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	}, time.Second, 1*time.Millisecond)
 }
 
+func TestPersistentQueueRetryPersistenceEnabledStorageError(t *testing.T) {
+	storageError := errors.New("could not get storage client")
+	tt, err := obsreporttest.SetupTelemetry(defaultID)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
+
+	qCfg := persistentqueue.NewDefaultConfig()
+	storageID := component.NewIDWithName("file_storage", "storage")
+	qCfg.StorageID = &storageID // enable persistence
+	rCfg := NewDefaultRetrySettings()
+	set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}
+	be, err := newBaseExporter(set, "", true, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg),
+		WithRequestQueue(persistentqueue.NewFactory(qCfg, fakeRequestMarshaler, fakeRequestUnmarshaler)))
+	require.NoError(t, err)
+
+	var extensions = map[component.ID]component.Component{
+		storageID: internal.NewMockStorageExtension(storageError),
+	}
+	host := &mockHost{ext: extensions}
+
+	// we fail to start if we get an error creating the storage client
+	require.Error(t, be.Start(context.Background(), host), "could not get storage client")
+}
+
 type mockHost struct {
 	component.Host
 	ext map[component.ID]component.Component
diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go
deleted file mode 100644
index ef05aa6395d..00000000000
--- a/exporter/exporterhelper/request.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
-
-import (
-	"context"
-
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
-)
-
-// Request represents a single request that can be sent to an external endpoint.
-// This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-type Request interface {
-	// Export exports the request to an external endpoint.
-	Export(ctx context.Context) error
-}
-
-// RequestItemsCounter is an optional interface that can be implemented by Request to provide a number of items
-// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing
-// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces.
-// If not implemented, collector's logs, metrics and traces will report 0 items.
-// This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-type RequestItemsCounter interface {
-	// ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be
-	// sent. For example, for OTLP exporter, this value represents the number of spans,
-	// metric data points or log records.
-	ItemsCount() int
-}
-
-type request struct {
-	Request
-	baseRequest
-}
-
-var _ internal.Request = (*request)(nil)
-
-func newRequest(ctx context.Context, req Request) *request {
-	return &request{
-		Request:     req,
-		baseRequest: baseRequest{ctx: ctx},
-	}
-}
-
-func (req *request) OnError(_ error) internal.Request {
-	// Potentially we could introduce a new RequestError type that would represent partially succeeded request.
-	// In that case we should consider returning them back to the pipeline converted back to pdata in case if
-	// sending queue is disabled. We leave it as a future improvement if decided that it's needed.
-	return req
-}
-
-// Count returns a number of items in the request. If the request does not implement RequestItemsCounter
-// then 0 is returned.
-func (req *request) Count() int {
-	if counter, ok := req.Request.(RequestItemsCounter); ok {
-		return counter.ItemsCount()
-	}
-	return 0
-}
diff --git a/exporter/exporterhelper/request/request.go b/exporter/exporterhelper/request/request.go
new file mode 100644
index 00000000000..32499c9fbcc
--- /dev/null
+++ b/exporter/exporterhelper/request/request.go
@@ -0,0 +1,51 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package request // import "go.opentelemetry.io/collector/exporter/exporterhelper/request"
+
+import (
+	"context"
+)
+
+// Request represents a single request that can be sent to an external endpoint.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Request interface {
+	// Export exports the request to an external endpoint.
+	Export(ctx context.Context) error
+}
+
+// ItemsCounter is an optional interface that can be implemented by Request to provide the number of items
+// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing
+// based on the number of items. It is also used for reporting the number of items in the collector's logs, metrics and traces.
+// If not implemented, the collector's logs, metrics and traces will report 0 items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type ItemsCounter interface {
+	// ItemsCount returns the number of basic items in the request, where an item is the smallest piece of data that can be
+	// sent. For example, for the OTLP exporter, this value represents the number of spans,
+	// metric data points or log records.
+	ItemsCount() int
+}
+
+// ErrorHandler is an optional interface that can be implemented by Request to provide a way to handle partial
+// temporary failures. For example, if some items failed to process and can be retried, this interface allows
+// returning a new Request that contains the items left to be sent. Otherwise, the original Request should be returned.
+// If not implemented, the original Request will be returned, assuming the error applies to the whole Request.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type ErrorHandler interface {
+	// OnError returns a new Request that may contain the items left to be sent if some items failed to process and
+	// can be retried. Otherwise, it should return the original Request.
+	OnError(error) Request
+}
+
+// Marshaler is a function that can marshal a Request into bytes.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Marshaler func(req Request) ([]byte, error)
+
+// Unmarshaler is a function that can unmarshal bytes into a Request.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Unmarshaler func(data []byte) (Request, error)
diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go
index 6dd3f67800a..460f7b6aa78 100644
--- a/exporter/exporterhelper/request_test.go
+++ b/exporter/exporterhelper/request_test.go
@@ -5,23 +5,26 @@ package exporterhelper
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 
+	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
 
 type fakeRequest struct {
-	items int
-	err   error
+	Items int
+	Err   error
 }
 
 func (r fakeRequest) Export(_ context.Context) error {
-	return r.err
+	return r.Err
 }
 
 func (r fakeRequest) ItemsCount() int {
-	return r.items
+	return r.Items
 }
 
 type fakeRequestConverter struct {
@@ -31,14 +34,30 @@ type fakeRequestConverter struct {
 	requestError error
 }
 
-func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) {
-	return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError
+func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (request.Request, error) {
+	return fakeRequest{Items: md.DataPointCount(), Err: c.requestError}, c.metricsError
 }
 
-func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) {
-	return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError
+func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (request.Request, error) {
+	return fakeRequest{Items: td.SpanCount(), Err: c.requestError}, c.tracesError
 }
 
-func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) {
-	return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError
+func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (request.Request, error) {
+	return fakeRequest{Items: ld.LogRecordCount(), Err: c.requestError}, c.logsError
+}
+
+func fakeRequestMarshaler(req request.Request) ([]byte, error) {
+	r, ok := req.(fakeRequest)
+	if !ok {
+		return nil, errors.New("invalid request type")
+	}
+	return json.Marshal(r)
+}
+
+func fakeRequestUnmarshaler(bytes []byte) (request.Request, error) {
+	var r fakeRequest
+	if err := json.Unmarshal(bytes, &r); err != nil {
+		return nil, err
+	}
+	return r, nil
+}
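Beyond the fakeRequest used in these tests, a real exporter's Request would typically implement the optional interfaces as well. A minimal sketch under stated assumptions: spansRequest and partialError are hypothetical types, not part of this change, and partialError stands in for whatever error type an exporter uses to report partial failures:

    package myexporter

    import (
    	"context"
    	"errors"

    	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
    )

    // spansRequest is a hypothetical client-provided request.
    type spansRequest struct {
    	Spans []string `json:"spans"` // exported so it survives JSON (un)marshaling for the persistent queue
    }

    func (r *spansRequest) Export(ctx context.Context) error {
    	// Send r.Spans to the backend here.
    	return nil
    }

    // ItemsCount implements request.ItemsCounter.
    func (r *spansRequest) ItemsCount() int { return len(r.Spans) }

    // partialError carries the items that still need to be sent.
    type partialError struct{ remaining []string }

    func (e *partialError) Error() string { return "partial failure" }

    // OnError implements request.ErrorHandler: on a partial failure it returns
    // a new request containing only the remaining items.
    func (r *spansRequest) OnError(err error) request.Request {
    	var pe *partialError
    	if errors.As(err, &pe) {
    		return &spansRequest{Spans: pe.remaining}
    	}
    	return r
    }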
diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go
index 14a90a9c1e6..156ae64c1cf 100644
--- a/exporter/exporterhelper/retry_sender.go
+++ b/exporter/exporterhelper/retry_sender.go
@@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 import (
 	"errors"
 	"fmt"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
@@ -15,7 +16,6 @@ import (
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer/consumererror"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
 )
 
@@ -73,7 +73,7 @@ func NewThrottleRetry(err error, delay time.Duration) error {
 	}
 }
 
-type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error
+type onRequestHandlingFinishedFunc func(*zap.Logger, *intrequest.Request, error) error
 
 type retrySender struct {
 	baseRequestSender
@@ -86,7 +86,7 @@ type retrySender struct {
 
 func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
 	if onTemporaryFailure == nil {
-		onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error {
+		onTemporaryFailure = func(logger *zap.Logger, req *intrequest.Request, err error) error {
 			return err
 		}
 	}
@@ -104,7 +104,7 @@ func (rs *retrySender) shutdown() {
 }
 
 // send implements the requestSender interface
-func (rs *retrySender) send(req internal.Request) error {
+func (rs *retrySender) send(req *intrequest.Request) error {
 	if !rs.cfg.Enabled {
 		err := rs.nextSender.send(req)
 		if err != nil {
diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go
index 1da5fb29ebd..82f1f13b048 100644
--- a/exporter/exporterhelper/retry_sender_test.go
+++ b/exporter/exporterhelper/retry_sender_test.go
@@ -21,25 +21,27 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumererror"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/internal/testdata"
 )
 
-func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler {
-	return func(bytes []byte) (internal.Request, error) {
+func mockRequestUnmarshaler(mr *mockRequest) request.Unmarshaler {
+	return func(bytes []byte) (request.Request, error) {
 		return mr, nil
 	}
 }
 
-func mockRequestMarshaler(_ internal.Request) ([]byte, error) {
+func mockRequestMarshaler(_ request.Request) ([]byte, error) {
 	return nil, nil
 }
 
 func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	rCfg := NewDefaultRetrySettings()
-	mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data")))
+	mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data")))
 	be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
@@ -50,7 +52,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 	// In the newMockConcurrentExporter we count requests and items even for failed requests
@@ -64,7 +66,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
 	rCfg := NewDefaultRetrySettings()
 	rCfg.Enabled = false
 	be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler,
-		mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))),
+		mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error"))),
 		newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
@@ -73,10 +75,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
 		assert.NoError(t, be.Shutdown(context.Background()))
 	})
 
-	mockR := newMockRequest(context.Background(), 2, errors.New("transient error"))
+	mockR := newMockRequest(2, errors.New("transient error"))
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 	// In the newMockConcurrentExporter we count requests and items even for failed requests
@@ -98,11 +100,11 @@ func TestQueuedRetry_OnError(t *testing.T) {
 	})
 
 	traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
-	mockR := newMockRequest(context.Background(), 2, traceErr)
+	mockR := newMockRequest(2, traceErr)
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 
@@ -131,11 +133,11 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
 		require.NoError(t, be.send(newErrorRequest(context.Background())))
 	})
 
-	mockR := newMockRequest(context.Background(), 2, nil)
+	mockR := newMockRequest(2, nil)
 	start := time.Now()
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 
@@ -173,11 +175,11 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
 	})
 
 	retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond)
-	mockR := newMockRequest(context.Background(), 2, wrappedError{retry})
+	mockR := newMockRequest(2, wrappedError{retry})
 	start := time.Now()
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 
@@ -204,10 +206,10 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
 		assert.NoError(t, be.Shutdown(context.Background()))
 	})
 
-	mockR := newMockRequest(context.Background(), 2, errors.New("transient error"))
+	mockR := newMockRequest(2, errors.New("transient error"))
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.NoError(t, be.send(mockR))
+		require.NoError(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 
@@ -225,10 +227,10 @@ func TestQueueRetryWithNoQueue(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
-	mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
+	mockR := newMockRequest(2, errors.New("some error"))
 	ocs.run(func() {
 		// This is asynchronous so it should just enqueue, no errors expected.
-		require.Error(t, be.send(mockR))
+		require.Error(t, be.send(intrequest.New(context.Background(), mockR)))
 	})
 	ocs.awaitAsyncProcessing()
 	mockR.checkNumRequests(t, 1)
@@ -237,30 +239,21 @@ func TestQueueRetryWithNoQueue(t *testing.T) {
 	require.NoError(t, be.Shutdown(context.Background()))
 }
 
-type mockErrorRequest struct {
-	baseRequest
-}
+type mockErrorRequest struct{}
 
 func (mer *mockErrorRequest) Export(_ context.Context) error {
 	return errors.New("transient error")
 }
 
-func (mer *mockErrorRequest) OnError(error) internal.Request {
-	return mer
-}
-
-func (mer *mockErrorRequest) Count() int {
+func (mer *mockErrorRequest) ItemsCount() int {
 	return 7
 }
 
-func newErrorRequest(ctx context.Context) internal.Request {
-	return &mockErrorRequest{
-		baseRequest: baseRequest{ctx: ctx},
-	}
+func newErrorRequest(ctx context.Context) *intrequest.Request {
+	return intrequest.New(ctx, &mockErrorRequest{})
 }
 
 type mockRequest struct {
-	baseRequest
 	cnt          int
 	mu           sync.Mutex
 	consumeError error
@@ -280,9 +273,8 @@ func (m *mockRequest) Export(ctx context.Context) error {
 	return ctx.Err()
 }
 
-func (m *mockRequest) OnError(error) internal.Request {
+func (m *mockRequest) OnError(error) request.Request {
 	return &mockRequest{
-		baseRequest:  m.baseRequest,
 		cnt:          1,
 		consumeError: nil,
 		requestCount: m.requestCount,
@@ -295,13 +287,12 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) {
 	}, time.Second, 1*time.Millisecond)
 }
 
-func (m *mockRequest) Count() int {
+func (m *mockRequest) ItemsCount() int {
 	return m.cnt
 }
 
-func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockRequest {
+func newMockRequest(cnt int, consumeError error) *mockRequest {
 	return &mockRequest{
-		baseRequest:  baseRequest{ctx: ctx},
 		cnt:          cnt,
 		consumeError: consumeError,
 		requestCount: &atomic.Int64{},
@@ -323,7 +314,7 @@ func newObservabilityConsumerSender(_ *obsExporter) requestSender {
 	}
 }
 
-func (ocs *observabilityConsumerSender) send(req internal.Request) error {
+func (ocs *observabilityConsumerSender) send(req *intrequest.Request) error {
 	err := ocs.nextSender.send(req)
 	if err != nil {
 		ocs.droppedItemsCount.Add(int64(req.Count()))
@@ -396,13 +387,13 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met
 }
 
 type producerConsumerQueueWithCounter struct {
-	internal.ProducerConsumerQueue
+	queue.Queue
 	produceCounter *atomic.Uint32
 }
 
-func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool {
+func (pcq *producerConsumerQueueWithCounter) Produce(req *intrequest.Request) bool {
 	pcq.produceCounter.Add(1)
-	return pcq.ProducerConsumerQueue.Produce(item)
+	return pcq.Queue.Produce(req)
 }
 
 type errorRequestSender struct {
@@ -410,6 +401,6 @@ type errorRequestSender struct {
 	errToReturn error
 }
 
-func (rs *errorRequestSender) send(_ internal.Request) error {
+func (rs *errorRequestSender) send(_ *intrequest.Request) error {
 	return rs.errToReturn
 }
diff --git a/exporter/exporterhelper/timeout_sender.go b/exporter/exporterhelper/timeout_sender.go
index 11b85cf08be..ccee8f37094 100644
--- a/exporter/exporterhelper/timeout_sender.go
+++ b/exporter/exporterhelper/timeout_sender.go
@@ -5,9 +5,8 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 
 import (
 	"context"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"time"
-
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 )
 
 // TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -29,13 +28,13 @@ type timeoutSender struct {
 	cfg TimeoutSettings
 }
 
-func (ts *timeoutSender) send(req internal.Request) error {
+func (ts *timeoutSender) send(req *intrequest.Request) error {
 	// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
 	// updated because this deadline most likely is before the next one.
 	ctx := req.Context()
 	if ts.cfg.Timeout > 0 {
 		var cancelFunc func()
-		ctx, cancelFunc = context.WithTimeout(req.Context(), ts.cfg.Timeout)
+		ctx, cancelFunc = context.WithTimeout(ctx, ts.cfg.Timeout)
 		defer cancelFunc()
 	}
 	return req.Export(ctx)
diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go
index 4b9e397ec43..2175cf3277e 100644
--- a/exporter/exporterhelper/traces.go
+++ b/exporter/exporterhelper/traces.go
@@ -6,14 +6,14 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 import (
 	"context"
 	"errors"
-
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
 
@@ -21,37 +21,35 @@ var tracesMarshaler = &ptrace.ProtoMarshaler{}
 var tracesUnmarshaler = &ptrace.ProtoUnmarshaler{}
 
 type tracesRequest struct {
-	baseRequest
 	td     ptrace.Traces
 	pusher consumer.ConsumeTracesFunc
 }
 
-func newTracesRequest(ctx context.Context, td ptrace.Traces, pusher consumer.ConsumeTracesFunc) internal.Request {
+func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) request.Request {
 	return &tracesRequest{
-		baseRequest: baseRequest{ctx: ctx},
-		td:          td,
-		pusher:      pusher,
+		td:     td,
+		pusher: pusher,
 	}
 }
 
-func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal.RequestUnmarshaler {
-	return func(bytes []byte) (internal.Request, error) {
+func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) request.Unmarshaler {
+	return func(bytes []byte) (request.Request, error) {
 		traces, err := tracesUnmarshaler.UnmarshalTraces(bytes)
 		if err != nil {
 			return nil, err
 		}
-		return newTracesRequest(context.Background(), traces, pusher), nil
+		return newTracesRequest(traces, pusher), nil
 	}
 }
 
-func tracesRequestMarshaler(req internal.Request) ([]byte, error) {
+func tracesRequestMarshaler(req request.Request) ([]byte, error) {
 	return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td)
 }
 
-func (req *tracesRequest) OnError(err error) internal.Request {
+func (req *tracesRequest) OnError(err error) request.Request {
 	var traceError consumererror.Traces
 	if errors.As(err, &traceError) {
-		return newTracesRequest(req.ctx, traceError.Data(), req.pusher)
+		return newTracesRequest(traceError.Data(), req.pusher)
 	}
 	return req
 }
@@ -60,7 +58,7 @@ func (req *tracesRequest) Export(ctx context.Context) error {
 	return req.pusher(ctx, req.td)
 }
 
-func (req *tracesRequest) Count() int {
+func (req *tracesRequest) ItemsCount() int {
 	return req.td.SpanCount()
 }
 
@@ -96,7 +94,7 @@ func NewTracesExporter(
 	}
 
 	tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
-		req := newTracesRequest(ctx, td, pusher)
+		req := intrequest.New(ctx, newTracesRequest(td, pusher))
 		serr := be.send(req)
 		if errors.Is(serr, errSendingQueueIsFull) {
 			be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count()))
@@ -115,7 +113,7 @@ func NewTracesExporter(
 // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
 type TracesConverter interface {
 	// RequestFromTraces converts ptrace.Traces into a Request.
-	RequestFromTraces(context.Context, ptrace.Traces) (Request, error)
+	RequestFromTraces(context.Context, ptrace.Traces) (request.Request, error)
 }
 
 // NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender.
@@ -148,7 +146,7 @@ func NewTracesRequestExporter(
 				zap.Error(err))
 			return consumererror.NewPermanent(cErr)
 		}
-		r := newRequest(ctx, req)
+		r := intrequest.New(ctx, req)
 		sErr := be.send(r)
 		if errors.Is(sErr, errSendingQueueIsFull) {
 			be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
@@ -171,7 +169,7 @@ func newTracesExporterWithObservability(obsrep *obsExporter) requestSender {
 	return &tracesExporterWithObservability{obsrep: obsrep}
 }
 
-func (tewo *tracesExporterWithObservability) send(req internal.Request) error {
+func (tewo *tracesExporterWithObservability) send(req *intrequest.Request) error {
 	req.SetContext(tewo.obsrep.StartTracesOp(req.Context()))
 	// Forward the data to the next consumer (this pusher is the next).
 	err := tewo.nextSender.send(req)
diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go
index 52837d23edf..2395a08364f 100644
--- a/exporter/exporterhelper/traces_test.go
+++ b/exporter/exporterhelper/traces_test.go
@@ -24,6 +24,7 @@ import (
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
 	"go.opentelemetry.io/collector/internal/testdata"
@@ -41,10 +42,12 @@ var (
 )
 
 func TestTracesRequest(t *testing.T) {
-	mr := newTracesRequest(context.Background(), testdata.GenerateTraces(1), nil)
+	ctx := context.Background()
+	mr := intrequest.New(ctx, newTracesRequest(testdata.GenerateTraces(1), nil))
 	traceErr := consumererror.NewTraces(errors.New("some error"), ptrace.NewTraces())
-	assert.EqualValues(t, newTracesRequest(context.Background(), ptrace.NewTraces(), nil), mr.OnError(traceErr))
+	assert.EqualValues(t, intrequest.New(ctx, newTracesRequest(ptrace.NewTraces(), nil)),
+		mr.OnError(traceErr))
 }
 
 func TestTracesExporter_InvalidName(t *testing.T) {