Skip to content

Commit

Permalink
Move everything from processorhelper to component.
Browse files Browse the repository at this point in the history
Updates open-telemetry#4681

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 20, 2022
1 parent 70271f2 commit 5c851ea
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 124 deletions.
110 changes: 108 additions & 2 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/internalinterface"
Expand Down Expand Up @@ -54,8 +55,8 @@ type ProcessorCreateSettings struct {
BuildInfo BuildInfo
}

// ProcessorFactory is factory interface for processors. This is the
// new factory type that can create new style processors.
// ProcessorFactory is processorFactory interface for processors. This is the
// new processorFactory type that can create new style processors.
//
// This interface cannot be directly implemented. Implementations must
// use the processorhelper.NewFactory to implement it.
Expand Down Expand Up @@ -102,3 +103,108 @@ type ProcessorFactory interface {
nextConsumer consumer.Logs,
) (LogsProcessor, error)
}

// ProcessorFactoryOption apply changes to ProcessorOptions.
type ProcessorFactoryOption func(o *processorFactory)

// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig()
type ProcessorCreateDefaultConfigFunc func() config.Processor

// CreateDefaultConfig implements ExtensionFactory.CreateDefaultConfig()
func (f ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
return f()
}

// CreateTracesProcessorFunc is the equivalent of ProcessorFactory.CreateTracesProcessor()
type CreateTracesProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error)

// CreateTracesProcessor creates a TracesProcessor based on this config.
func (f CreateTracesProcessorFunc) CreateTracesProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces) (TracesProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateMetricsProcessorFunc is the equivalent of ProcessorFactory.CreateMetricsProcessor()
type CreateMetricsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error)

// CreateMetricsProcessor creates a MetricsProcessor based on this config.
func (f CreateMetricsProcessorFunc) CreateMetricsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateLogsProcessorFunc is the equivalent of ProcessorFactory.CreateLogsProcessor()
type CreateLogsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error)

// CreateLogsProcessor creates a LogsProcessor based on this config.
func (f CreateLogsProcessorFunc) CreateLogsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (LogsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type processorFactory struct {
internalinterface.BaseInternal
cfgType config.Type
ProcessorCreateDefaultConfigFunc
CreateTracesProcessorFunc
CreateMetricsProcessorFunc
CreateLogsProcessorFunc
}

// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateTracesProcessorFunc = createTracesProcessor
}
}

// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateMetricsProcessorFunc = createMetricsProcessor
}
}

// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateLogsProcessorFunc = createLogsProcessor
}
}

// NewProcessorFactory returns a ProcessorFactory.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
f := &processorFactory{
cfgType: cfgType,
ProcessorCreateDefaultConfigFunc: createDefaultConfig,
}
for _, opt := range options {
opt(f)
}
return f
}

// Type gets the type of the Processor config created by this processorFactory.
func (f *processorFactory) Type() config.Type {
return f.cfgType
}
9 changes: 4 additions & 5 deletions internal/testcomponents/example_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

// ExampleProcessorCfg is for testing purposes. We are defining an example config and factory
Expand All @@ -35,12 +34,12 @@ type ExampleProcessorCfg struct {
const procType = "exampleprocessor"

// ExampleProcessorFactory is factory for exampleProcessor.
var ExampleProcessorFactory = processorhelper.NewFactory(
var ExampleProcessorFactory = component.NewProcessorFactory(
procType,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))

// CreateDefaultConfig creates the default configuration for the Processor.
func createDefaultConfig() config.Processor {
Expand Down
9 changes: 4 additions & 5 deletions processor/batchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
Expand All @@ -34,12 +33,12 @@ const (

// NewFactory returns a new factory for the Batch processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))
}

func createDefaultConfig() config.Processor {
Expand Down
8 changes: 4 additions & 4 deletions processor/memorylimiterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ var processorCapabilities = consumer.Capabilities{MutatesData: false}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))
}

// CreateDefaultConfig creates the default configuration for processor. Notice
Expand Down
124 changes: 18 additions & 106 deletions processor/processorhelper/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,120 +15,32 @@
package processorhelper // import "go.opentelemetry.io/collector/processor/processorhelper"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/internalinterface"
)

// FactoryOption apply changes to ProcessorOptions.
type FactoryOption func(o *factory)

// CreateDefaultConfig is the equivalent of component.ProcessorFactory.CreateDefaultConfig()
type CreateDefaultConfig func() config.Processor

// CreateTracesProcessor is the equivalent of component.ProcessorFactory.CreateTracesProcessor()
type CreateTracesProcessor func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Traces) (component.TracesProcessor, error)

// CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateMetricsProcessor()
type CreateMetricsProcessor func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Metrics) (component.MetricsProcessor, error)

// CreateLogsProcessor is the equivalent of component.ProcessorFactory.CreateLogsProcessor()
type CreateLogsProcessor func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Logs) (component.LogsProcessor, error)

type factory struct {
internalinterface.BaseInternal
cfgType config.Type
createDefaultConfig CreateDefaultConfig
createTracesProcessor CreateTracesProcessor
createMetricsProcessor CreateMetricsProcessor
createLogsProcessor CreateLogsProcessor
}

// WithTraces overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTraces(createTracesProcessor CreateTracesProcessor) FactoryOption {
return func(o *factory) {
o.createTracesProcessor = createTracesProcessor
}
}
// Deprecated: use component.ProcessorFactoryOption.
type FactoryOption = component.ProcessorFactoryOption

// WithMetrics overrides the default "error not supported" implementation for CreateMetricsProcessor.
func WithMetrics(createMetricsProcessor CreateMetricsProcessor) FactoryOption {
return func(o *factory) {
o.createMetricsProcessor = createMetricsProcessor
}
}
// Deprecated: use component.ProcessorCreateDefaultConfigFunc.
type CreateDefaultConfig = component.ProcessorCreateDefaultConfigFunc

// WithLogs overrides the default "error not supported" implementation for CreateLogsProcessor.
func WithLogs(createLogsProcessor CreateLogsProcessor) FactoryOption {
return func(o *factory) {
o.createLogsProcessor = createLogsProcessor
}
}
// Deprecated: use component.CreateTracesProcessorFunc.
type CreateTracesProcessor = component.CreateTracesProcessorFunc

// NewFactory returns a component.ProcessorFactory.
func NewFactory(
cfgType config.Type,
createDefaultConfig CreateDefaultConfig,
options ...FactoryOption) component.ProcessorFactory {
f := &factory{
cfgType: cfgType,
createDefaultConfig: createDefaultConfig,
}
for _, opt := range options {
opt(f)
}
return f
}
// Deprecated: use component.CreateMetricsProcessorFunc.
type CreateMetricsProcessor = component.CreateMetricsProcessorFunc

// Type gets the type of the Processor config created by this factory.
func (f *factory) Type() config.Type {
return f.cfgType
}
// Deprecated: use component.CreateLogsProcessorFunc.
type CreateLogsProcessor = component.CreateLogsProcessorFunc

// CreateDefaultConfig creates the default configuration for processor.
func (f *factory) CreateDefaultConfig() config.Processor {
return f.createDefaultConfig()
}
// Deprecated: use component.WithTracesProcessor.
var WithTraces = component.WithTracesProcessor

// CreateTracesProcessor creates a component.TracesProcessor based on this config.
func (f *factory) CreateTracesProcessor(
ctx context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
if f.createTracesProcessor == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f.createTracesProcessor(ctx, set, cfg, nextConsumer)
}
// Deprecated: use component.WithMetricsProcessor.
var WithMetrics = component.WithMetricsProcessor

// CreateMetricsProcessor creates a component.MetricsProcessor based on this config.
func (f *factory) CreateMetricsProcessor(
ctx context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
if f.createMetricsProcessor == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f.createMetricsProcessor(ctx, set, cfg, nextConsumer)
}
// Deprecated: use component.WithLogsProcessor.
var WithLogs = component.WithLogsProcessor

// CreateLogsProcessor creates a component.LogsProcessor based on this config.
func (f *factory) CreateLogsProcessor(
ctx context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
if f.createLogsProcessor == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f.createLogsProcessor(ctx, set, cfg, nextConsumer)
}
// Deprecated: use component.NewProcessorFactory.
var NewFactory = component.NewProcessorFactory
3 changes: 1 addition & 2 deletions service/internal/builder/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/testcomponents"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

Expand Down Expand Up @@ -60,7 +59,7 @@ func newBadReceiverFactory() component.ReceiverFactory {
}

func newBadProcessorFactory() component.ProcessorFactory {
return processorhelper.NewFactory("bf", func() config.Processor {
return component.NewProcessorFactory("bf", func() config.Processor {
return &struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}{
Expand Down

0 comments on commit 5c851ea

Please sign in to comment.