Skip to content

Commit

Permalink
[exporterhelper] Add queue options to the new exporter helper
Browse files Browse the repository at this point in the history
This change enabled queue capability for the new exporter helper. For now, it preserves the same user configuration interface as the existing exporter helper has. The only difference is that implementing persistence is optional now as it requires providing marshal and unmarshal functions for the custom request. 

Later, it's possible to introduce more options for controlling the queue: count of items or bytes in the queue.
  • Loading branch information
dmitryax committed Aug 18, 2023
1 parent a29e672 commit 0589798
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 12 deletions.
50 changes: 49 additions & 1 deletion exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -139,12 +140,59 @@ func WithRetry(retrySettings RetrySettings) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
o.queueSettings.config = config
}
}

// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue.
func WithMemoryQueue(config QueueConfig) Option {
return func(o *baseSettings) {
o.queueSettings.config = QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
}
}
}

// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue.
// This option can be used only with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option {
return func(o *baseSettings) {
if !o.requestExporter {
panic("this option is not available for the old exporters helpers, use WithQueue instead")
}
o.queueSettings = queueSettings{
config: QueueSettings{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
StorageID: config.StorageID,
},
marshaler: func(req internal.Request) ([]byte, error) {
r, ok := req.(*request)
if !ok {
return nil, fmt.Errorf("invalid request type: %T", req)
}
return marshaler(r.Request)
},
unmarshaler: func(data []byte) (internal.Request, error) {
req, err := unmarshaler(data)
if err != nil {
return nil, err
}
return &request{
Request: req,
baseRequest: baseRequest{ctx: context.Background()},
}, nil
},
}
}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
Expand Down
49 changes: 49 additions & 0 deletions exporter/exporterhelper/queued_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,55 @@ func (qCfg *QueueSettings) Validate() error {
return nil
}

// QueueConfig defines configuration for queueing batches before exporting.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before exporting.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue.
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
// This field is left for backward compatibility with QueueSettings.
// Later, it will be replaced with size fields specified explicitly in terms of items or batches.
QueueSize int `mapstructure:"queue_size"`
}

// NewDefaultQueueConfig returns the default QueueConfig.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSize: defaultQueueSize,
}
}

// PersistentQueueConfig defines configuration for queueing before exporting using a persistent storage.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace
type PersistentQueueConfig struct {
QueueConfig `mapstructure:",squash"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
}

// NewDefaultPersistentQueueConfig returns the default PersistentQueueConfig.
func NewDefaultPersistentQueueConfig() PersistentQueueConfig {
return PersistentQueueConfig{
QueueConfig: NewDefaultQueueConfig(),
}
}

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
if qCfg.QueueSize <= 0 {
return errors.New("queue size must be positive")
}
return nil
}

type queuedRetrySender struct {
fullName string
id component.ID
Expand Down
58 changes: 54 additions & 4 deletions exporter/exporterhelper/queued_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
rCfg := NewDefaultRetrySettings()
mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data")))
bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg))
bs.marshaler = mockRequestMarshaler
bs.unmarshaler = mockRequestUnmarshaler(mockR)
be, err := newBaseExporter(defaultSettings, bs, "")
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
Expand All @@ -72,8 +70,6 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg))
bs.marshaler = mockRequestMarshaler
bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error")))
be, err := newBaseExporter(defaultSettings, bs, "")
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
Expand Down Expand Up @@ -396,6 +392,27 @@ func TestQueueSettings_Validate(t *testing.T) {
assert.NoError(t, qCfg.Validate())
}

func TestQueueConfig_Validate(t *testing.T) {
qCfg := NewDefaultQueueConfig()
assert.NoError(t, qCfg.Validate())

qCfg.QueueSize = 0
assert.EqualError(t, qCfg.Validate(), "queue size must be positive")

// Confirm Validate doesn't return error with invalid config when feature is disabled
qCfg.Enabled = false
assert.NoError(t, qCfg.Validate())
}

func TestWithMemoryQueue(t *testing.T) {
qs := NewDefaultQueueSettings()
qc := NewDefaultQueueConfig()
assert.Equal(t, newBaseSettings(false, WithQueue(qs)), newBaseSettings(false, WithMemoryQueue(qc)))

qc.NumConsumers = 5
assert.NotEqual(t, newBaseSettings(false, WithQueue(qs)), newBaseSettings(false, WithMemoryQueue(qc)))
}

func TestGetRetrySettings(t *testing.T) {
getStorageClientError := errors.New("unable to create storage client")
testCases := []struct {
Expand Down Expand Up @@ -584,6 +601,30 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
}

func TestPersistentQueueRetryPersistenceEnabledStorageError(t *testing.T) {
storageError := errors.New("could not get storage client")
tt, err := obsreporttest.SetupTelemetry(defaultID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultPersistentQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
set := tt.ToExporterCreateSettings()
bs := newBaseSettings(true, WithRetry(rCfg), WithPersistentQueue(qCfg, fakeRequestMarshaler, fakeRequestUnmarshaler))
be, err := newBaseExporter(set, bs, "")
require.NoError(t, err)

var extensions = map[component.ID]component.Component{
storageID: &mockStorageExtension{GetClientError: storageError},
}
host := &mockHost{ext: extensions}

// we fail to start if we get an error creating the storage client
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
}

func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {

produceCounter := &atomic.Uint32{}
Expand Down Expand Up @@ -638,6 +679,15 @@ func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
})
}

func TestPersistentQueueRetryOptionsWithExporter(t *testing.T) {
bs := newBaseSettings(false, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings()))
assert.False(t, bs.requestExporter)
assert.Panics(t, func() {
_ = newBaseSettings(false, WithRetry(NewDefaultRetrySettings()),
WithPersistentQueue(NewDefaultPersistentQueueConfig(), fakeRequestMarshaler, fakeRequestUnmarshaler))
})
}

type mockErrorRequest struct {
baseRequest
}
Expand Down
6 changes: 6 additions & 0 deletions exporter/exporterhelper/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ func (req *request) Count() int {
}
return 0
}

// RequestMarshaler is a function that can marshal a Request into bytes.
type RequestMarshaler func(req Request) ([]byte, error)

// RequestUnmarshaler is a function that can unmarshal bytes into a Request.
type RequestUnmarshaler func(data []byte) (Request, error)
32 changes: 25 additions & 7 deletions exporter/exporterhelper/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ package exporterhelper

import (
"context"
"encoding/json"
"errors"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type fakeRequest struct {
items int
err error
Items int
Err error
}

func (r fakeRequest) Export(_ context.Context) error {
return r.err
return r.Err
}

func (r fakeRequest) ItemsCount() int {
return r.items
return r.Items
}

type fakeRequestConverter struct {
Expand All @@ -32,13 +34,29 @@ type fakeRequestConverter struct {
}

func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) {
return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError
return fakeRequest{Items: md.DataPointCount(), Err: c.requestError}, c.metricsError
}

func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) {
return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError
return fakeRequest{Items: td.SpanCount(), Err: c.requestError}, c.tracesError
}

func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) {
return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError
return fakeRequest{Items: ld.LogRecordCount(), Err: c.requestError}, c.logsError
}

func fakeRequestMarshaler(req Request) ([]byte, error) {
r, ok := req.(fakeRequest)
if !ok {
return nil, errors.New("invalid request type")
}
return json.Marshal(r)
}

func fakeRequestUnmarshaler(bytes []byte) (Request, error) {
var r fakeRequest
if err := json.Unmarshal(bytes, &r); err != nil {
return nil, err
}
return r, nil
}

0 comments on commit 0589798

Please sign in to comment.