Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use directly the consumer helper funcs in the other helpers packages. #4719

Merged
merged 1 commit into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions component/componenttest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func (f *nopExporterFactory) CreateLogsExporter(
}

var nopExporterInstance = &nopExporter{
Component: componenthelper.New(),
Consumer: consumertest.NewNop(),
Consumer: consumertest.NewNop(),
}

// nopExporter stores consumed traces and metrics for testing purposes.
type nopExporter struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumertest.Consumer
}
7 changes: 3 additions & 4 deletions component/componenttest/nop_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,10 @@ func (f *nopExtensionFactory) CreateExtension(
return nopExtensionInstance, nil
}

var nopExtensionInstance = &nopExtension{
Component: componenthelper.New(),
}
var nopExtensionInstance = &nopExtension{}

// nopExtension stores consumed traces and metrics for testing purposes.
type nopExtension struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
}
6 changes: 3 additions & 3 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func (f *nopProcessorFactory) CreateLogsProcessor(
}

var nopProcessorInstance = &nopProcessor{
Component: componenthelper.New(),
Consumer: consumertest.NewNop(),
Consumer: consumertest.NewNop(),
}

// nopProcessor stores consumed traces and metrics for testing purposes.
type nopProcessor struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumertest.Consumer
}
7 changes: 3 additions & 4 deletions component/componenttest/nop_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ func (f *nopReceiverFactory) CreateLogsReceiver(
return nopReceiverInstance, nil
}

var nopReceiverInstance = &nopReceiver{
Component: componenthelper.New(),
}
var nopReceiverInstance = &nopReceiver{}

// nopReceiver stores consumed traces and metrics for testing purposes.
type nopReceiver struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
}
51 changes: 23 additions & 28 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func (req *baseRequest) OnProcessingFinished() {

// baseSettings represents all the options that users can configure.
type baseSettings struct {
componentOptions []componenthelper.Option
consumerOptions []consumerhelper.Option
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumerOptions []consumerhelper.Option
TimeoutSettings
QueueSettings
RetrySettings
Expand Down Expand Up @@ -120,15 +121,15 @@ type Option func(*baseSettings)
// The default start function does nothing and always returns nil.
func WithStart(start componenthelper.StartFunc) Option {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
o.StartFunc = start
}
}

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
o.ShutdownFunc = shutdown
}
}

Expand Down Expand Up @@ -167,16 +168,15 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
obsrep *obsExporter
sender requestSender
qrSender *queuedRetrySender
}

func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings, signal config.DataType, reqUnmarshaler internal.RequestUnmarshaler) *baseExporter {
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
}
be := &baseExporter{}

be.obsrep = newObsExporter(obsreport.ExporterSettings{
Level: set.MetricsLevel,
Expand All @@ -185,7 +185,21 @@ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings,
}, globalInstruments)
be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender

be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := bs.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return bs.ShutdownFunc.Shutdown(ctx)
}
return be
}

Expand All @@ -195,25 +209,6 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
}

// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.Component.Start(ctx, host); err != nil {
return err
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
}

// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return be.Component.Shutdown(ctx)
}

// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
type timeoutSender struct {
cfg TimeoutSettings
Expand Down
8 changes: 5 additions & 3 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
type ProcessLogsFunc func(context.Context, pdata.Logs) (pdata.Logs, error)

type logProcessor struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumer.Logs
}

Expand Down Expand Up @@ -75,7 +76,8 @@ func NewLogsProcessor(
}

return &logProcessor{
Component: componenthelper.New(bs.componentOptions...),
Logs: logsConsumer,
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Logs: logsConsumer,
}, nil
}
8 changes: 5 additions & 3 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
type ProcessMetricsFunc func(context.Context, pdata.Metrics) (pdata.Metrics, error)

type metricsProcessor struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumer.Metrics
}

Expand Down Expand Up @@ -75,7 +76,8 @@ func NewMetricsProcessor(
}

return &metricsProcessor{
Component: componenthelper.New(bs.componentOptions...),
Metrics: metricsConsumer,
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Metrics: metricsConsumer,
}, nil
}
9 changes: 5 additions & 4 deletions processor/processorhelper/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ type Option func(*baseSettings)
// The default shutdown function does nothing and always returns nil.
func WithStart(start componenthelper.StartFunc) Option {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
o.StartFunc = start
}
}

// WithShutdown overrides the default Shutdown function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
o.ShutdownFunc = shutdown
}
}

Expand All @@ -60,8 +60,9 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}

type baseSettings struct {
componentOptions []componenthelper.Option
consumerOptions []consumerhelper.Option
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumerOptions []consumerhelper.Option
}

// fromOptions returns the internal settings starting from the default and applying all options.
Expand Down
8 changes: 5 additions & 3 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
type ProcessTracesFunc func(context.Context, pdata.Traces) (pdata.Traces, error)

type tracesProcessor struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
consumer.Traces
}

Expand Down Expand Up @@ -76,7 +77,8 @@ func NewTracesProcessor(
}

return &tracesProcessor{
Component: componenthelper.New(bs.componentOptions...),
Traces: traceConsumer,
StartFunc: bs.StartFunc,
ShutdownFunc: bs.ShutdownFunc,
Traces: traceConsumer,
}, nil
}
30 changes: 12 additions & 18 deletions receiver/scraperhelper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,28 @@ type Scraper interface {
Scrape(context.Context) (pdata.Metrics, error)
}

type baseSettings struct {
componentOptions []componenthelper.Option
}

// ScraperOption apply changes to internal options.
type ScraperOption func(*baseSettings)
type ScraperOption func(*baseScraper)

// WithStart sets the function that will be called on startup.
func WithStart(start componenthelper.StartFunc) ScraperOption {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
return func(o *baseScraper) {
o.StartFunc = start
}
}

// WithShutdown sets the function that will be called on shutdown.
func WithShutdown(shutdown componenthelper.ShutdownFunc) ScraperOption {
return func(o *baseSettings) {
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
return func(o *baseScraper) {
o.ShutdownFunc = shutdown
}
}

var _ Scraper = (*baseScraper)(nil)

type baseScraper struct {
component.Component
componenthelper.StartFunc
componenthelper.ShutdownFunc
ScrapeFunc
id config.ComponentID
}
Expand All @@ -81,16 +78,13 @@ func NewScraper(name string, scrape ScrapeFunc, options ...ScraperOption) (Scrap
if scrape == nil {
return nil, errNilFunc
}
set := &baseSettings{}
for _, op := range options {
op(set)
}

ms := &baseScraper{
Component: componenthelper.New(set.componentOptions...),
bs := &baseScraper{
ScrapeFunc: scrape,
id: config.NewComponentID(config.Type(name)),
}
for _, op := range options {
op(bs)
}

return ms, nil
return bs, nil
}