Skip to content

Commit

Permalink
Move everything from processorhelper to component.
Browse files Browse the repository at this point in the history
Updates #4681

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 22, 2022
1 parent 8fa0a81 commit 0bf4516
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 290 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
- Deprecated `extensionhelper.CreateDefaultConfig` in favour of `component.ExtensionDefaultConfigFunc`
- Deprecated `extensionhelper.CreateServiceExtension` in favour of `component.CreateExtensionFunc`
- Deprecated `extensionhelper.NewFactory` in favour of `component.NewExtensionFactory`
- Move helpers from processorhelper to component (#4889)
- Deprecated `processorhelper.CreateDefaultConfig` in favour of `component.ProcessorDefaultConfigFunc`
- Deprecated `processorhelper.WithTraces` in favour of `component.WithTracesProcessor`
- Deprecated `processorhelper.WithMetrics` in favour of `component.WithMetricsProcessor`
- Deprecated `processorhelper.WithLogs` in favour of `component.WithLogsProcessor`
- Deprecated `processorhelper.NewFactory` in favour of `component.NewProcessorFactory`

### 💡 Enhancements 💡

Expand Down
56 changes: 15 additions & 41 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/internalinterface"
)

// NewNopProcessorCreateSettings returns a new nop settings for Create*Processor functions.
Expand All @@ -36,57 +35,32 @@ type nopProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// nopProcessorFactory is factory for nopProcessor.
type nopProcessorFactory struct {
internalinterface.BaseInternal
}

var nopProcessorFactoryInstance = &nopProcessorFactory{}
var nopProcessorFactory = component.NewProcessorFactory(
"nop",
func() config.Processor {
return &nopProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop")),
}
},
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor),
)

// NewNopProcessorFactory returns a component.ProcessorFactory that constructs nop processors.
func NewNopProcessorFactory() component.ProcessorFactory {
return nopProcessorFactoryInstance
}

// Type gets the type of the Processor config created by this factory.
func (f *nopProcessorFactory) Type() config.Type {
return "nop"
}

// CreateDefaultConfig creates the default configuration for the Processor.
func (f *nopProcessorFactory) CreateDefaultConfig() config.Processor {
return &nopProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop")),
}
return nopProcessorFactory
}

// CreateTracesProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateTracesProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Traces,
) (component.TracesProcessor, error) {
func createTracesProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Traces) (component.TracesProcessor, error) {
return nopProcessorInstance, nil
}

// CreateMetricsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateMetricsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Metrics,
) (component.MetricsProcessor, error) {
func createMetricsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Metrics) (component.MetricsProcessor, error) {
return nopProcessorInstance, nil
}

// CreateLogsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateLogsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Logs,
) (component.LogsProcessor, error) {
func createLogsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Logs) (component.LogsProcessor, error) {
return nopProcessorInstance, nil
}

Expand Down
61 changes: 61 additions & 0 deletions component/factories_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config"
)

func TestMakeProcessorFactoryMap(t *testing.T) {
type testCase struct {
name string
in []ProcessorFactory
out map[config.Type]ProcessorFactory
}

p1 := NewProcessorFactory("p1", nil)
p2 := NewProcessorFactory("p2", nil)
testCases := []testCase{
{
name: "different names",
in: []ProcessorFactory{p1, p2},
out: map[config.Type]ProcessorFactory{
p1.Type(): p1,
p2.Type(): p2,
},
},
{
name: "same name",
in: []ProcessorFactory{p1, p2, NewProcessorFactory("p1", nil)},
},
}

for i := range testCases {
tt := testCases[i]
t.Run(tt.name, func(t *testing.T) {
out, err := MakeProcessorFactoryMap(tt.in...)
if tt.out == nil {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.out, out)
})
}
}
111 changes: 108 additions & 3 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/internalinterface"
Expand Down Expand Up @@ -54,11 +55,10 @@ type ProcessorCreateSettings struct {
BuildInfo BuildInfo
}

// ProcessorFactory is factory interface for processors. This is the
// new factory type that can create new style processors.
// ProcessorFactory is Factory interface for processors.
//
// This interface cannot be directly implemented. Implementations must
// use the processorhelper.NewFactory to implement it.
// use the NewProcessorFactory to implement it.
type ProcessorFactory interface {
internalinterface.InternalInterface
Factory
Expand Down Expand Up @@ -102,3 +102,108 @@ type ProcessorFactory interface {
nextConsumer consumer.Logs,
) (LogsProcessor, error)
}

// ProcessorFactoryOption apply changes to ProcessorOptions.
type ProcessorFactoryOption func(o *processorFactory)

// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig().
type ProcessorCreateDefaultConfigFunc func() config.Processor

// CreateDefaultConfig implements ProcessorFactory.CreateDefaultConfig().
func (f ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
return f()
}

// CreateTracesProcessorFunc is the equivalent of ProcessorFactory.CreateTracesProcessor().
type CreateTracesProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error)

// CreateTracesProcessor implements ProcessorFactory.CreateTracesProcessor().
func (f CreateTracesProcessorFunc) CreateTracesProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces) (TracesProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateMetricsProcessorFunc is the equivalent of ProcessorFactory.CreateMetricsProcessor().
type CreateMetricsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error)

// CreateMetricsProcessor implements ProcessorFactory.CreateMetricsProcessor().
func (f CreateMetricsProcessorFunc) CreateMetricsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateLogsProcessorFunc is the equivalent of ProcessorFactory.CreateLogsProcessor().
type CreateLogsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error)

// CreateLogsProcessor implements ProcessorFactory.CreateLogsProcessor().
func (f CreateLogsProcessorFunc) CreateLogsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (LogsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type processorFactory struct {
internalinterface.BaseInternal
cfgType config.Type
ProcessorCreateDefaultConfigFunc
CreateTracesProcessorFunc
CreateMetricsProcessorFunc
CreateLogsProcessorFunc
}

// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateTracesProcessorFunc = createTracesProcessor
}
}

// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateMetricsProcessorFunc = createMetricsProcessor
}
}

// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateLogsProcessorFunc = createLogsProcessor
}
}

// NewProcessorFactory returns a ProcessorFactory.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
f := &processorFactory{
cfgType: cfgType,
ProcessorCreateDefaultConfigFunc: createDefaultConfig,
}
for _, opt := range options {
opt(f)
}
return f
}

// Type returns the type of the Processor created by this ProcessorFactory.
func (f *processorFactory) Type() config.Type {
return f.cfgType
}
84 changes: 44 additions & 40 deletions component/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,61 @@
package component

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)

var _ ProcessorFactory = (*TestProcessorFactory)(nil)

type TestProcessorFactory struct {
ProcessorFactory
name string
func TestNewProcessorFactory(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewProcessorSettings(config.NewComponentID(typeStr))
factory := NewProcessorFactory(
typeStr,
func() config.Processor { return &defaultCfg })
assert.EqualValues(t, typeStr, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())
_, err := factory.CreateTracesProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.Error(t, err)
_, err = factory.CreateMetricsProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.Error(t, err)
_, err = factory.CreateLogsProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.Error(t, err)
}

// Type gets the type of the Processor config created by this factory.
func (f *TestProcessorFactory) Type() config.Type {
return config.Type(f.name)
func TestNewProcessorFactory_WithOptions(t *testing.T) {
const typeStr = "test"
defaultCfg := config.NewProcessorSettings(config.NewComponentID(typeStr))
factory := NewProcessorFactory(
typeStr,
func() config.Processor { return &defaultCfg },
WithTracesProcessor(createTracesProcessor),
WithMetricsProcessor(createMetricsProcessor),
WithLogsProcessor(createLogsProcessor))
assert.EqualValues(t, typeStr, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())

_, err := factory.CreateTracesProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.NoError(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.NoError(t, err)

_, err = factory.CreateLogsProcessor(context.Background(), ProcessorCreateSettings{}, &defaultCfg, nil)
assert.NoError(t, err)
}

func TestMakeProcessorFactoryMap(t *testing.T) {
type testCase struct {
in []ProcessorFactory
out map[config.Type]ProcessorFactory
}
func createTracesProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error) {
return nil, nil
}

testCases := []testCase{
{
in: []ProcessorFactory{
&TestProcessorFactory{name: "p1"},
&TestProcessorFactory{name: "p2"},
},
out: map[config.Type]ProcessorFactory{
"p1": &TestProcessorFactory{name: "p1"},
"p2": &TestProcessorFactory{name: "p2"},
},
},
{
in: []ProcessorFactory{
&TestProcessorFactory{name: "p1"},
&TestProcessorFactory{name: "p1"},
},
},
}
func createMetricsProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error) {
return nil, nil
}

for _, c := range testCases {
out, err := MakeProcessorFactoryMap(c.in...)
if c.out == nil {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.Equal(t, c.out, out)
}
func createLogsProcessor(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error) {
return nil, nil
}
Loading

0 comments on commit 0bf4516

Please sign in to comment.