From 795473effb28da38598267b7ae1e149e335fbeb5 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Tue, 15 Oct 2024 13:09:33 -0400 Subject: [PATCH] Fix logs-only check in agent Fix container environment merge --- .../amazon-cloudwatch-agent.go | 71 +++++++++-------- .../amazon-cloudwatch-agent_test.go | 79 ++++++++++--------- .../testdata/base+merge.yaml | 14 ++++ .../testdata/merge.yaml | 11 +++ internal/merge/confmap/confmap.go | 22 ------ internal/merge/confmap/confmap_test.go | 2 +- internal/merge/confmap/load.go | 50 ++++++++++++ internal/merge/confmap/load_test.go | 36 +++++++++ 8 files changed, 189 insertions(+), 96 deletions(-) create mode 100644 cmd/amazon-cloudwatch-agent/testdata/base+merge.yaml create mode 100644 cmd/amazon-cloudwatch-agent/testdata/merge.yaml create mode 100644 internal/merge/confmap/load.go create mode 100644 internal/merge/confmap/load_test.go diff --git a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go index 267c0a1797..65b8d12436 100644 --- a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go +++ b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go @@ -310,18 +310,16 @@ func runAgent(ctx context.Context, } } - isOnlyDefaultConfigPath := len(fOtelConfigs) == 1 && fOtelConfigs[0] == paths.YamlConfigPath - if len(c.Inputs) != 0 && len(c.Outputs) != 0 { log.Println("creating new logs agent") logAgent := logs.NewLogAgent(c) // Always run logAgent as goroutine regardless of whether starting OTEL or Telegraf. go logAgent.Run(ctx) - // If only the default CWAgent config yaml is provided and does not exist, then ASSUME + // If only a single YAML is provided and does not exist, then ASSUME the agent is // just monitoring logs since this is the default when no OTEL config flag is provided. // So just start Telegraf. - if isOnlyDefaultConfigPath { + if len(fOtelConfigs) == 1 { _, err = os.Stat(fOtelConfigs[0]) if errors.Is(err, os.ErrNotExist) { useragent.Get().SetComponents(&otelcol.Config{}, c) @@ -333,20 +331,20 @@ func runAgent(ctx context.Context, level := cwaLogger.ConvertToAtomicLevel(wlog.LogLevel()) logger, loggerOptions := cwaLogger.NewLogger(writer, level) - uris := fOtelConfigs - // merge configs together - if len(uris) > 1 { - result, err := mergeConfigs(uris, isOnlyDefaultConfigPath) - if err != nil { - return err - } - _ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(result.ToStringMap())) - uris = []string{"env:" + envconfig.CWAgentMergedOtelConfig} + otelConfigs := fOtelConfigs + // try merging configs together, will return nil if nothing to merge + merged, err := mergeConfigs(otelConfigs) + if err != nil { + return err + } + if merged != nil { + _ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap())) + otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig} } else { _ = os.Unsetenv(envconfig.CWAgentMergedOtelConfig) } - providerSettings := configprovider.GetSettings(uris, logger) + providerSettings := configprovider.GetSettings(otelConfigs, logger) provider, err := otelcol.NewConfigProvider(providerSettings) if err != nil { return fmt.Errorf("error while initializing config provider: %v", err) @@ -382,7 +380,7 @@ func runAgent(ctx context.Context, // docs: https://github.com/open-telemetry/opentelemetry-collector/blob/93cbae436ae61b832279dbbb18a0d99214b7d305/otelcol/command.go#L63 // ************************************************************************************************* var e []string - for _, uri := range uris { + for _, uri := range otelConfigs { e = append(e, "--config="+uri) } cmd.SetArgs(e) @@ -405,32 +403,37 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co } } -func mergeConfigs(configPaths []string, isOnlyDefaultConfigPath bool) (*confmap.Conf, error) { - result := confmap.New() - content, ok := os.LookupEnv(envconfig.CWOtelConfigContent) +// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error. +func mergeConfigs(configPaths []string) (*confmap.Conf, error) { // Similar to translator, for containerized agent, try to use env variable if no other supplemental config paths // are provided. - if ok && len(content) > 0 && isOnlyDefaultConfigPath && envconfig.IsRunningInContainer() { - log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent) - conf, err := confmap.LoadFromBytes([]byte(content)) - if err != nil { - return nil, fmt.Errorf("failed to load OTEL configs: %w", err) + var loaders []confmap.Loader + if envconfig.IsRunningInContainer() && len(configPaths) == 1 { + content, ok := os.LookupEnv(envconfig.CWOtelConfigContent) + if ok && len(content) > 0 { + log.Printf("D! Merging OTEL configuration from: %s, %s", envconfig.CWOtelConfigContent, configPaths[0]) + loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)), confmap.NewFileLoader(configPaths[0])) } - if err = result.Merge(conf); err != nil { - return nil, fmt.Errorf("failed to merge OTEL configs: %w", err) + } else if len(configPaths) > 1 { + log.Printf("D! Merging OTEL configurations from: %s", configPaths) + for _, configPath := range configPaths { + loaders = append(loaders, confmap.NewFileLoader(configPath)) } } - log.Printf("D! Merging OTEL configurations from: %s", configPaths) - for _, configPath := range configPaths { - conf, err := confmap.LoadFromFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to load OTEL configs: %w", err) - } - if err = result.Merge(conf); err != nil { - return nil, fmt.Errorf("failed to merge OTEL configs: %w", err) + if len(loaders) > 0 { + result := confmap.New() + for _, loader := range loaders { + conf, err := loader.Load() + if err != nil { + return nil, fmt.Errorf("failed to load OTEL configs: %w", err) + } + if err = result.Merge(conf); err != nil { + return nil, fmt.Errorf("failed to merge OTEL configs: %w", err) + } } + return result, nil } - return result, nil + return nil, nil } func components(telegrafConfig *config.Config) (otelcol.Factories, error) { diff --git a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go index a87b9c8935..4e9f30509a 100644 --- a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go +++ b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go @@ -81,57 +81,55 @@ service: exporters: [nop] ` testCases := map[string]struct { - input []string - isContainer bool - isOnlyDefaultConfigPath bool - envValue string - want *confmap.Conf - wantErr bool + input []string + isContainer bool + envValue string + want *confmap.Conf + wantErr bool }{ - "WithoutInvalidFile": { - input: []string{filepath.Join("not", "a", "file")}, + "WithInvalidFile": { + input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")}, wantErr: true, }, + "WithNoMerge": { + input: []string{filepath.Join("testdata", "base.yaml")}, + wantErr: false, + }, "WithoutEnv/Container": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: true, - isOnlyDefaultConfigPath: true, - want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")), + input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")}, + isContainer: true, + want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")), }, "WithEnv/NonContainer": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: false, - isOnlyDefaultConfigPath: true, - envValue: testEnvValue, - want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")), + input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")}, + isContainer: false, + envValue: testEnvValue, + want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")), }, "WithEnv/Container": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: true, - isOnlyDefaultConfigPath: true, - envValue: testEnvValue, - want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")), + input: []string{filepath.Join("testdata", "base.yaml")}, + isContainer: true, + envValue: testEnvValue, + want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")), }, "WithEmptyEnv/Container": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: true, - isOnlyDefaultConfigPath: true, - envValue: "", - want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")), + input: []string{filepath.Join("testdata", "base.yaml")}, + isContainer: true, + envValue: "", + want: nil, + wantErr: false, }, "WithInvalidEnv/Container": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: true, - isOnlyDefaultConfigPath: true, - envValue: "test", - wantErr: true, + input: []string{filepath.Join("testdata", "base.yaml")}, + isContainer: true, + envValue: "test", + wantErr: true, }, "WithIgnoredEnv/Container": { - input: []string{filepath.Join("testdata", "base.yaml")}, - isContainer: true, - isOnlyDefaultConfigPath: false, - envValue: testEnvValue, - want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")), + input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")}, + isContainer: true, + envValue: testEnvValue, + want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")), }, } for name, testCase := range testCases { @@ -140,10 +138,13 @@ service: t.Setenv(envconfig.RunInContainer, envconfig.TrueValue) } t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue) - got, err := mergeConfigs(testCase.input, testCase.isOnlyDefaultConfigPath) + got, err := mergeConfigs(testCase.input) if testCase.wantErr { assert.Error(t, err) assert.Nil(t, got) + } else if testCase.want == nil { + assert.NoError(t, err) + assert.Nil(t, got) } else { assert.NoError(t, err) assert.NotNil(t, got) @@ -154,7 +155,7 @@ service: } func mustLoadFromFile(t *testing.T, path string) *confmap.Conf { - conf, err := confmap.LoadFromFile(path) + conf, err := confmap.NewFileLoader(path).Load() require.NoError(t, err) return conf } diff --git a/cmd/amazon-cloudwatch-agent/testdata/base+merge.yaml b/cmd/amazon-cloudwatch-agent/testdata/base+merge.yaml new file mode 100644 index 0000000000..6f8d616874 --- /dev/null +++ b/cmd/amazon-cloudwatch-agent/testdata/base+merge.yaml @@ -0,0 +1,14 @@ +receivers: + nop: + +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + exporters: [nop] + traces: + receivers: [nop] + exporters: [nop] \ No newline at end of file diff --git a/cmd/amazon-cloudwatch-agent/testdata/merge.yaml b/cmd/amazon-cloudwatch-agent/testdata/merge.yaml new file mode 100644 index 0000000000..54f35d4186 --- /dev/null +++ b/cmd/amazon-cloudwatch-agent/testdata/merge.yaml @@ -0,0 +1,11 @@ +receivers: + nop: + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop] + exporters: [nop] diff --git a/internal/merge/confmap/confmap.go b/internal/merge/confmap/confmap.go index 388fa1da82..4dc9b687fe 100644 --- a/internal/merge/confmap/confmap.go +++ b/internal/merge/confmap/confmap.go @@ -4,15 +4,10 @@ package confmap import ( - "fmt" - "os" - "path/filepath" - "github.com/knadh/koanf/maps" "github.com/knadh/koanf/providers/confmap" "github.com/knadh/koanf/v2" otelconfmap "go.opentelemetry.io/collector/confmap" - "gopkg.in/yaml.v3" ) const ( @@ -48,20 +43,3 @@ func (c *Conf) mergeFromStringMap(data map[string]any) error { func (c *Conf) ToStringMap() map[string]any { return maps.Unflatten(c.k.All(), KeyDelimiter) } - -func LoadFromFile(path string) (*Conf, error) { - // Clean the path before using it. - content, err := os.ReadFile(filepath.Clean(path)) - if err != nil { - return nil, fmt.Errorf("unable to read the file %v: %w", path, err) - } - return LoadFromBytes(content) -} - -func LoadFromBytes(content []byte) (*Conf, error) { - var rawConf map[string]any - if err := yaml.Unmarshal(content, &rawConf); err != nil { - return nil, err - } - return NewFromStringMap(rawConf), nil -} diff --git a/internal/merge/confmap/confmap_test.go b/internal/merge/confmap/confmap_test.go index eda202f67a..2d20f532e0 100644 --- a/internal/merge/confmap/confmap_test.go +++ b/internal/merge/confmap/confmap_test.go @@ -57,7 +57,7 @@ func TestMerge(t *testing.T) { } func mustLoadFromFile(t *testing.T, path string) *Conf { - conf, err := LoadFromFile(path) + conf, err := NewFileLoader(path).Load() require.NoError(t, err) return conf } diff --git a/internal/merge/confmap/load.go b/internal/merge/confmap/load.go new file mode 100644 index 0000000000..c6965e7231 --- /dev/null +++ b/internal/merge/confmap/load.go @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package confmap + +import ( + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v3" +) + +type Loader interface { + Load() (*Conf, error) +} + +type FileLoader struct { + path string +} + +func NewFileLoader(path string) *FileLoader { + return &FileLoader{path: path} +} + +func (f *FileLoader) Load() (*Conf, error) { + // Clean the path before using it. + content, err := os.ReadFile(filepath.Clean(f.path)) + if err != nil { + return nil, fmt.Errorf("unable to read the file %v: %w", f.path, err) + } + return NewByteLoader(f.path, content).Load() +} + +type ByteLoader struct { + id string + content []byte +} + +func NewByteLoader(id string, content []byte) *ByteLoader { + return &ByteLoader{id: id, content: content} +} + +func (b *ByteLoader) Load() (*Conf, error) { + var rawConf map[string]any + if err := yaml.Unmarshal(b.content, &rawConf); err != nil { + return nil, fmt.Errorf("unable to unmarshal contents: %v: %w", b.id, err) + } + return NewFromStringMap(rawConf), nil +} diff --git a/internal/merge/confmap/load_test.go b/internal/merge/confmap/load_test.go new file mode 100644 index 0000000000..df9b773539 --- /dev/null +++ b/internal/merge/confmap/load_test.go @@ -0,0 +1,36 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package confmap + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFileLoader(t *testing.T) { + loader := NewFileLoader(filepath.Join("not", "a", "file")) + got, err := loader.Load() + assert.Error(t, err) + assert.Nil(t, got) + loader = NewFileLoader(filepath.Join("testdata", "base.yaml")) + got, err = loader.Load() + assert.NoError(t, err) + assert.NotNil(t, got) +} + +func TestByteLoader(t *testing.T) { + testCase := `receivers: + nop/1: +` + loader := NewByteLoader("invalid-yaml", []byte("string")) + got, err := loader.Load() + assert.Error(t, err) + assert.Nil(t, got) + loader = NewByteLoader("valid-yaml", []byte(testCase)) + got, err = loader.Load() + assert.NoError(t, err) + assert.NotNil(t, got) +}