Skip to content

Commit

Permalink
[exporterhelper] Add WithRequestQueue option to the exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Oct 2, 2023
1 parent 7c5ecef commit 2d1aec3
Show file tree
Hide file tree
Showing 33 changed files with 785 additions and 439 deletions.
35 changes: 35 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `WithRequestQueue`: a new exporter helper option for using a queue.
- queue.Queue: an interface for queue implementations.
- queue.Factory: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option.
- queue.Settings: queue factory settings.
- queue.Config: common configuration for queue implementations.
- queue.NewDefaultConfig: a function for creating a default queue configuration.
- queue/memoryqueue.NewFactory: a new factory for creating a memory queue.
- queue/memoryqueue.Config: a configuration for the memory queue factory.
- queue/memoryqueue.NewDefaultConfig: a function for creating a default memory queue configuration.
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
85 changes: 40 additions & 45 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
exprequest "go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
start(ctx context.Context, host component.Host) error
shutdown()
send(req internal.Request) error
send(req *intrequest.Request) error
setNextSender(nextSender requestSender)
}

Expand All @@ -26,13 +29,13 @@ type baseRequestSender struct {

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
func (b *baseRequestSender) start(context.Context, component.Host) error {
return nil
}

func (b *baseRequestSender) shutdown() {}

func (b *baseRequestSender) send(req internal.Request) error {
func (b *baseRequestSender) send(req *intrequest.Request) error {
return b.nextSender.send(req)
}

Expand All @@ -42,30 +45,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {

type obsrepSenderFactory func(obsrep *obsExporter) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
processingFinishedCallback func()
}

func (req *baseRequest) Context() context.Context {
return req.ctx
}

func (req *baseRequest) SetContext(ctx context.Context) {
req.ctx = ctx
}

func (req *baseRequest) SetOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

func (req *baseRequest) OnProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
}
}

// Option apply changes to baseExporter.
type Option func(*baseExporter)

Expand Down Expand Up @@ -107,19 +86,34 @@ func WithRetry(retrySettings RetrySettings) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
var queue internal.ProducerConsumerQueue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
factory := persistentqueue.NewFactory(persistentqueue.Config{
Config: queue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
},
StorageID: config.StorageID,
}, o.marshaler, o.unmarshaler)
qs := newQueueSender(o.set.ID, o.signal, o.set.Logger)
qs.queue = factory.Create(queue.Settings{CreateSettings: o.set, DataType: o.signal})
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender = qs
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(queueFactory queue.Factory) Option {
return func(o *baseExporter) {
qs := newQueueSender(o.set.ID, o.signal, o.set.Logger)
qs.queue = queueFactory.Create(queue.Settings{CreateSettings: o.set, DataType: o.signal})
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender = qs
}
}

Expand All @@ -138,8 +132,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
marshaler exprequest.Marshaler
unmarshaler exprequest.Unmarshaler
signal component.DataType

set exporter.CreateSettings
Expand All @@ -160,8 +154,9 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler exprequest.Marshaler, unmarshaler exprequest.Unmarshaler, osf obsrepSenderFactory,
options ...Option) (*baseExporter, error) {

obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
if err != nil {
Expand Down Expand Up @@ -192,7 +187,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
func (be *baseExporter) send(req *intrequest.Request) error {
return be.queueSender.send(req)
}

Expand All @@ -210,7 +205,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
return be.queueSender.start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
Expand Down
21 changes: 15 additions & 6 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
Expand All @@ -20,26 +22,28 @@ type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
items chan *intrequest.Request
capacity uint32
numConsumers int
callback func(item *intrequest.Request)
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) queue.Queue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
items: make(chan *intrequest.Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
callback: func(item *intrequest.Request) {},
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host) error {
var startWG sync.WaitGroup
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
Expand All @@ -49,16 +53,21 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
set.Callback(item)
q.callback(item)
}
}()
}
startWG.Wait()
return nil
}

// SetCallback sets the callback function to be called by the consumer when an item is consumed from the queue.
func (q *boundedMemoryQueue) SetCallback(callback func(*intrequest.Request)) {
q.callback = callback
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(item Request) bool {
func (q *boundedMemoryQueue) Produce(item *intrequest.Request) bool {
if q.stopped.Load() {
return false
}
Expand Down
Loading

0 comments on commit 2d1aec3

Please sign in to comment.