Skip to content

Commit

Permalink
[exporterhelper] refactor queue creation for WithQueue option
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Aug 23, 2023
1 parent b8af2fb commit d3b012d
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 176 deletions.
69 changes: 18 additions & 51 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -73,23 +72,24 @@ type baseSettings struct {
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queueSettings
queue Queue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
Expand Down Expand Up @@ -143,53 +143,20 @@ func WithQueue(config QueueSettings) Option {
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
o.queueSettings.config = config
}
}

// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue.
func WithMemoryQueue(config QueueConfig) Option {
return func(o *baseSettings) {
o.queueSettings.config = QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
if config.Enabled {
return
}
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}

// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue.
// This option can be used only with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option {
// WithRequestQueue enables queueing for an exporter.
func WithRequestQueue(queue Queue) Option {
return func(o *baseSettings) {
if !o.requestExporter {
panic("this option is not available for the old exporters helpers, use WithQueue instead")
}
o.queueSettings = queueSettings{
config: QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
StorageID: config.StorageID,
},
marshaler: func(req internal.Request) ([]byte, error) {
r, ok := req.(*request)
if !ok {
return nil, fmt.Errorf("invalid request type: %T", req)
}
return marshaler(r.Request)
},
unmarshaler: func(data []byte) (internal.Request, error) {
req, err := unmarshaler(data)
if err != nil {
return nil, err
}
return &request{
Request: req,
baseRequest: baseRequest{ctx: context.Background()},
}, nil
},
}
o.queue = queue
}
}

Expand Down Expand Up @@ -220,7 +187,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand All @@ -229,7 +196,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
return be.qrSender.start(ctx, host, set)
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
Expand Down
38 changes: 25 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"go.opentelemetry.io/collector/component"
"sync"
"sync/atomic"
)
Expand All @@ -14,41 +16,44 @@ import (
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
var startWG sync.WaitGroup
for i := 0; i < numWorkers; i++ {
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
callback(item)
set.Callback(item)
}
}()
}
startWG.Wait()
return nil
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
Expand Down Expand Up @@ -87,3 +92,10 @@ func (q *boundedMemoryQueue) Stop() {
func (q *boundedMemoryQueue) Size() int {
return int(q.size.Load())
}

// Capacity returns the capacity of the queue.
func (q *boundedMemoryQueue) Capacity() int {
return int(q.capacity)
}

func (q *boundedMemoryQueue) unexported() {}
85 changes: 62 additions & 23 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
)
Expand All @@ -19,14 +18,21 @@ var (
stopStorage = func(queue *persistentQueue) {
queue.storage.stop()
}
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

// persistentQueue holds the queue backed by file storage
type persistentQueue struct {
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
storage *persistentContiguousStorage
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
storageID component.ID
storage *persistentContiguousStorage
capacity uint64
numConsumers int
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
}

// buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
Expand All @@ -35,40 +41,42 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

type PersistentQueueSettings struct {
Name string
Signal component.DataType
Capacity uint64
Logger *zap.Logger
Client storage.Client
Unmarshaler RequestUnmarshaler
Marshaler RequestMarshaler
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
return &persistentQueue{
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
capacity: uint64(capacity),
numConsumers: numConsumers,
storageID: storageID,
marshaler: marshaler,
unmarshaler: unmarshaler,
stopChan: make(chan struct{}),
}
}

// StartConsumers starts the given number of consumers which will be consuming items
func (pq *persistentQueue) StartConsumers(numWorkers int, callback func(item Request)) {
for i := 0; i < numWorkers; i++ {
// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType)
if err != nil {
return err
}
storageName := buildPersistentStorageName(set.ID.Name(), set.DataType)
pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
for i := 0; i < pq.numConsumers; i++ {
pq.stopWG.Add(1)
go func() {
defer pq.stopWG.Done()
for {
select {
case req := <-pq.storage.get():
callback(req)
set.Callback(req)
case <-pq.stopChan:
return
}
}
}()
}
return nil
}

// Produce adds an item to the queue and returns true if it was accepted
Expand All @@ -91,3 +99,34 @@ func (pq *persistentQueue) Stop() {
func (pq *persistentQueue) Size() int {
return int(pq.storage.size())
}

// Capacity returns the capacity of the queue.
func (q *persistentQueue) Capacity() int {
return int(q.capacity)
}

func (pq *persistentQueue) unexported() {}

func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
extension, err := getStorageExtension(host.GetExtensions(), storageID)
if err != nil {
return nil, err
}

client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
if err != nil {
return nil, err
}

return client, err
}

func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) {
if ext, found := extensions[storageID]; found {
if storageExt, ok := ext.(storage.Extension); ok {
return storageExt, nil
}
return nil, errWrongExtensionType
}
return nil, errNoStorageClient
}
15 changes: 8 additions & 7 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,16 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, client storage.Client,
logger *zap.Logger, capacity uint64, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: set.Logger,
client: set.Client,
logger: logger,
client: client,
queueName: queueName,
unmarshaler: set.Unmarshaler,
marshaler: set.Marshaler,
capacity: set.Capacity,
putChan: make(chan struct{}, set.Capacity),
unmarshaler: unmarshaler,
marshaler: marshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
reqChan: make(chan Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
Expand Down
20 changes: 18 additions & 2 deletions exporter/exporterhelper/internal/producer_consumer_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,24 @@

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)

type QueueSettings struct {
exporter.CreateSettings
DataType component.DataType
Callback func(item Request)
}

// ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
// (boundedMemoryQueue) or via a disk-based queue (persistentQueue)
type ProducerConsumerQueue interface {
// StartConsumers starts a given number of goroutines consuming items from the queue
// Start starts the queue with a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
StartConsumers(num int, callback func(item Request))
Start(ctx context.Context, host component.Host, set QueueSettings) error
// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
// to the queue due to queue overflow.
Produce(item Request) bool
Expand All @@ -19,4 +31,8 @@ type ProducerConsumerQueue interface {
// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
Stop()
// Capacity returns the capacity of the queue.
Capacity() int

unexported()
}
Loading

0 comments on commit d3b012d

Please sign in to comment.