Skip to content

Commit

Permalink
Move everything from processorhelper to component.
Browse files Browse the repository at this point in the history
Updates open-telemetry#4681

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 20, 2022
1 parent 89e0d42 commit 5cfdf68
Show file tree
Hide file tree
Showing 14 changed files with 303 additions and 284 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
- Deprecated `extensionhelper.CreateDefaultConfig` in favour of `component.ExtensionDefaultConfigFunc`
- Deprecated `extensionhelper.CreateServiceExtension` in favour of `component.CreateExtensionFunc`
- Deprecated `extensionhelper.NewFactory` in favour of `component.NewExtensionFactory`
- Move helpers from processorhelper to component (#4805)
- Deprecated `processorhelper.CreateDefaultConfig` in favour of `component.ProcessorDefaultConfigFunc`
- Deprecated `processorhelper.WithTraces` in favour of `component.WithTracesProcessor`
- Deprecated `processorhelper.WithMetrics` in favour of `component.WithMetricsProcessor`
- Deprecated `processorhelper.WithLogs` in favour of `component.WithLogsProcessor`
- Deprecated `processorhelper.NewFactory` in favour of `component.NewExtensionFactory`

### 💡 Enhancements 💡

Expand Down
48 changes: 12 additions & 36 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/internalinterface"
)

// NewNopProcessorCreateSettings returns a new nop settings for Create*Processor functions.
Expand All @@ -36,57 +35,34 @@ type nopProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// nopProcessorFactory is factory for nopProcessor.
type nopProcessorFactory struct {
internalinterface.BaseInternal
}

var nopProcessorFactoryInstance = &nopProcessorFactory{}
var nopProcessorFactory = component.NewProcessorFactory(
"nop",
createDefaultConfig,
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor),
)

// NewNopProcessorFactory returns a component.ProcessorFactory that constructs nop processors.
func NewNopProcessorFactory() component.ProcessorFactory {
return nopProcessorFactoryInstance
}

// Type gets the type of the Processor config created by this factory.
func (f *nopProcessorFactory) Type() config.Type {
return "nop"
return nopProcessorFactory
}

// CreateDefaultConfig creates the default configuration for the Processor.
func (f *nopProcessorFactory) CreateDefaultConfig() config.Processor {
func createDefaultConfig() config.Processor {
return &nopProcessorConfig{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("nop")),
}
}

// CreateTracesProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateTracesProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Traces,
) (component.TracesProcessor, error) {
func createTracesProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Traces) (component.TracesProcessor, error) {
return nopProcessorInstance, nil
}

// CreateMetricsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateMetricsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Metrics,
) (component.MetricsProcessor, error) {
func createMetricsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Metrics) (component.MetricsProcessor, error) {
return nopProcessorInstance, nil
}

// CreateLogsProcessor implements component.ProcessorFactory interface.
func (f *nopProcessorFactory) CreateLogsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
_ config.Processor,
_ consumer.Logs,
) (component.LogsProcessor, error) {
func createLogsProcessor(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Logs) (component.LogsProcessor, error) {
return nopProcessorInstance, nil
}

Expand Down
99 changes: 99 additions & 0 deletions component/factories_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config"
)

func TestMakeProcessorFactoryMap(t *testing.T) {
type testCase struct {
name string
in []ProcessorFactory
out map[config.Type]ProcessorFactory
}

p1 := NewProcessorFactory("p1", nil)
p2 := NewProcessorFactory("p2", nil)
testCases := []testCase{
{
name: "different names",
in: []ProcessorFactory{p1, p2},
out: map[config.Type]ProcessorFactory{
p1.Type(): p1,
p2.Type(): p2,
},
},
{
name: "same name",
in: []ProcessorFactory{p1, p2, NewProcessorFactory("p1", nil)},
},
}

for i := range testCases {
tt := testCases[i]
t.Run(tt.name, func(t *testing.T) {
out, err := MakeProcessorFactoryMap(tt.in...)
if tt.out == nil {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.out, out)
})
}
}

func TestMakeExtensionFactoryMap(t *testing.T) {
type testCase struct {
name string
in []ExtensionFactory
out map[config.Type]ExtensionFactory
}

p1 := NewExtensionFactory("p1", nil, nil)
p2 := NewExtensionFactory("p2", nil, nil)
testCases := []testCase{
{
name: "different names",
in: []ExtensionFactory{p1, p2},
out: map[config.Type]ExtensionFactory{
p1.Type(): p1,
p2.Type(): p2,
},
},
{
name: "same name",
in: []ExtensionFactory{p1, p2, NewExtensionFactory("p1", nil, nil)},
},
}

for i := range testCases {
tt := testCases[i]
t.Run(tt.name, func(t *testing.T) {
out, err := MakeExtensionFactoryMap(tt.in...)
if tt.out == nil {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.out, out)
})
}
}
110 changes: 108 additions & 2 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/internalinterface"
Expand Down Expand Up @@ -54,8 +55,8 @@ type ProcessorCreateSettings struct {
BuildInfo BuildInfo
}

// ProcessorFactory is factory interface for processors. This is the
// new factory type that can create new style processors.
// ProcessorFactory is processorFactory interface for processors. This is the
// new processorFactory type that can create new style processors.
//
// This interface cannot be directly implemented. Implementations must
// use the processorhelper.NewFactory to implement it.
Expand Down Expand Up @@ -102,3 +103,108 @@ type ProcessorFactory interface {
nextConsumer consumer.Logs,
) (LogsProcessor, error)
}

// ProcessorFactoryOption apply changes to ProcessorOptions.
type ProcessorFactoryOption func(o *processorFactory)

// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig()
type ProcessorCreateDefaultConfigFunc func() config.Processor

// CreateDefaultConfig implements ExtensionFactory.CreateDefaultConfig()
func (f ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
return f()
}

// CreateTracesProcessorFunc is the equivalent of ProcessorFactory.CreateTracesProcessor()
type CreateTracesProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error)

// CreateTracesProcessor creates a TracesProcessor based on this config.
func (f CreateTracesProcessorFunc) CreateTracesProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces) (TracesProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateMetricsProcessorFunc is the equivalent of ProcessorFactory.CreateMetricsProcessor()
type CreateMetricsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error)

// CreateMetricsProcessor creates a MetricsProcessor based on this config.
func (f CreateMetricsProcessorFunc) CreateMetricsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

// CreateLogsProcessorFunc is the equivalent of ProcessorFactory.CreateLogsProcessor()
type CreateLogsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error)

// CreateLogsProcessor creates a LogsProcessor based on this config.
func (f CreateLogsProcessorFunc) CreateLogsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (LogsProcessor, error) {
if f == nil {
return nil, componenterror.ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}

type processorFactory struct {
internalinterface.BaseInternal
cfgType config.Type
ProcessorCreateDefaultConfigFunc
CreateTracesProcessorFunc
CreateMetricsProcessorFunc
CreateLogsProcessorFunc
}

// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateTracesProcessorFunc = createTracesProcessor
}
}

// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateMetricsProcessorFunc = createMetricsProcessor
}
}

// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc) ProcessorFactoryOption {
return func(o *processorFactory) {
o.CreateLogsProcessorFunc = createLogsProcessor
}
}

// NewProcessorFactory returns a ProcessorFactory.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
f := &processorFactory{
cfgType: cfgType,
ProcessorCreateDefaultConfigFunc: createDefaultConfig,
}
for _, opt := range options {
opt(f)
}
return f
}

// Type gets the type of the Processor config created by this processorFactory.
func (f *processorFactory) Type() config.Type {
return f.cfgType
}
Loading

0 comments on commit 5cfdf68

Please sign in to comment.