From 8d4b9d563573c88384d5580cebf1b34aa21791b4 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 26 Jan 2023 18:43:15 -0500 Subject: [PATCH] Add entry point to build graph-based pipelines (#7023) --- internal/sharedgate/sharedgate.go | 24 ++++++++++++++++++++++++ otelcol/config.go | 12 +++--------- otelcol/config_test.go | 5 +++-- otelcol/otelcoltest/config_test.go | 5 +++-- service/graph.go | 6 ++++++ service/service.go | 13 +++++++++++-- 6 files changed, 50 insertions(+), 15 deletions(-) create mode 100644 internal/sharedgate/sharedgate.go diff --git a/internal/sharedgate/sharedgate.go b/internal/sharedgate/sharedgate.go new file mode 100644 index 000000000000..9402e2da0b5d --- /dev/null +++ b/internal/sharedgate/sharedgate.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package sharedgate exposes a featuregate that is used by multiple packages. +package sharedgate // import "go.opentelemetry.io/collector/internal/sharedgate" + +import "go.opentelemetry.io/collector/featuregate" + +var ConnectorsFeatureGate = featuregate.GlobalRegistry().MustRegister( + "enableConnectors", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336")) diff --git a/otelcol/config.go b/otelcol/config.go index e3f533c6c964..18a45f129e52 100644 --- a/otelcol/config.go +++ b/otelcol/config.go @@ -19,7 +19,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/sharedgate" "go.opentelemetry.io/collector/service" ) @@ -28,12 +28,6 @@ var ( errMissingReceivers = errors.New("no receiver configuration specified in config") ) -var connectorsFeatureGate = featuregate.GlobalRegistry().MustRegister( - "otelcol.enableConnectors", - featuregate.StageAlpha, - featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."), - featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336")) - // Config defines the configuration for the various elements of collector or agent. type Config struct { // Receivers is a map of ComponentID to Receivers. @@ -114,8 +108,8 @@ func (cfg *Config) Validate() error { } } - if len(cfg.Connectors) != 0 && !connectorsFeatureGate.IsEnabled() { - return fmt.Errorf("connectors require feature gate: %s", connectorsFeatureGate.ID()) + if len(cfg.Connectors) != 0 && !sharedgate.ConnectorsFeatureGate.IsEnabled() { + return fmt.Errorf("connectors require feature gate: %s", sharedgate.ConnectorsFeatureGate.ID()) } if err := cfg.Service.Validate(); err != nil { diff --git a/otelcol/config_test.go b/otelcol/config_test.go index e4c00e2bd888..584b23229d2b 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/sharedgate" "go.opentelemetry.io/collector/service" "go.opentelemetry.io/collector/service/telemetry" ) @@ -255,9 +256,9 @@ func TestConfigValidate(t *testing.T) { }, } - require.NoError(t, featuregate.GlobalRegistry().Set(connectorsFeatureGate.ID(), true)) + require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), true)) defer func() { - require.NoError(t, featuregate.GlobalRegistry().Set(connectorsFeatureGate.ID(), false)) + require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), false)) }() for _, test := range testCases { t.Run(test.name, func(t *testing.T) { diff --git a/otelcol/otelcoltest/config_test.go b/otelcol/otelcoltest/config_test.go index 09ee83c3af21..2228ff953c5a 100644 --- a/otelcol/otelcoltest/config_test.go +++ b/otelcol/otelcoltest/config_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/sharedgate" "go.opentelemetry.io/collector/service" ) @@ -75,9 +76,9 @@ func TestLoadConfigAndValidate(t *testing.T) { factories, err := NopFactories() assert.NoError(t, err) - require.NoError(t, featuregate.GlobalRegistry().Set("otelcol.enableConnectors", true)) + require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), true)) defer func() { - require.NoError(t, featuregate.GlobalRegistry().Set("otelcol.enableConnectors", false)) + require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), false)) }() cfgValidate, errValidate := LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories) require.NoError(t, errValidate) diff --git a/service/graph.go b/service/graph.go index a0014cd1c78e..5c84efc6671d 100644 --- a/service/graph.go +++ b/service/graph.go @@ -16,6 +16,7 @@ package service // import "go.opentelemetry.io/collector/service" import ( "context" + "errors" "net/http" "go.uber.org/multierr" @@ -32,6 +33,11 @@ type pipelinesGraph struct { componentGraph *simple.DirectedGraph } +func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) { + err := errors.New("not yet implemented") + return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err +} + func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { diff --git a/service/service.go b/service/service.go index c239cc0b794a..c63c8fff64a2 100644 --- a/service/service.go +++ b/service/service.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/sharedgate" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/extensions" @@ -194,10 +195,18 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, Receivers: set.Receivers, Processors: set.Processors, Exporters: set.Exporters, + Connectors: set.Connectors, PipelineConfigs: cfg.Pipelines, } - if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil { - return fmt.Errorf("cannot build pipelines: %w", err) + + if sharedgate.ConnectorsFeatureGate.IsEnabled() { + if srv.host.pipelines, err = buildPipelinesGraph(ctx, pSet); err != nil { + return fmt.Errorf("cannot build pipelines: %w", err) + } + } else { + if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil { + return fmt.Errorf("cannot build pipelines: %w", err) + } } if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {