Skip to content

Commit

Permalink
Add entry point to build graph-based pipelines (open-telemetry#7023)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Jan 26, 2023
1 parent c90afb4 commit 8d4b9d5
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 15 deletions.
24 changes: 24 additions & 0 deletions internal/sharedgate/sharedgate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sharedgate exposes a featuregate that is used by multiple packages.
package sharedgate // import "go.opentelemetry.io/collector/internal/sharedgate"

import "go.opentelemetry.io/collector/featuregate"

var ConnectorsFeatureGate = featuregate.GlobalRegistry().MustRegister(
"enableConnectors",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336"))
12 changes: 3 additions & 9 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/sharedgate"
"go.opentelemetry.io/collector/service"
)

Expand All @@ -28,12 +28,6 @@ var (
errMissingReceivers = errors.New("no receiver configuration specified in config")
)

var connectorsFeatureGate = featuregate.GlobalRegistry().MustRegister(
"otelcol.enableConnectors",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336"))

// Config defines the configuration for the various elements of collector or agent.
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Expand Down Expand Up @@ -114,8 +108,8 @@ func (cfg *Config) Validate() error {
}
}

if len(cfg.Connectors) != 0 && !connectorsFeatureGate.IsEnabled() {
return fmt.Errorf("connectors require feature gate: %s", connectorsFeatureGate.ID())
if len(cfg.Connectors) != 0 && !sharedgate.ConnectorsFeatureGate.IsEnabled() {
return fmt.Errorf("connectors require feature gate: %s", sharedgate.ConnectorsFeatureGate.ID())
}

if err := cfg.Service.Validate(); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/sharedgate"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/telemetry"
)
Expand Down Expand Up @@ -255,9 +256,9 @@ func TestConfigValidate(t *testing.T) {
},
}

require.NoError(t, featuregate.GlobalRegistry().Set(connectorsFeatureGate.ID(), true))
require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), true))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Set(connectorsFeatureGate.ID(), false))
require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), false))
}()
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions otelcol/otelcoltest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/sharedgate"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -75,9 +76,9 @@ func TestLoadConfigAndValidate(t *testing.T) {
factories, err := NopFactories()
assert.NoError(t, err)

require.NoError(t, featuregate.GlobalRegistry().Set("otelcol.enableConnectors", true))
require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), true))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Set("otelcol.enableConnectors", false))
require.NoError(t, featuregate.GlobalRegistry().Set(sharedgate.ConnectorsFeatureGate.ID(), false))
}()
cfgValidate, errValidate := LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)
require.NoError(t, errValidate)
Expand Down
6 changes: 6 additions & 0 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"net/http"

"go.uber.org/multierr"
Expand All @@ -32,6 +33,11 @@ type pipelinesGraph struct {
componentGraph *simple.DirectedGraph
}

func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) {
err := errors.New("not yet implemented")
return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/sharedgate"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/extensions"
Expand Down Expand Up @@ -194,10 +195,18 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,
Receivers: set.Receivers,
Processors: set.Processors,
Exporters: set.Exporters,
Connectors: set.Connectors,
PipelineConfigs: cfg.Pipelines,
}
if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)

if sharedgate.ConnectorsFeatureGate.IsEnabled() {
if srv.host.pipelines, err = buildPipelinesGraph(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}
} else {
if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}
}

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
Expand Down

0 comments on commit 8d4b9d5

Please sign in to comment.