Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix all TraceProcessor usages, use TracesProcessor #2935

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Remove `config.NewViper`, users should use `config.NewParser` (#2917)
- Remove `testutil.WaitFor`, use `testify.Eventually` helper if needed (#2920)
- Remove testutil.WaitForPort, users can use testify.Eventually (#2926)
- Rename `processorhelper.NewTraceProcessor` to `processorhelper.NewTracesProcessor` (#2935)

## 💡 Enhancements 💡

Expand Down
4 changes: 2 additions & 2 deletions component/componenttest/shutdown_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg config.Processor) {
func verifyTracesProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg config.Processor) {
// Create a processor and output its produce to a sink.
nextSink := new(consumertest.TracesSink)
processor, err := factory.CreateTracesProcessor(
Expand Down Expand Up @@ -64,7 +64,7 @@ func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory compon

// VerifyProcessorShutdown verifies the processor doesn't produce telemetry data after shutdown.
func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg config.Processor) {
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
verifyTracesProcessorDoesntProduceAfterShutdown(t, factory, cfg)
// TODO: add metrics and logs verification.
// TODO: add other shutdown verifications.
}
2 changes: 1 addition & 1 deletion component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type ProcessorFactory interface {
// tests of any implementation of the Factory interface.
CreateDefaultConfig() config.Processor

// CreateTraceProcessor creates a trace processor based on this config.
// CreateTracesProcessor creates a trace processor based on this config.
// If the processor type does not support tracing or if the config is not valid
// error will be returned instead.
CreateTracesProcessor(
Expand Down
4 changes: 2 additions & 2 deletions consumer/fanoutconsumer/cloningconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"go.opentelemetry.io/collector/internal/testdata"
)

func TestTraceProcessorCloningNotMultiplexing(t *testing.T) {
func TestTracesProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewNop()
tfc := NewTracesCloning([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}

func TestTraceProcessorCloningMultiplexing(t *testing.T) {
func TestTracesProcessorCloningMultiplexing(t *testing.T) {
processors := make([]consumer.Traces, 3)
for i := range processors {
processors[i] = new(consumertest.TracesSink)
Expand Down
2 changes: 1 addition & 1 deletion consumer/fanoutconsumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestTracesProcessorMultiplexing(t *testing.T) {
}
}

func TestTraceProcessorWhenOneErrors(t *testing.T) {
func TestTracesProcessorWhenOneErrors(t *testing.T) {
processors := make([]consumer.Traces, 3)
for i := range processors {
processors[i] = new(consumertest.TracesSink)
Expand Down
2 changes: 1 addition & 1 deletion processor/attributesprocessor/attributes_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type spanAttributesProcessor struct {
exclude filterspan.Matcher
}

// newTraceProcessor returns a processor that modifies attributes of a span.
// newTracesProcessor returns a processor that modifies attributes of a span.
// To construct the attributes processors, the use of the factory methods are required
// in order to validate the inputs.
func newSpanAttributesProcessor(attrProc *processorhelper.AttrProc, include, exclude filterspan.Matcher) *spanAttributesProcessor {
Expand Down
6 changes: 3 additions & 3 deletions processor/attributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithLogs(createLogProcessor))
}

Expand All @@ -49,7 +49,7 @@ func createDefaultConfig() config.Processor {
}
}

func createTraceProcessor(
func createTracesProcessor(
_ context.Context,
_ component.ProcessorCreateParams,
cfg config.Processor,
Expand All @@ -72,7 +72,7 @@ func createTraceProcessor(
return nil, err
}

return processorhelper.NewTraceProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
newSpanAttributesProcessor(attrProc, include, exclude),
Expand Down
6 changes: 3 additions & 3 deletions processor/attributesprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestFactoryCreateTraceProcessor_EmptyActions(t *testing.T) {
func TestFactoryCreateTracesProcessor_EmptyActions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
ap, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, ap)
}

func TestFactoryCreateTraceProcessor_InvalidActions(t *testing.T) {
func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
Expand All @@ -64,7 +64,7 @@ func TestFactoryCreateTraceProcessor_InvalidActions(t *testing.T) {
assert.Nil(t, ap)
}

func TestFactoryCreateTraceProcessor(t *testing.T) {
func TestFactoryCreateTracesProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
Expand Down
4 changes: 2 additions & 2 deletions processor/batchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
}
Expand All @@ -51,7 +51,7 @@ func createDefaultConfig() config.Processor {
}
}

func createTraceProcessor(
func createTracesProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg config.Processor,
Expand Down
6 changes: 3 additions & 3 deletions processor/memorylimiter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
}
Expand All @@ -48,7 +48,7 @@ func createDefaultConfig() config.Processor {
}
}

func createTraceProcessor(
func createTracesProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg config.Processor,
Expand All @@ -58,7 +58,7 @@ func createTraceProcessor(
if err != nil {
return nil, err
}
return processorhelper.NewTraceProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
ml,
Expand Down
2 changes: 1 addition & 1 deletion processor/memorylimiter/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
}),
logger: zap.NewNop(),
}
tp, err := processorhelper.NewTraceProcessor(
tp, err := processorhelper.NewTracesProcessor(
&Config{
ProcessorSettings: config.NewProcessorSettings(typeStr),
},
Expand Down
8 changes: 4 additions & 4 deletions processor/probabilisticsamplerprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor))
processorhelper.WithTraces(createTracesProcessor))
}

func createDefaultConfig() config.Processor {
Expand All @@ -42,13 +42,13 @@ func createDefaultConfig() config.Processor {
}
}

// CreateTracesProcessor creates a trace processor based on this config.
func createTraceProcessor(
// createTracesProcessor creates a trace processor based on this config.
func createTracesProcessor(
_ context.Context,
_ component.ProcessorCreateParams,
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
oCfg := cfg.(*Config)
return newTraceProcessor(nextConsumer, *oCfg)
return newTracesProcessor(nextConsumer, *oCfg)
}
2 changes: 1 addition & 1 deletion processor/probabilisticsamplerprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateProcessor(t *testing.T) {
cfg := createDefaultConfig()
params := component.ProcessorCreateParams{Logger: zap.NewNop()}
tp, err := createTraceProcessor(context.Background(), params, cfg, consumertest.NewNop())
tp, err := createTracesProcessor(context.Background(), params, cfg, consumertest.NewNop())
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type tracesamplerprocessor struct {
hashSeed uint32
}

// newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
// newTracesProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
// configuration.
func newTraceProcessor(nextConsumer consumer.Traces, cfg Config) (component.TracesProcessor, error) {
func newTracesProcessor(nextConsumer consumer.Traces, cfg Config) (component.TracesProcessor, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

func TestNewTraceProcessor(t *testing.T) {
func TestNewTracesProcessor(t *testing.T) {
tests := []struct {
name string
nextConsumer consumer.Traces
Expand Down Expand Up @@ -72,13 +72,13 @@ func TestNewTraceProcessor(t *testing.T) {
// The truncation below with uint32 cannot be defined at initialization (compiler error), performing it at runtime.
tt.want.(*tracesamplerprocessor).scaledSamplingRate = uint32(tt.cfg.SamplingPercentage * percentageScaleFactor)
}
got, err := newTraceProcessor(tt.nextConsumer, tt.cfg)
got, err := newTracesProcessor(tt.nextConsumer, tt.cfg)
if (err != nil) != tt.wantErr {
t.Errorf("newTraceProcessor() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("newTracesProcessor() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("newTraceProcessor() = %v, want %v", got, tt.want)
t.Errorf("newTracesProcessor() = %v, want %v", got, tt.want)
}
})
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
tsp, err := newTraceProcessor(sink, tt.cfg)
tsp, err := newTracesProcessor(sink, tt.cfg)
if err != nil {
t.Errorf("error when creating tracesamplerprocessor: %v", err)
return
Expand Down Expand Up @@ -203,7 +203,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
tsp, err := newTraceProcessor(sink, tt.cfg)
tsp, err := newTracesProcessor(sink, tt.cfg)
if err != nil {
t.Errorf("error when creating tracesamplerprocessor: %v", err)
return
Expand Down Expand Up @@ -316,7 +316,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
tsp, err := newTraceProcessor(sink, tt.cfg)
tsp, err := newTracesProcessor(sink, tt.cfg)
require.NoError(t, err)

err = tsp.ConsumeTraces(context.Background(), tt.td)
Expand Down
14 changes: 7 additions & 7 deletions processor/processorhelper/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type FactoryOption func(o *factory)
// CreateDefaultConfig is the equivalent of component.ProcessorFactory.CreateDefaultConfig()
type CreateDefaultConfig func() config.Processor

// CreateTraceProcessor is the equivalent of component.ProcessorFactory.CreateTracesProcessor()
type CreateTraceProcessor func(context.Context, component.ProcessorCreateParams, config.Processor, consumer.Traces) (component.TracesProcessor, error)
// CreateTracesProcessor is the equivalent of component.ProcessorFactory.CreateTracesProcessor()
type CreateTracesProcessor func(context.Context, component.ProcessorCreateParams, config.Processor, consumer.Traces) (component.TracesProcessor, error)

// CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateMetricsProcessor()
type CreateMetricsProcessor func(context.Context, component.ProcessorCreateParams, config.Processor, consumer.Metrics) (component.MetricsProcessor, error)
Expand All @@ -41,15 +41,15 @@ type factory struct {
component.BaseProcessorFactory
cfgType config.Type
createDefaultConfig CreateDefaultConfig
createTracesProcessor CreateTraceProcessor
createTracesProcessor CreateTracesProcessor
createMetricsProcessor CreateMetricsProcessor
createLogsProcessor CreateLogsProcessor
}

// WithTraces overrides the default "error not supported" implementation for CreateTraceProcessor.
func WithTraces(createTraceProcessor CreateTraceProcessor) FactoryOption {
// WithTraces overrides the default "error not supported" implementation for CreateTracesProcessor.
func WithTraces(createTracesProcessor CreateTracesProcessor) FactoryOption {
return func(o *factory) {
o.createTracesProcessor = createTraceProcessor
o.createTracesProcessor = createTracesProcessor
}
}

Expand Down Expand Up @@ -92,7 +92,7 @@ func (f *factory) CreateDefaultConfig() config.Processor {
return f.createDefaultConfig()
}

// CreateTraceProcessor creates a component.TracesProcessor based on this config.
// CreateTracesProcessor creates a component.TracesProcessor based on this config.
func (f *factory) CreateTracesProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
Expand Down
4 changes: 2 additions & 2 deletions processor/processorhelper/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestNewMetrics_WithConstructors(t *testing.T) {
factory := NewFactory(
typeStr,
defaultConfig,
WithTraces(createTraceProcessor),
WithTraces(createTracesProcessor),
WithMetrics(createMetricsProcessor),
WithLogs(createLogsProcessor))
assert.EqualValues(t, typeStr, factory.Type())
Expand All @@ -70,7 +70,7 @@ func defaultConfig() config.Processor {
return defaultCfg
}

func createTraceProcessor(context.Context, component.ProcessorCreateParams, config.Processor, consumer.Traces) (component.TracesProcessor, error) {
func createTracesProcessor(context.Context, component.ProcessorCreateParams, config.Processor, consumer.Traces) (component.TracesProcessor, error) {
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions processor/processorhelper/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ import (
// to stop further processing without propagating an error back up the pipeline to logs.
var ErrSkipProcessingData = errors.New("sentinel error to skip processing data from the remainder of the pipeline")

// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTraceProcessor.
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTracesProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error)
}

// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTraceProcessor.
// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
Expand Down Expand Up @@ -146,9 +146,9 @@ func (tp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) e
return tp.nextConsumer.ConsumeTraces(ctx, td)
}

// NewTraceProcessor creates a TracesProcessor that ensure context propagation and the right tags are set.
// NewTracesProcessor creates a TracesProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewTraceProcessor(
func NewTracesProcessor(
config config.Processor,
nextConsumer consumer.Traces,
processor TProcessor,
Expand Down
8 changes: 4 additions & 4 deletions processor/processorhelper/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestWithOptions(t *testing.T) {
}

func TestNewTraceExporter(t *testing.T) {
me, err := NewTraceProcessor(testCfg, consumertest.NewNop(), newTestTProcessor(nil))
me, err := NewTracesProcessor(testCfg, consumertest.NewNop(), newTestTProcessor(nil))
require.NoError(t, err)

assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -66,16 +66,16 @@ func TestNewTraceExporter(t *testing.T) {
}

func TestNewTraceExporter_NilRequiredFields(t *testing.T) {
_, err := NewTraceProcessor(testCfg, consumertest.NewNop(), nil)
_, err := NewTracesProcessor(testCfg, consumertest.NewNop(), nil)
assert.Error(t, err)

_, err = NewTraceProcessor(testCfg, nil, newTestTProcessor(nil))
_, err = NewTracesProcessor(testCfg, nil, newTestTProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}

func TestNewTraceExporter_ProcessTraceError(t *testing.T) {
want := errors.New("my_error")
me, err := NewTraceProcessor(testCfg, consumertest.NewNop(), newTestTProcessor(want))
me, err := NewTracesProcessor(testCfg, consumertest.NewNop(), newTestTProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, me.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
}
Expand Down
Loading