Skip to content

Commit

Permalink
[exporterhelper] refactor queue creation for WithQueue option
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Aug 25, 2023
1 parent 149ae29 commit c66cf3f
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 408 deletions.
71 changes: 20 additions & 51 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -73,23 +72,25 @@ type baseSettings struct {
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queueSettings
queue Queue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
Expand Down Expand Up @@ -143,53 +144,21 @@ func WithQueue(config QueueSettings) Option {
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
o.queueSettings.config = config
}
}

// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue.
func WithMemoryQueue(config QueueConfig) Option {
return func(o *baseSettings) {
o.queueSettings.config = QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
if !config.Enabled {
return
}

Check warning on line 149 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L148-L149

Added lines #L148 - L149 were not covered by tests
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
return
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}

// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue.
// This option can be used only with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option {
// WithRequestQueue enables queueing for an exporter.
func WithRequestQueue(queue Queue) Option {
return func(o *baseSettings) {
if !o.requestExporter {
panic("this option is not available for the old exporters helpers, use WithQueue instead")
}
o.queueSettings = queueSettings{
config: QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
StorageID: config.StorageID,
},
marshaler: func(req internal.Request) ([]byte, error) {
r, ok := req.(*request)
if !ok {
return nil, fmt.Errorf("invalid request type: %T", req)
}
return marshaler(r.Request)
},
unmarshaler: func(data []byte) (internal.Request, error) {
req, err := unmarshaler(data)
if err != nil {
return nil, err
}
return &request{
Request: req,
baseRequest: baseRequest{ctx: context.Background()},
}, nil
},
}
o.queue = queue
}
}

Expand Down Expand Up @@ -220,7 +189,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand All @@ -229,7 +198,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
return be.qrSender.start(ctx, host, set)
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -47,7 +47,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
be, err := newBaseExporter(
defaultSettings,
newBaseSettings(
false,
false, nil, nil,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
Expand Down
42 changes: 29 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"go.opentelemetry.io/collector/component"
"sync"
"sync/atomic"
)
Expand All @@ -14,41 +16,44 @@ import (
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
var startWG sync.WaitGroup
for i := 0; i < numWorkers; i++ {
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
callback(item)
set.Callback(item)
}
}()
}
startWG.Wait()
return nil
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
Expand Down Expand Up @@ -87,3 +92,14 @@ func (q *boundedMemoryQueue) Stop() {
func (q *boundedMemoryQueue) Size() int {
return int(q.size.Load())
}

// Capacity returns the capacity of the queue.
func (q *boundedMemoryQueue) Capacity() int {
return int(q.capacity)
}

func (q *boundedMemoryQueue) IsPersistent() bool {
return false
}

func (q *boundedMemoryQueue) unexported() {}

Check warning on line 105 in exporter/exporterhelper/internal/bounded_memory_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/bounded_memory_queue.go#L105

Added line #L105 was not covered by tests
49 changes: 21 additions & 28 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package internal

import (
"context"
"reflect"
"sync"
"sync/atomic"
Expand All @@ -14,8 +15,20 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
return QueueSettings{
CreateSettings: exportertest.NewNopCreateSettings(),
DataType: component.DataTypeMetrics,
Callback: callback,
}
}

type stringRequest struct {
Request
str string
Expand All @@ -29,7 +42,7 @@ func newStringRequest(str string) Request {
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) {
q := NewBoundedMemoryQueue(1)
q := NewBoundedMemoryQueue(1, 1)

var startLock sync.Mutex

Expand Down Expand Up @@ -88,7 +101,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF

func TestBoundedQueue(t *testing.T) {
helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) {
q.StartConsumers(1, consumerFn)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn)))
})
}

Expand All @@ -99,14 +112,14 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue(10)
q := NewBoundedMemoryQueue(10, 1)

consumerState := newConsumerState(t)

q.StartConsumers(1, func(item Request) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
consumerState.record(item.(stringRequest).str)
time.Sleep(1 * time.Second)
})
})))

q.Produce(newStringRequest("a"))
q.Produce(newStringRequest("b"))
Expand Down Expand Up @@ -183,30 +196,10 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestZeroSize(t *testing.T) {
q := NewBoundedMemoryQueue(0)
q := NewBoundedMemoryQueue(0, 1)

q.StartConsumers(1, func(item Request) {
})
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process
}

func BenchmarkBoundedQueue(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}
Loading

0 comments on commit c66cf3f

Please sign in to comment.