[exporterhelper] Introduce batching functionality #8685

Merged · 1 commit · Mar 8, 2024
25 changes: 25 additions & 0 deletions .chloggen/batch-exporter-helper.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add experimental batching capabilities to the exporter helper

# One or more tracking issues or pull requests related to the change
issues: [8122]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 24 additions & 0 deletions exporter/exporterbatcher/batch_func.go
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import "context"

// BatchMergeFunc is a function that merges two requests into a single request.
// Do not mutate the requests passed to the function if an error can be returned after mutation or if the exporter is
// marked as not mutable.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc[T any] func(context.Context, T, T) (T, error)

// BatchMergeSplitFunc is a function that merges and/or splits one or two requests into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
// The size of the last returned request MUST be less than or equal to the size of any other returned request.
// The original request MUST NOT be mutated if an error is returned after mutation or if the exporter is
// marked as not mutable. The length of the returned slice MUST NOT be 0. The optionalReq argument can be nil;
// check it before use.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error)
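
To make these contracts concrete, here is a minimal sketch of both function shapes for a hypothetical request type. itemsRequest, mergeRequests, and mergeSplitRequests are illustrative names only; they are not part of this package, and a real exporter would wrap pdata rather than a string slice.

package main

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// itemsRequest is a hypothetical request type used only for illustration.
type itemsRequest struct {
	items []string
}

func (r *itemsRequest) ItemsCount() int { return len(r.items) }

// mergeRequests satisfies BatchMergeFunc[*itemsRequest]: it combines two
// requests into a fresh one without mutating the inputs, so a later error
// cannot leave a half-merged request behind.
func mergeRequests(_ context.Context, r1, r2 *itemsRequest) (*itemsRequest, error) {
	if r1 == nil || r2 == nil {
		return nil, errors.New("both requests must be non-nil")
	}
	merged := make([]string, 0, len(r1.items)+len(r2.items))
	merged = append(merged, r1.items...)
	merged = append(merged, r2.items...)
	return &itemsRequest{items: merged}, nil
}

// mergeSplitRequests satisfies BatchMergeSplitFunc[*itemsRequest]: it merges
// the optional pending request (which can be nil, per the contract) with the
// new one and cuts the result into chunks of at most cfg.MaxSizeItems items.
// Only the last chunk may be smaller, and the returned slice is never empty.
func mergeSplitRequests(_ context.Context, cfg exporterbatcher.MaxSizeConfig, optionalReq, req *itemsRequest) ([]*itemsRequest, error) {
	var all []string
	if optionalReq != nil {
		all = append(all, optionalReq.items...)
	}
	all = append(all, req.items...)
	if cfg.MaxSizeItems <= 0 { // splitting disabled
		return []*itemsRequest{{items: all}}, nil
	}
	var out []*itemsRequest
	for len(all) > cfg.MaxSizeItems {
		out = append(out, &itemsRequest{items: all[:cfg.MaxSizeItems]})
		all = all[cfg.MaxSizeItems:]
	}
	return append(out, &itemsRequest{items: all}), nil
}

// Compile-time checks that the sketches match the generic function types.
var (
	_ exporterbatcher.BatchMergeFunc[*itemsRequest]      = mergeRequests
	_ exporterbatcher.BatchMergeSplitFunc[*itemsRequest] = mergeSplitRequests
)

func main() {}

Note how the split loop leaves the remainder in the final chunk, which is what keeps the last returned request no larger than any other, as the contract requires.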
70 changes: 70 additions & 0 deletions exporter/exporterbatcher/config.go
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import (
"errors"
"time"
)

// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
// MaxSizeItems enables batch splitting when it is greater than zero.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Config struct {
// Enabled indicates whether to batch requests before sending them to the consumerSender.
Enabled bool `mapstructure:"enabled"`

// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`

MinSizeConfig `mapstructure:",squash"`
MaxSizeConfig `mapstructure:",squash"`
}

// MinSizeConfig defines the configuration for the minimum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MinSizeConfig struct {
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MaxSizeConfig struct {
// MaxSizeItems is the maximum number of batch items, i.e. spans, data points or log records for OTLP.
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
}

func (c Config) Validate() error {
if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
return nil
}

// NewDefaultConfig returns the default Config for the exporter batcher.
func NewDefaultConfig() Config {
return Config{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
MinSizeConfig: MinSizeConfig{
MinSizeItems: 8192,
},
}
}
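
A quick sketch of how this configuration composes, using only the fields defined above; the chosen values are arbitrary:

package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

func main() {
	// Defaults: enabled, 200ms flush timeout, minimum of 8192 items.
	cfg := exporterbatcher.NewDefaultConfig()

	// Cap batches at 10000 items so oversized merges get split,
	// and flush partially filled batches after 500ms.
	cfg.MaxSizeItems = 10000
	cfg.FlushTimeout = 500 * time.Millisecond

	// Validate enforces the invariants from the function above, e.g.
	// max_size_items must be >= min_size_items when splitting is enabled.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid batcher config:", err)
		return
	}
	fmt.Printf("flush every %s, min %d, max %d items\n",
		cfg.FlushTimeout, cfg.MinSizeItems, cfg.MaxSizeItems)
}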
30 changes: 30 additions & 0 deletions exporter/exporterbatcher/config_test.go
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
cfg := NewDefaultConfig()
assert.NoError(t, cfg.Validate())

cfg.MinSizeItems = -1
assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.FlushTimeout = 0
assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")

cfg.MaxSizeItems = -1
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
}
218 changes: 218 additions & 0 deletions exporter/exporterhelper/batch_sender.go
@@ -0,0 +1,218 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// batchSender is a component that places requests into batches before passing them to the downstream senders.
// Batches are sent out with any of the following conditions:
// - batch size reaches cfg.MinSizeItems
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type batchSender struct {
baseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]

// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit uint64
activeRequests atomic.Uint64

resetTimerCh chan struct{}

mu sync.Mutex
activeBatch *batch

logger *zap.Logger

shutdownCh chan struct{}
stopped *atomic.Bool
}

// newBatchSender returns a new batch sender component.
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender {
bs := &batchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
shutdownCh: make(chan struct{}),
stopped: &atomic.Bool{},
resetTimerCh: make(chan struct{}),
}
return bs
}

func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
timer := time.NewTimer(bs.cfg.FlushTimeout)
go func() {
for {
select {
case <-bs.shutdownCh:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
timer.Reset(bs.cfg.FlushTimeout)
case <-bs.resetTimerCh:
if !timer.Stop() {
<-timer.C
}
timer.Reset(bs.cfg.FlushTimeout)
}
}
}()

return nil
}

type batch struct {
ctx context.Context
request Request
done chan struct{}
err error
}

func newEmptyBatch() *batch {
return &batch{
ctx: context.Background(),
done: make(chan struct{}),
}
}

// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
go func(b *batch) {
b.err = b.request.Export(b.ctx)
close(b.done)
}(bs.activeBatch)
bs.activeBatch = newEmptyBatch()
}

// isActiveBatchReady returns true if the active batch is ready to be exported.
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
// Caller must hold the lock.
func (bs *batchSender) isActiveBatchReady() bool {
return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems ||
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
}

func (bs *batchSender) send(ctx context.Context, req Request) error {
// Stopped batch sender should act as pass-through to allow the queue to be drained.
if bs.stopped.Load() {
return bs.nextSender.send(ctx, req)
}

bs.activeRequests.Add(1)
defer bs.activeRequests.Add(^uint64(0))

if bs.cfg.MaxSizeItems > 0 {
return bs.sendMergeSplitBatch(ctx, req)
}
return bs.sendMergeBatch(ctx, req)
}

// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
}
if len(reqs) == 1 || bs.activeBatch.request != nil {
bs.updateActiveBatch(ctx, reqs[0])
batch := bs.activeBatch
if bs.isActiveBatchReady() || len(reqs) > 1 {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
if batch.err != nil {
return batch.err
}
reqs = reqs[1:]
} else {
bs.mu.Unlock()
}

// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.

Contributor:
If I'm reading the code and TODO correctly -- it looks like the batch Export() error may or may not be returned to the caller depending on whether the export is triggered by a timer or by a full batch. When one caller sees the error, it's because a batch that (may have?) included its own data failed synchronously while trying to send. The error handling looks inaccurate.

I'm looking to replicate the functionality in https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md, which has each caller block until the data they entered into one or more batches has been decisively exported or not. Each caller ends up with different partials depending on whether the data they submitted lands in one or more batches. If the data they submitted ends up in exactly one batch, it will not get a partial error in that case; it will get a total error describing the uniform result.

The solution I'm referring to blocks each caller until each caller's data has been processed and returns one of three outcomes to each caller: total error, partial error, and no error. It looks like the solution here may or may not block the caller until their data is processed.

The reason this matters is that I want to have accurate success/failure counts from the producer perspective. If the batch processor does not return accurate error information to the SDKs that produced the data, then the SDK-level metrics become not very useful. It's not only to avoid double publishing, it's to improve SDK-level metrics accuracy.

Member Author:
The current solution always blocks all the callers until the batch is complete or timed out.

This particular comment is about splitting behavior. If one of the batch splits fails to be sent, we return an error as the whole batch failed. We should return a partial error, letting the caller know what portion failed. I can take care of this TODO in this PR. Addressing it should cover all your needs

Member Author:
BTW the line above is not related to the TODO. It just means that we send the remainder of a split right away, even if the size of it is smaller than the minimum threshold. It's needed to avoid stacking blocked requests on top of each other if they are not part of one batch.

for _, r := range reqs {
if err := r.Export(ctx); err != nil {
return err
}
}
return nil
}
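
For illustration, the TODO and review thread above could be addressed with an error type along these lines. This is a hypothetical sketch of the shape such a partial error might take, not the collector's API; the Request interface is restated here only to keep the snippet self-contained.

package main

import (
	"errors"
	"fmt"
)

// Request mirrors the exporterhelper Request interface just enough for this sketch.
type Request interface {
	ItemsCount() int
}

// partialSendError is a hypothetical shape for reporting which split requests
// were never exported, so a caller could retry only the remainder instead of
// double-publishing the whole payload.
type partialSendError struct {
	unsent []Request // split requests that were not exported
	err    error     // the error from the first failing Export call
}

func (e *partialSendError) Error() string {
	return fmt.Sprintf("%d split requests not exported: %v", len(e.unsent), e.err)
}

func (e *partialSendError) Unwrap() error { return e.err }

func main() {
	var perr error = &partialSendError{err: errors.New("downstream unavailable")}
	fmt.Println(perr) // 0 split requests not exported: downstream unavailable
}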

// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
bs.mu.Lock()
if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
if err != nil {
bs.mu.Unlock()
return err
}
}
bs.updateActiveBatch(ctx, req)
batch := bs.activeBatch
if bs.isActiveBatchReady() {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
return batch.err
}

// updateActiveBatch updates the active batch to the new merged request and context.
// The context is only set once and is not updated after the first call.
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
if bs.activeBatch.request == nil {
bs.activeBatch.ctx = ctx
}
bs.activeBatch.request = req
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.stopped.Store(true)
close(bs.shutdownCh)
// Wait for the active requests to finish.
for bs.activeRequests.Load() > 0 {
time.Sleep(10 * time.Millisecond)
}
return nil
}
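
Finally, to make the blocking semantics discussed in the review thread concrete: every caller that contributed to a batch waits on that batch's done channel and observes the shared export result. Below is a self-contained sketch of that pattern; all names are illustrative, and the flush timer from Start is omitted for brevity.

package main

import (
	"fmt"
	"sync"
)

type miniBatch struct {
	items int
	done  chan struct{}
	err   error
}

type miniBatcher struct {
	mu      sync.Mutex
	active  *miniBatch
	minSize int
}

func newMiniBatcher(minSize int) *miniBatcher {
	return &miniBatcher{active: &miniBatch{done: make(chan struct{})}, minSize: minSize}
}

// send merges items into the active batch; once it reaches minSize the batch
// is exported asynchronously and every blocked contributor is released with
// the shared result, mirroring sendMergeBatch above.
func (b *miniBatcher) send(items int, export func(int) error) error {
	b.mu.Lock()
	b.active.items += items
	batch := b.active
	if batch.items >= b.minSize {
		b.active = &miniBatch{done: make(chan struct{})} // start a fresh batch
		go func() {
			batch.err = export(batch.items)
			close(batch.done) // releases all contributors at once
		}()
	}
	b.mu.Unlock()
	<-batch.done
	return batch.err
}

func main() {
	b := newMiniBatcher(2)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			err := b.send(1, func(n int) error {
				fmt.Printf("exporting batch of %d items\n", n)
				return nil
			})
			fmt.Printf("sender %d observed err=%v\n", id, err)
		}(i)
	}
	wg.Wait()
}

Without the flush timer, a lone caller in this sketch would block forever on an underfilled batch, which is exactly why the real sender pairs this pattern with the timeout loop in Start.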