Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup initialization order for exporterhelper #12239

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/clenup-initialization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a bug where an exporter using the new batcher could be incorrectly marked as not mutating data.

# One or more tracking issues or pull requests related to the change
issues: [12239]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Only affects users that manually turned on `exporter.UsePullingBasedExporterQueueBatcher` featuregate.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
140 changes: 76 additions & 64 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request]
type ObsrepSenderFactory = func(obsrep *ObsReport, next Sender[internal.Request]) Sender[internal.Request]

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error
Expand All @@ -52,17 +52,20 @@
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]

firstSender Sender[internal.Request]

ConsumerOptions []consumer.Option

queueCfg exporterqueue.Config
timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand All @@ -72,98 +75,107 @@
}

be := &BaseExporter{
BatchSender: &BaseSender[internal.Request]{},
QueueSender: &BaseSender[internal.Request]{},
ObsrepSender: osf(obsReport),
RetrySender: &BaseSender[internal.Request]{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
Set: set,
}

for _, op := range options {
err = multierr.Append(err, op(be))
if err = op(be); err != nil {
return nil, err
}
}
if err != nil {
return nil, err

// TimeoutSender is always initialized.
be.firstSender = &TimeoutSender{cfg: be.timeoutCfg}
if be.retryCfg.Enabled {
be.RetrySender = newRetrySender(be.retryCfg, set, be.firstSender)
be.firstSender = be.RetrySender
}

be.ObsrepSender = osf(obsReport, be.firstSender)
be.firstSender = be.ObsrepSender

if be.batcherCfg.Enabled {
// Batcher mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled && !be.queueCfg.Enabled {
concurrencyLimit := int64(0)
if be.queueCfg.Enabled {
concurrencyLimit = int64(be.queueCfg.NumConsumers)
}
be.BatchSender = NewBatchSender(be.batcherCfg, set, concurrencyLimit, be.firstSender)
be.firstSender = be.BatchSender
}

if be.queueCfg.Enabled {
qSet := exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
ExporterSettings: set,
}
q := be.queueFactory(context.Background(), qSet, be.queueCfg)
q, err = newObsQueue(qSet, q)
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.BatcherCfg)
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}

be.connectSenders()

if bs, ok := be.BatchSender.(*BatchSender); ok {
// If queue sender is enabled assign to the batch sender the same number of workers.
if qs, ok := be.QueueSender.(*QueueSender); ok {
bs.concurrencyLimit = int64(qs.numConsumers)
}
// Batcher sender mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
be.firstSender = be.QueueSender
}

return be, nil
}

// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
err := be.QueueSender.Send(ctx, req)
err := be.firstSender.Send(ctx, req)
if err != nil {
be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage,
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
}
return err
}

// connectSenders wires the exporter-helper sender chain in its fixed order:
// QueueSender -> BatchSender -> ObsrepSender -> RetrySender -> TimeoutSender.
// Each SetNextSender call points one stage at its successor, so a request
// entering QueueSender.Send flows through every stage in that order, ending
// at the TimeoutSender (the last stage in the chain).
// NOTE(review): optional stages are presumably no-op pass-through senders
// when not configured — confirm against BaseSender's default behavior.
func (be *BaseExporter) connectSenders() {
	be.QueueSender.SetNextSender(be.BatchSender)
	be.BatchSender.SetNextSender(be.ObsrepSender)
	be.ObsrepSender.SetNextSender(be.RetrySender)
	be.RetrySender.SetNextSender(be.TimeoutSender)
}

func (be *BaseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the BatchSender.
if err := be.BatchSender.Start(ctx, host); err != nil {
return err
if be.BatchSender != nil {
// If no error then start the BatchSender.
if err := be.BatchSender.Start(ctx, host); err != nil {
return err
}

Check warning on line 148 in exporter/exporterhelper/internal/base_exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/base_exporter.go#L147-L148

Added lines #L147 - L148 were not covered by tests
}

// Last start the queueSender.
return be.QueueSender.Start(ctx, host)
if be.QueueSender != nil {
return be.QueueSender.Start(ctx, host)
}

return nil
}

func (be *BaseExporter) Shutdown(ctx context.Context) error {
return multierr.Combine(
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
be.RetrySender.Shutdown(ctx),
// Then shutdown the batch sender
be.BatchSender.Shutdown(ctx),
// Then shutdown the queue sender.
be.QueueSender.Shutdown(ctx),
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
var err error

// First shutdown the retry sender, so the queue sender can flush the queue without retries.
if be.RetrySender != nil {
err = multierr.Append(err, be.RetrySender.Shutdown(ctx))
}

// Then shutdown the batch sender
if be.BatchSender != nil {
err = multierr.Append(err, be.BatchSender.Shutdown(ctx))
}

// Then shutdown the queue sender.
if be.QueueSender != nil {
err = multierr.Append(err, be.QueueSender.Shutdown(ctx))
}

// Last shutdown the wrapped exporter itself.
return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
}

// WithStart overrides the default Start function for an exporter.
Expand All @@ -188,7 +200,7 @@
// The default TimeoutConfig is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return func(o *BaseExporter) error {
o.TimeoutSender.cfg = timeoutConfig
o.timeoutCfg = timeoutConfig
return nil
}
}
Expand All @@ -201,7 +213,7 @@
o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return nil
}
o.RetrySender = newRetrySender(config, o.Set)
o.retryCfg = config
return nil
}
}
Expand Down Expand Up @@ -268,7 +280,7 @@
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config) Option {
return func(o *BaseExporter) error {
o.BatcherCfg = cfg
o.batcherCfg = cfg
return nil
}
}
Expand Down
10 changes: 8 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ var (
}()
)

func newNoopObsrepSender(*ObsReport) Sender[internal.Request] {
return &BaseSender[internal.Request]{}
type noopSender struct {
component.StartFunc
component.ShutdownFunc
SendFunc[internal.Request]
}

func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
return &noopSender{SendFunc: next.Send}
}

func TestBaseExporter(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type BatchSender struct {
BaseSender[internal.Request]
cfg exporterbatcher.Config
cfg exporterbatcher.Config
next Sender[internal.Request]

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
Expand All @@ -43,11 +43,13 @@ type BatchSender struct {
stopped *atomic.Bool
}

// newBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
// NewBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, concurrencyLimit int64, next Sender[internal.Request]) *BatchSender {
bs := &BatchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
next: next,
concurrencyLimit: concurrencyLimit,
activeBatch: newEmptyBatch(),
logger: set.Logger,
shutdownCh: nil,
shutdownCompleteCh: make(chan struct{}),
Expand Down Expand Up @@ -119,7 +121,7 @@ func newEmptyBatch() *batch {
// Caller must hold the lock.
func (bs *BatchSender) exportActiveBatch() {
go func(b *batch) {
b.err = bs.NextSender.Send(b.ctx, b.request)
b.err = bs.next.Send(b.ctx, b.request)
close(b.done)
bs.activeRequests.Add(-b.requestsBlocked)
}(bs.activeBatch)
Expand All @@ -138,7 +140,7 @@ func (bs *BatchSender) isActiveBatchReady() bool {
func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
// Stopped batch sender should act as pass-through to allow the queue to be drained.
if bs.stopped.Load() {
return bs.NextSender.Send(ctx, req)
return bs.next.Send(ctx, req)
}

if bs.cfg.MaxSizeItems > 0 {
Expand Down Expand Up @@ -190,7 +192,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req
// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.
for _, r := range reqs {
if err := bs.NextSender.Send(ctx, r); err != nil {
if err := bs.next.Send(ctx, r); err != nil {
return err
}
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/internal"
)

type obsReportSender[K internal.Request] struct {
BaseSender[K]
component.StartFunc
component.ShutdownFunc
obsrep *ObsReport
next Sender[K]
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep}
func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep, next: next}
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.NextSender.Send(c, req)
err := ors.next.Send(c, req)
ors.obsrep.EndOp(c, items, err)
return err
}
Loading
Loading