Skip to content

Commit

Permalink
Move everything from processorhelper to component.
Browse files Browse the repository at this point in the history
Updates open-telemetry#4681

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 20, 2022
1 parent 89e0d42 commit 1cc6568
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 246 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
- Deprecated `extensionhelper.CreateDefaultConfig` in favour of `component.ExtensionDefaultConfigFunc`
- Deprecated `extensionhelper.CreateServiceExtension` in favour of `component.CreateExtensionFunc`
- Deprecated `extensionhelper.NewFactory` in favour of `component.NewExtensionFactory`
- Move helpers from processorhelper to component (#4805)
- Deprecated `processorhelper.CreateDefaultConfig` in favour of `component.ProcessorDefaultConfigFunc`
- Deprecated `processorhelper.WithTraces` in favour of `component.WithTracesProcessor`
- Deprecated `processorhelper.WithMetrics` in favour of `component.WithMetricsProcessor`
- Deprecated `processorhelper.WithLogs` in favour of `component.WithLogsProcessor`
- Deprecated `processorhelper.NewFactory` in favour of `component.NewExtensionFactory`

### 💡 Enhancements 💡

Expand Down
48 changes: 12 additions & 36 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/internalinterface"
)

// NewNopProcessorCreateSettings returns a new nop settings for Create*Processor functions.
Expand All @@ -36,57 +35,34 @@ type nopProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// nopProcessorFactory is factory for nopProcessor.
type nopProcessorFactory struct {
internalinterface.BaseInternal
}

var nopProcessorFactoryInstance = &nopProcessorFactory{}
var nopProcessorFactory = component.NewProcessorFactory(
"nop",
createDefaultConfig,
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor),
)

// NewNopProcessorFactory returns a component.ProcessorFactory that constructs nop processors.
func NewNopProcessorFactory() component.ProcessorFactory {
return nopProcessorFactoryInstance
}

// Type gets the type of the Processor config created by this factory.
func (f *nopProcessorFactory) Type() config.Type {
return "nop"
return nopProcessorFactory
}

// CreateDefaultConfig creates the default configuration for the Processor.
func (f *nopProcessorFactory) CreateDefaultConfig() config.Processor {
func createDefaultConfig() config.Processor {
return &nopProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop")),
}
}

// CreateTracesProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateTracesProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Traces,
) (component.TracesProcessor, error) {
func createTracesProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Traces) (component.TracesProcessor, error) {
return nopProcessorInstance, nil
}

// CreateMetricsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateMetricsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Metrics,
) (component.MetricsProcessor, error) {
func createMetricsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Metrics) (component.MetricsProcessor, error) {
return nopProcessorInstance, nil
}

// CreateLogsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateLogsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Logs,
) (component.LogsProcessor, error) {
func createLogsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Logs) (component.LogsProcessor, error) {
return nopProcessorInstance, nil
}

Expand Down
110 changes: 108 additions & 2 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/internalinterface"
Expand Down Expand Up @@ -54,8 +55,8 @@ type ProcessorCreateSettings struct {
BuildInfo BuildInfo
}

// ProcessorFactory is factory interface for processors. This is the
// new factory type that can create new style processors.
// ProcessorFactory is processorFactory interface for processors. This is the
// new processorFactory type that can create new style processors.
//
// This interface cannot be directly implemented. Implementations must
// use the processorhelper.NewFactory to implement it.
Expand Down Expand Up @@ -102,3 +103,108 @@ type ProcessorFactory interface {
nextConsumer consumer.Logs,
) (LogsProcessor, error)
}

// ProcessorFactoryOption apply changes to ProcessorOptions.
type ProcessorFactoryOption func(o *processorFactory)

// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig()
type ProcessorCreateDefaultConfigFunc func() config.Processor

// CreateDefaultConfig implements ExtensionFactory.CreateDefaultConfig()
func (f ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
return f()
}

// CreateTracesProcessorFunc is the equivalent of ProcessorFactory.CreateTracesProcessor()
type CreateTracesProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error)

// CreateTracesProcessor creates a TracesProcessor based on this config.
func (f CreateTracesProcessorFunc) CreateTracesProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces) (TracesProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateMetricsProcessorFunc is the equivalent of ProcessorFactory.CreateMetricsProcessor()
type CreateMetricsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error)

// CreateMetricsProcessor creates a MetricsProcessor based on this config.
func (f CreateMetricsProcessorFunc) CreateMetricsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateLogsProcessorFunc is the equivalent of ProcessorFactory.CreateLogsProcessor()
type CreateLogsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error)

// CreateLogsProcessor creates a LogsProcessor based on this config.
func (f CreateLogsProcessorFunc) CreateLogsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (LogsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type processorFactory struct {
internalinterface.BaseInternal
cfgType config.Type
ProcessorCreateDefaultConfigFunc
CreateTracesProcessorFunc
CreateMetricsProcessorFunc
CreateLogsProcessorFunc
}

// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateTracesProcessorFunc = createTracesProcessor
}
}

// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateMetricsProcessorFunc = createMetricsProcessor
}
}

// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateLogsProcessorFunc = createLogsProcessor
}
}

// NewProcessorFactory returns a ProcessorFactory.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
f := &processorFactory{
cfgType: cfgType,
ProcessorCreateDefaultConfigFunc: createDefaultConfig,
}
for _, opt := range options {
opt(f)
}
return f
}

// Type gets the type of the Processor config created by this processorFactory.
func (f *processorFactory) Type() config.Type {
return f.cfgType
}
62 changes: 60 additions & 2 deletions component/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,25 @@
package component

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)

var _ ProcessorFactory = (*TestProcessorFactory)(nil)

type TestProcessorFactory struct {
ProcessorFactory
name string
name config.Type
}

// Type gets the type of the Processor config created by this factory.
func (f *TestProcessorFactory) Type() config.Type {
return config.Type(f.name)
return f.name
}

func TestMakeProcessorFactoryMap(t *testing.T) {
Expand Down Expand Up @@ -69,3 +71,59 @@ func TestMakeProcessorFactoryMap(t *testing.T) {
assert.Equal(t, c.out, out)
}
}

func TestNewTrace(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewProcessorSettings(config.NewComponentID(typeStr))
factory := NewProcessorFactory(
typeStr,
func() config.Processor { return &defaultCfg })
assert.EqualValues(t, typeStr, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())
_, err := factory.CreateTracesProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.Error(t, err)
_, err = factory.CreateMetricsProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.Error(t, err)
_, err = factory.CreateLogsProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.Error(t, err)
}

func TestNewMetrics_WithConstructors(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewProcessorSettings(config.NewComponentID(typeStr))
factory := NewProcessorFactory(
typeStr,
func() config.Processor { return &defaultCfg },
WithTracesProcessor(createTracesProcessor),
WithMetricsProcessor(createMetricsProcessor),
WithLogsProcessor(createLogsProcessor))
assert.EqualValues(t, typeStr, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())

_, err := factory.CreateTracesProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.NoError(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.NoError(t, err)

_, err = factory.CreateLogsProcessor(context.Background(), newNopProcessorCreateSettings(), &defaultCfg, nil)
assert.NoError(t, err)
}

func createTracesProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error) {
return nil, nil
}

func createMetricsProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error) {
return nil, nil
}

func createLogsProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error) {
return nil, nil
}

func newNopProcessorCreateSettings() ProcessorCreateSettings {
return ProcessorCreateSettings{
BuildInfo: NewDefaultBuildInfo(),
}
}
9 changes: 4 additions & 5 deletions internal/testcomponents/example_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

// ExampleProcessorCfg is for testing purposes. We are defining an example config and factory
Expand All @@ -35,12 +34,12 @@ type ExampleProcessorCfg struct {
const procType = "exampleprocessor"

// ExampleProcessorFactory is factory for exampleProcessor.
var ExampleProcessorFactory = processorhelper.NewFactory(
var ExampleProcessorFactory = component.NewProcessorFactory(
procType,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))

// CreateDefaultConfig creates the default configuration for the Processor.
func createDefaultConfig() config.Processor {
Expand Down
9 changes: 4 additions & 5 deletions processor/batchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
Expand All @@ -34,12 +33,12 @@ const (

// NewFactory returns a new factory for the Batch processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))
}

func createDefaultConfig() config.Processor {
Expand Down
8 changes: 4 additions & 4 deletions processor/memorylimiterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ var processorCapabilities = consumer.Capabilities{MutatesData: false}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))
}

// CreateDefaultConfig creates the default configuration for processor. Notice
Expand Down
Loading

0 comments on commit 1cc6568

Please sign in to comment.