Skip to content

Commit

Permalink
[exporterhelper] Add queue options to the new exporter helper
Browse files Browse the repository at this point in the history
This change enabled queue capability for the new exporter helper. For now, it preserves the same user configuration interface as the existing exporter helper has. The only difference is that implementing persistence is optional now as it requires providing marshal and unmarshal functions for the custom request. 

Later, it's possible to introduce more options for controlling the queue: count of items or bytes in the queue.
  • Loading branch information
dmitryax committed Sep 4, 2023
1 parent 8176e6d commit 8070a59
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 39 deletions.
33 changes: 33 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `RequestMarshaler`: a new interface for marshaling client-provided requests.
- `RequestUnmarshaler`: a new interface for unmarshaling client-provided requests.
- `WithMemoryQueue`: a new exporter helper option for using a memory queue.
- `WithPersistentQueue`: a new exporter helper option for using a persistent queue.
- `QueueConfig`: a configuration for queueing requests used by WithMemoryQueue option.
- `NewDefaultQueueConfig`: a function for creating a default QueueConfig.
- `PersistentQueueConfig`: a configuration for queueing requests in persistent storage used by WithPersistentQueue option.
- `NewDefaultPersistentQueueConfig`: a function for creating a default PersistentQueueConfig.
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
# One or more tracking issues or pull requests related to the change
issues: [7874]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
48 changes: 47 additions & 1 deletion exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -131,7 +132,8 @@ func WithRetry(retrySettings RetrySettings) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
if !config.Enabled {
return
Expand All @@ -144,6 +146,50 @@ func WithQueue(config QueueSettings) Option {
}
}

// WithMemoryQueue overrides the default QueueConfig for an exporter to use an in-memory queue.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithMemoryQueue(config QueueConfig) Option {
return func(o *baseSettings) {
if !config.Enabled {
return
}
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
}
}

// WithPersistentQueue overrides the default QueueConfig for an exporter to use a persistent queue.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Option {
return func(o *baseSettings) {
if !config.Enabled {
return
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID,
func(req internal.Request) ([]byte, error) {
r, ok := req.(*request)
if !ok {
return nil, fmt.Errorf("invalid request type: %T", req)
}
return marshaler(r.Request)
},
func(data []byte) (internal.Request, error) {
req, err := unmarshaler(data)
if err != nil {
return nil, err
}
return &request{
Request: req,
baseRequest: baseRequest{ctx: context.Background()},
}, nil
},
)
}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
Expand Down
48 changes: 39 additions & 9 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -143,12 +144,12 @@ func TestLogsRequestExporter_Default_ConvertError(t *testing.T) {

func TestLogsRequestExporter_Default_ExportError(t *testing.T) {
ld := plog.NewLogs()
want := errors.New("export_error")
wantErr := errors.New("export_error")
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(),
&fakeRequestConverter{requestError: want})
&fakeRequestConverter{exportCallback: func(Request) error { return wantErr }})
require.NoError(t, err)
require.NotNil(t, le)
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
require.Equal(t, wantErr, le.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithPersistentQueue(t *testing.T) {
Expand All @@ -175,6 +176,34 @@ func TestLogsExporter_WithPersistentQueue(t *testing.T) {
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestLogsRequestExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultPersistentQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
acc := &atomic.Uint32{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_logs_request", "with_persistent_queue")
rc := &fakeRequestConverter{exportCallback: func(req Request) error {
acc.Add(uint32(req.(RequestItemsCounter).ItemsCount()))
return nil
}}
te, err := NewLogsRequestExporter(context.Background(), set, rc,
WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc()))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(1)))
require.NoError(t, te.ConsumeLogs(context.Background(), testdata.GenerateLogs(2)))
require.Eventually(t, func() bool {
return acc.Load() == 3
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestLogsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -213,17 +242,17 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
}

func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("export_error")
wantErr := errors.New("export_error")
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(),
&fakeRequestConverter{requestError: want})
&fakeRequestConverter{exportCallback: func(Request) error { return wantErr }})
require.Nil(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, tt, le, want)
checkRecordedMetricsForLogsExporter(t, tt, le, wantErr)
}

func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
Expand Down Expand Up @@ -298,11 +327,12 @@ func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) {
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want})
wantErr := errors.New("my_error")
le, err := NewLogsRequestExporter(context.Background(), set,
&fakeRequestConverter{exportCallback: func(Request) error { return wantErr }})
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, wantErr, 1)
}

func TestLogsExporter_WithShutdown(t *testing.T) {
Expand Down
49 changes: 40 additions & 9 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -144,12 +145,12 @@ func TestMetricsRequestExporter_Default_ConvertError(t *testing.T) {

func TestMetricsRequestExporter_Default_ExportError(t *testing.T) {
md := pmetric.NewMetrics()
want := errors.New("export_error")
wantErr := errors.New("export_error")
me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(),
fakeRequestConverter{requestError: want})
fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }})
require.NoError(t, err)
require.NotNil(t, me)
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
require.Equal(t, wantErr, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
Expand All @@ -176,6 +177,34 @@ func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestMetricsRequestExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultPersistentQueueConfig()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
acc := &atomic.Uint32{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_metrics_request", "with_persistent_queue")
rc := &fakeRequestConverter{exportCallback: func(req Request) error {
acc.Add(uint32(req.(RequestItemsCounter).ItemsCount()))
return nil
}}
te, err := NewMetricsRequestExporter(context.Background(), set, rc,
WithPersistentQueue(qCfg, rc.requestMarshalerFunc(), rc.requestUnmarshalerFunc()))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1))) // 2 data points
require.NoError(t, te.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(2))) // 4 data points
require.Eventually(t, func() bool {
return acc.Load() == 6
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -214,16 +243,17 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
}

func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("my_error")
wantErr := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want})
me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(),
fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }})
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, tt, me, want)
checkRecordedMetricsForMetricsExporter(t, tt, me, wantErr)
}

func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
Expand Down Expand Up @@ -298,11 +328,12 @@ func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) {
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want})
wantErr := errors.New("my_error")
me, err := NewMetricsRequestExporter(context.Background(), set,
fakeRequestConverter{exportCallback: func(req Request) error { return wantErr }})
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, wantErr, 2)
}

func TestMetricsExporter_WithShutdown(t *testing.T) {
Expand Down
58 changes: 58 additions & 0 deletions exporter/exporterhelper/queued_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,64 @@ func (qCfg *QueueSettings) Validate() error {
return nil
}

// QueueConfig defines configuration for queueing requests before exporting.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before exporting.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue.
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
// This field is left for backward compatibility with QueueSettings.
// Later, it will be replaced with size fields specified explicitly in terms of items or batches.
QueueSize int `mapstructure:"queue_size"`
}

// NewDefaultQueueConfig returns the default QueueConfig.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSize: defaultQueueSize,
}
}

// PersistentQueueConfig defines configuration for queueing requests before exporting using a persistent storage.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace
// QueueSettings in the future.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type PersistentQueueConfig struct {
QueueConfig `mapstructure:",squash"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
}

// NewDefaultPersistentQueueConfig returns the default PersistentQueueConfig.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewDefaultPersistentQueueConfig() PersistentQueueConfig {
return PersistentQueueConfig{
QueueConfig: NewDefaultQueueConfig(),
}
}

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
if qCfg.QueueSize <= 0 {
return errors.New("queue size must be positive")
}
return nil
}

type queuedRetrySender struct {
fullName string
id component.ID
Expand Down
Loading

0 comments on commit 8070a59

Please sign in to comment.