From 44b6bf5ceded822fad3ecd55251f4f5aeb98247e Mon Sep 17 00:00:00 2001 From: Mrod1598 Date: Tue, 15 Jun 2021 12:17:43 -0400 Subject: [PATCH] Fix default output (#186) * Fix defaultoutput * Implement PR feedback --- agent/builder.go | 4 +++ agent/builder_test.go | 63 ++++++++++++++++++++++++++++++++++++++++--- pipeline/config.go | 17 +++++------- plugin/config.go | 2 +- plugin/config_test.go | 6 ++--- 5 files changed, 74 insertions(+), 18 deletions(-) diff --git a/agent/builder.go b/agent/builder.go index e596b34f..fa9a8e17 100644 --- a/agent/builder.go +++ b/agent/builder.go @@ -87,6 +87,10 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) { b.config = cfgs } + if len(b.config.Pipeline) == 0 { + return nil, errors.NewError("empty pipeline not allowed", "") + } + sampledLogger := b.logger.Desugar().WithOptions( zap.WrapCore(func(core zapcore.Core) zapcore.Core { return zapcore.NewSamplerWithOptions(core, time.Second, 1, 10000) diff --git a/agent/builder_test.go b/agent/builder_test.go index 685c96e6..9e70c968 100644 --- a/agent/builder_test.go +++ b/agent/builder_test.go @@ -20,11 +20,41 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/noop" "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) func TestBuildAgentSuccess(t *testing.T) { - mockCfg := Config{} + mockCfg := Config{ + []operator.Config{ + { + Builder: noop.NewNoopOperatorConfig("noop"), + }, + }, + } + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + + agent, err := NewBuilder(mockLogger). + WithConfig(&mockCfg). + WithPluginDir(mockPluginDir). + Build() + require.NoError(t, err) + require.Equal(t, mockLogger, agent.SugaredLogger) +} + +func TestBuildAgentDefaultOperator(t *testing.T) { + mockCfg := Config{ + []operator.Config{ + { + Builder: noop.NewNoopOperatorConfig("noop"), + }, + { + Builder: noop.NewNoopOperatorConfig("noop1"), + }, + }, + } mockLogger := zap.NewNop().Sugar() mockPluginDir := "/some/path/plugins" mockOutput := testutil.NewFakeOutput(t) @@ -36,6 +66,30 @@ func TestBuildAgentSuccess(t *testing.T) { Build() require.NoError(t, err) require.Equal(t, mockLogger, agent.SugaredLogger) + + ops := agent.pipeline.Operators() + require.Equal(t, 3, len(ops)) + + exists := make(map[string]bool) + + for _, op := range ops { + switch op.ID() { + case "$.noop": + require.Equal(t, 1, len(op.GetOutputIDs())) + require.Equal(t, "$.noop1", op.GetOutputIDs()[0]) + exists["$.noop"] = true + case "$.noop1": + require.Equal(t, 1, len(op.GetOutputIDs())) + require.Equal(t, "$.fake", op.GetOutputIDs()[0]) + exists["$.noop1"] = true + case "$.fake": + require.Equal(t, 0, len(op.GetOutputIDs())) + exists["$.fake"] = true + } + } + require.True(t, exists["$.noop"]) + require.True(t, exists["$.noop1"]) + require.True(t, exists["$.fake"]) } func TestBuildAgentFailureOnPluginRegistry(t *testing.T) { @@ -43,13 +97,14 @@ func TestBuildAgentFailureOnPluginRegistry(t *testing.T) { mockLogger := zap.NewNop().Sugar() mockPluginDir := "[]" mockOutput := testutil.NewFakeOutput(t) - - _, err := NewBuilder(mockLogger). + agent, err := NewBuilder(mockLogger). WithConfig(&mockCfg). WithPluginDir(mockPluginDir). WithDefaultOutput(mockOutput). Build() - require.NoError(t, err) + require.Error(t, err) + require.Contains(t, err.Error(), "empty pipeline not allowed") + require.Nil(t, agent) } func TestBuildAgentFailureNoConfigOrGlobs(t *testing.T) { diff --git a/pipeline/config.go b/pipeline/config.go index 18350e64..f6a108ab 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -22,7 +22,7 @@ import ( type Config []operator.Config // BuildOperators builds the operators from the list of configs into operators. -func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, error) { +func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operator.Operator) ([]operator.Operator, error) { // buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins. // the value is the plugin's first operator's ID. buildsMulti := make(map[string]string) @@ -38,6 +38,11 @@ func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, e } operators = append(operators, op...) } + + if defaultOperator != nil && operators[len(operators)-1].CanOutput() { + operators = append(operators, defaultOperator) + } + if err := SetOutputIDs(operators, buildsMulti); err != nil { return nil, err } @@ -47,19 +52,11 @@ func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, e // BuildPipeline will build a pipeline from the config. func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator.Operator) (*DirectedPipeline, error) { - if defaultOperator != nil { - bc.DefaultOutputIDs = []string{defaultOperator.ID()} - } - - operators, err := c.BuildOperators(bc) + operators, err := c.BuildOperators(bc, defaultOperator) if err != nil { return nil, err } - if defaultOperator != nil { - operators = append(operators, defaultOperator) - } - return NewDirectedPipeline(operators) } diff --git a/plugin/config.go b/plugin/config.go index d3c4751a..7d4e060c 100644 --- a/plugin/config.go +++ b/plugin/config.go @@ -57,7 +57,7 @@ func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) { } nbc := bc.WithSubNamespace(c.ID()).WithIncrementedDepth() - return pipelineConfig.Pipeline.BuildOperators(nbc) + return pipelineConfig.Pipeline.BuildOperators(nbc, nil) } func (c *Config) BuildsMultipleOps() bool { return true } diff --git a/plugin/config_test.go b/plugin/config_test.go index 9a10e010..29ac4f92 100644 --- a/plugin/config_test.go +++ b/plugin/config_test.go @@ -229,7 +229,7 @@ pipeline: } for _, tc := range cases { - ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t)) + ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t), nil) require.NoError(t, err) require.Len(t, ops, len(tc.ExpectedOpIDs)) @@ -553,7 +553,7 @@ pipeline: for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t)) + ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t), nil) require.NoError(t, err) for i, op := range ops { @@ -595,7 +595,7 @@ pipeline: err = yaml.Unmarshal(pipelineConfig, &pipeline) require.NoError(t, err) - _, err = pipeline.BuildOperators(operator.NewBuildContext(zaptest.NewLogger(t).Sugar())) + _, err = pipeline.BuildOperators(operator.NewBuildContext(zaptest.NewLogger(t).Sugar()), nil) require.Error(t, err) require.Contains(t, err.Error(), "reached max plugin depth") }