Skip to content

Commit

Permalink
Require the storage to be explicitly set for persistent queue (#5784)
Browse files Browse the repository at this point in the history
Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
Mikołaj Świątek and bogdandrutu authored Aug 4, 2022
1 parent d146f29 commit 3bb6860
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 🛑 Breaking changes 🛑

- Require the storage to be explicitly set for the (experimental) persistent queue (#5784)
- Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803)
- Remove deprecated component stability helpers (#5802):
- `component.WithTracesExporterAndStabilityLevel`
Expand Down
13 changes: 6 additions & 7 deletions exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@ The following configuration options can be modified:
With this build tag set, an additional configuration option can be enabled:

- `sending_queue`
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)

The maximum number of batches stored to disk can be controlled using the `sending_queue.queue_size` parameter (which,
similarly to in-memory buffering, defaults to 5000 batches).

When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and
the exporting is continued.
When the persistent queue is enabled, batches are buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while there are items in the persistent queue, the items are picked up on restart and exporting continues.

```
┌─Consumer #1─┐
Expand Down Expand Up @@ -93,9 +90,9 @@ exporters:
otlp:
endpoint: <ENDPOINT>
sending_queue:
persistent_storage_enabled: true
storage: file_storage/otc
extensions:
file_storage:
file_storage/otc:
directory: /var/lib/storage/otc
timeout: 10s
service:
Expand All @@ -112,3 +109,5 @@ service:
exporters: [otlp]
```

[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage
45 changes: 23 additions & 22 deletions exporter/exporterhelper/queued_retry_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type QueueSettings struct {
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
// StorageID, if not nil, enables persistent storage and uses the specified component
// as the storage extension for the persistent queue.
StorageID *config.ComponentID `mapstructure:"storage"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
Expand All @@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings {
// This is a pretty decent value for production.
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
// multiply that by the number of requests per seconds.
QueueSize: 5000,
PersistentStorageEnabled: false,
QueueSize: 5000,
}
}

Expand All @@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error {
}

var (
errNoStorageClient = errors.New("no storage client extension found")
errMultipleStorageClients = errors.New("multiple storage extensions found")
errNoStorageClient = errors.New("no storage client extension found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

type queuedRetrySender struct {
Expand Down Expand Up @@ -120,46 +120,47 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
onTemporaryFailure: qrs.onTemporaryFailure,
}

if !qCfg.PersistentStorageEnabled {
if qCfg.StorageID == nil {
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
}
// The Persistent Queue is initialized separately as it needs extra information about the component

return qrs
}

func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
var storageExtension storage.Extension
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errMultipleStorageClients
}
storageExtension = se
// getStorageExtension looks up storageID in extensions and returns the matching
// entry as a storage.Extension.
//
// It returns errNoStorageClient when nothing is registered under storageID, and
// errWrongExtensionType when the registered extension does not implement
// storage.Extension.
func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) {
	candidate, registered := extensions[storageID]
	if !registered {
		return nil, errNoStorageClient
	}

	storageExt, isStorage := candidate.(storage.Extension)
	if !isStorage {
		return nil, errWrongExtensionType
	}
	return storageExt, nil
}

if storageExtension == nil {
return nil, errNoStorageClient
func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
extension, err := getStorageExtension(host.GetExtensions(), storageID)
if err != nil {
return nil, err
}

client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
if err != nil {
return nil, err
}

return &client, err
return client, err
}

// initializePersistentQueue uses extra information for initialization available from component.Host
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
if qrs.cfg.PersistentStorageEnabled {
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
if qrs.cfg.StorageID != nil {
storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal)
if err != nil {
return err
}

qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)

// TODO: this can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
Expand Down
65 changes: 46 additions & 19 deletions exporter/exporterhelper/queued_retry_experimental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,39 +70,41 @@ func TestGetRetrySettings(t *testing.T) {
desc string
storage storage.Extension
numStorages int
storageEnabled bool
storageIndex int
expectedError error
getClientError error
}{
{
desc: "no storage selected",
numStorages: 0,
expectedError: errNoStorageClient,
desc: "obtain storage extension by name",
numStorages: 2,
storageIndex: 0,
expectedError: nil,
},
{
desc: "obtain default storage extension",
numStorages: 1,
storageEnabled: true,
expectedError: nil,
desc: "fail on not existing storage extension",
numStorages: 2,
storageIndex: 100,
expectedError: errNoStorageClient,
},
{
desc: "fail on obtaining default storage extension",
numStorages: 2,
storageEnabled: true,
expectedError: errMultipleStorageClients,
desc: "invalid extension type",
numStorages: 2,
storageIndex: 100,
expectedError: errNoStorageClient,
},
{
desc: "fail on error getting storage client from extension",
numStorages: 1,
storageEnabled: true,
storageIndex: 0,
expectedError: getStorageClientError,
getClientError: getStorageClientError,
},
}

for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
// prepare
storageID := config.NewComponentIDWithName("file_storage", strconv.Itoa(tC.storageIndex))

var extensions = map[config.ComponentID]component.Extension{}
for i := 0; i < tC.numStorages; i++ {
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
Expand All @@ -111,7 +113,7 @@ func TestGetRetrySettings(t *testing.T) {
ownerID := config.NewComponentID("foo_exporter")

// execute
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)
client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType)

// verify
if tC.expectedError != nil {
Expand All @@ -125,6 +127,29 @@ func TestGetRetrySettings(t *testing.T) {
}
}

func TestInvalidStorageExtensionType(t *testing.T) {
storageID := config.NewComponentIDWithName("extension", "extension")

// make a test extension
factory := componenttest.NewNopExtensionFactory()
extConfig := factory.CreateDefaultConfig()
settings := componenttest.NewNopExtensionCreateSettings()
extension, err := factory.CreateExtension(context.Background(), settings, extConfig)
assert.NoError(t, err)
var extensions = map[config.ComponentID]component.Extension{
storageID: extension,
}
host := &mockHost{ext: extensions}
ownerID := config.NewComponentID("foo_exporter")

// execute
client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType)

// we should get an error about the extension type
assert.ErrorIs(t, err, errWrongExtensionType)
assert.Nil(t, client)
}

// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
Expand Down Expand Up @@ -182,12 +207,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
storageID := config.NewComponentIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
storageID: &mockStorageExtension{},
}
host := &mockHost{ext: extensions}

Expand All @@ -203,12 +229,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
storageID := config.NewComponentIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
storageID: &mockStorageExtension{GetClientError: storageError},
}
host := &mockHost{ext: extensions}

Expand Down

0 comments on commit 3bb6860

Please sign in to comment.