diff --git a/otelcol/collector.go b/otelcol/collector.go index 00282d81bd8..27f3b264570 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/otelcol/internal/grpclog" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service" ) @@ -158,8 +159,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.service, err = service.New(ctx, service.Settings{ BuildInfo: col.set.BuildInfo, - ReceiverFactories: col.set.Factories.Receivers, - ReceiverConfigs: cfg.Receivers, + ReceiverBuilder: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers), ProcessorFactories: col.set.Factories.Processors, ProcessorConfigs: cfg.Processors, ExporterFactories: col.set.Factories.Exporters, diff --git a/receiver/receiver.go b/receiver/receiver.go index 3de2673fc45..c98bf50a8d1 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" ) @@ -195,3 +197,76 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } + +// Builder receiver is a helper struct that given a set of Configs and Factories helps with creating receivers. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTraces creates a Traces receiver based on the settings and config. +func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesReceiverStability()) + return f.CreateTracesReceiver(ctx, set, cfg, next) +} + +// CreateMetrics creates a Metrics receiver based on the settings and config. +func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsReceiverStability()) + return f.CreateMetricsReceiver(ctx, set, cfg, next) +} + +// CreateLogs creates a Logs receiver based on the settings and config. +func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsReceiverStability()) + return f.CreateLogsReceiver(ctx, set, cfg, next) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/service/host.go b/service/host.go index 1a722b07d32..2ef83587385 100644 --- a/service/host.go +++ b/service/host.go @@ -27,7 +27,7 @@ var _ component.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error - receiverFactories map[component.Type]receiver.Factory + receiverBuilder *receiver.Builder processorFactories map[component.Type]processor.Factory exporterFactories map[component.Type]exporter.Factory extensionFactories map[component.Type]extension.Factory @@ -48,7 +48,7 @@ func (host *serviceHost) ReportFatalError(err error) { func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: - return host.receiverFactories[componentType] + return host.receiverBuilder.Factory(componentType) case component.KindProcessor: return host.processorFactories[componentType] case component.KindExporter: diff --git a/service/pipelines.go b/service/pipelines.go index 945908fc1cd..3afa2f8ea50 100644 --- a/service/pipelines.go +++ b/service/pipelines.go @@ -185,11 +185,7 @@ type pipelinesSettings struct { Telemetry component.TelemetrySettings BuildInfo component.BuildInfo - // ReceiverFactories maps receiver type names in the config to the respective receiver.Factory. - ReceiverFactories map[component.Type]receiver.Factory - - // ReceiverConfigs is a map of component.ID to component.Config. - ReceiverConfigs map[component.ID]component.Config + ReceiverBuilder *receiver.Builder // ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory. ProcessorFactories map[component.Type]processor.Factory @@ -340,7 +336,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines BuildInfo: set.BuildInfo, } cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type()) - recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + recv, err := buildReceiver(ctx, cSet, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) if err != nil { return nil, err } @@ -500,53 +496,36 @@ func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType func buildReceiver(ctx context.Context, set receiver.CreateSettings, - cfgs map[component.ID]component.Config, - factories map[component.Type]receiver.Factory, + builder *receiver.Builder, pipelineID component.ID, nexts []baseConsumer, -) (component.Component, error) { - cfg, existsCfg := cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("receiver %q is not configured", set.ID) - } - - factory, existsFactory := factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) - } - - components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type())) - - recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory) - if err != nil { - return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err) - } - - return recv, nil -} - -func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) { +) (recv component.Component, err error) { switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces for _, next := range nexts { consumers = append(consumers, next.(consumer.Traces)) } - return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers)) + recv, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) case component.DataTypeMetrics: var consumers []consumer.Metrics for _, next := range nexts { consumers = append(consumers, next.(consumer.Metrics)) } - return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers)) + recv, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) case component.DataTypeLogs: var consumers []consumer.Logs for _, next := range nexts { consumers = append(consumers, next.(consumer.Logs)) } - return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers)) + recv, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) + default: + return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } - return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + if err != nil { + return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err) + } + return recv, nil } func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger { diff --git a/service/pipelines_test.go b/service/pipelines_test.go index 4842169d823..28fc840982a 100644 --- a/service/pipelines_test.go +++ b/service/pipelines_test.go @@ -184,13 +184,14 @@ func TestBuildPipelines(t *testing.T) { pipelines, err := buildPipelines(context.Background(), pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{ - testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, - }, - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), - component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), - }, + ReceiverBuilder: receiver.NewBuilder( + map[component.ID]component.Config{ + component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, + }), ProcessorFactories: map[component.Type]processor.Factory{ testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory, }, @@ -300,318 +301,297 @@ func TestBuildErrors(t *testing.T) { badExporterFactory := newBadExporterFactory() tests := []struct { - name string - settings pipelinesSettings - expected string + name string + ReceiverConfigs map[component.ID]component.Config + ProcessorConfigs map[component.ID]component.Config + ExporterConfigs map[component.ID]component.Config + PipelineConfigs map[component.ID]*ConfigServicePipeline + expected string }{ { name: "not_supported_exporter_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_exporter_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_exporter_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"traces\": telemetry type is not supported", }, { name: "not_supported_processor_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_processor_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_processor_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"traces\": telemetry type is not supported", }, { name: "not_supported_receiver_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_receiver_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_receiver_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"traces\": telemetry type is not supported", }, { name: "unknown_exporter_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, }, }, expected: "exporter \"nop/1\" is not configured", }, { name: "unknown_exporter_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("unknown")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("unknown")}, }, }, expected: "exporter factory not available for: \"unknown\"", }, { name: "unknown_processor_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "processor \"nop/1\" is not configured", }, { name: "unknown_processor_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("unknown")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("unknown")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "processor factory not available for: \"unknown\"", }, { name: "unknown_receiver_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "receiver \"nop/1\" is not configured", + expected: "failed to create \"nop/1\" receiver, in pipeline \"logs\": receiver \"nop/1\" is not configured", }, { name: "unknown_receiver_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*ConfigServicePipeline{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("unknown")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*ConfigServicePipeline{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("unknown")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "receiver factory not available for: \"unknown\"", + expected: "failed to create \"unknown\" receiver, in pipeline \"logs\": receiver factory not available for: \"unknown\"", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - set := test.settings - set.BuildInfo = component.NewDefaultBuildInfo() - set.Telemetry = componenttest.NewNopTelemetrySettings() - set.ReceiverFactories = map[component.Type]receiver.Factory{ - nopReceiverFactory.Type(): nopReceiverFactory, - badReceiverFactory.Type(): badReceiverFactory, - } - set.ProcessorFactories = map[component.Type]processor.Factory{ - nopProcessorFactory.Type(): nopProcessorFactory, - badProcessorFactory.Type(): badProcessorFactory, - } - set.ExporterFactories = map[component.Type]exporter.Factory{ - nopExporterFactory.Type(): nopExporterFactory, - badExporterFactory.Type(): badExporterFactory, + set := pipelinesSettings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + ReceiverBuilder: receiver.NewBuilder( + test.ReceiverConfigs, + map[component.Type]receiver.Factory{ + nopReceiverFactory.Type(): nopReceiverFactory, + badReceiverFactory.Type(): badReceiverFactory, + }), + ProcessorFactories: map[component.Type]processor.Factory{ + nopProcessorFactory.Type(): nopProcessorFactory, + badProcessorFactory.Type(): badProcessorFactory, + }, + ProcessorConfigs: test.ProcessorConfigs, + ExporterFactories: map[component.Type]exporter.Factory{ + nopExporterFactory.Type(): nopExporterFactory, + badExporterFactory.Type(): badExporterFactory, + }, + ExporterConfigs: test.ExporterConfigs, + PipelineConfigs: test.PipelineConfigs, } _, err := buildPipelines(context.Background(), set) @@ -631,14 +611,15 @@ func TestFailToStartAndShutdown(t *testing.T) { set := pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{ - nopReceiverFactory.Type(): nopReceiverFactory, - errReceiverFactory.Type(): errReceiverFactory, - }, - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), - component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), - }, + ReceiverBuilder: receiver.NewBuilder( + map[component.ID]component.Config{ + component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + nopReceiverFactory.Type(): nopReceiverFactory, + errReceiverFactory.Type(): errReceiverFactory, + }), ProcessorFactories: map[component.Type]processor.Factory{ nopProcessorFactory.Type(): nopProcessorFactory, errProcessorFactory.Type(): errProcessorFactory, diff --git a/service/service.go b/service/service.go index 625dc810e33..6e2cadbd63c 100644 --- a/service/service.go +++ b/service/service.go @@ -40,11 +40,8 @@ type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo - // ReceiverFactories maps receiver type names in the config to the respective receiver.Factory. - ReceiverFactories map[component.Type]receiver.Factory - - // ReceiverConfigs is a map of component.ID to receivers component.Config. - ReceiverConfigs map[component.ID]component.Config + // ReceiverBuilder builder for receivers. + ReceiverBuilder *receiver.Builder // ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory. ProcessorFactories map[component.Type]processor.Factory @@ -91,7 +88,7 @@ func New(ctx context.Context, set Settings, cfg ConfigService) (*Service, error) srv := &Service{ buildInfo: set.BuildInfo, host: &serviceHost{ - receiverFactories: set.ReceiverFactories, + receiverBuilder: set.ReceiverBuilder, processorFactories: set.ProcessorFactories, exporterFactories: set.ExporterFactories, extensionFactories: set.ExtensionFactories, @@ -199,11 +196,10 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, pSet := pipelinesSettings{ Telemetry: srv.telemetrySettings, BuildInfo: srv.buildInfo, - ReceiverFactories: srv.host.receiverFactories, - ReceiverConfigs: set.ReceiverConfigs, - ProcessorFactories: srv.host.processorFactories, + ReceiverBuilder: set.ReceiverBuilder, + ProcessorFactories: set.ProcessorFactories, ProcessorConfigs: set.ProcessorConfigs, - ExporterFactories: srv.host.exporterFactories, + ExporterFactories: set.ExporterFactories, ExporterConfigs: set.ExporterConfigs, PipelineConfigs: cfg.Pipelines, } diff --git a/service/service_test.go b/service/service_test.go index 6db5e6c314c..495ab5de5f9 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -151,7 +151,7 @@ func TestServiceGetFactory(t *testing.T) { }) assert.Nil(t, srv.host.GetFactory(component.KindReceiver, "wrongtype")) - assert.Equal(t, set.ReceiverFactories["nop"], srv.host.GetFactory(component.KindReceiver, "nop")) + assert.Equal(t, set.ReceiverBuilder.Factory("nop"), srv.host.GetFactory(component.KindReceiver, "nop")) assert.Nil(t, srv.host.GetFactory(component.KindProcessor, "wrongtype")) assert.Equal(t, set.ProcessorFactories["nop"], srv.host.GetFactory(component.KindProcessor, "nop")) @@ -395,9 +395,10 @@ func assertZPages(t *testing.T, zpagesAddr string) { func newNopSettings() Settings { return Settings{ - BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{"nop": receivertest.NewNopFactory()}, - ReceiverConfigs: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()}, + BuildInfo: component.NewDefaultBuildInfo(), + ReceiverBuilder: receiver.NewBuilder( + map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()}, + map[component.Type]receiver.Factory{"nop": receivertest.NewNopFactory()}), ProcessorFactories: map[component.Type]processor.Factory{"nop": processortest.NewNopFactory()}, ProcessorConfigs: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()}, ExporterFactories: map[component.Type]exporter.Factory{"nop": exportertest.NewNopFactory()},