Skip to content

Commit

Permalink
Fix logs-only check in agent
Browse files Browse the repository at this point in the history
Fix container environment merge
  • Loading branch information
jefchien committed Oct 15, 2024
1 parent b023efb commit 795473e
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 96 deletions.
71 changes: 37 additions & 34 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,16 @@ func runAgent(ctx context.Context,
}
}

isOnlyDefaultConfigPath := len(fOtelConfigs) == 1 && fOtelConfigs[0] == paths.YamlConfigPath

if len(c.Inputs) != 0 && len(c.Outputs) != 0 {
log.Println("creating new logs agent")
logAgent := logs.NewLogAgent(c)
// Always run logAgent as goroutine regardless of whether starting OTEL or Telegraf.
go logAgent.Run(ctx)

// If only the default CWAgent config yaml is provided and does not exist, then ASSUME
// If only a single YAML is provided and does not exist, then ASSUME the agent is
// just monitoring logs since this is the default when no OTEL config flag is provided.
// So just start Telegraf.
if isOnlyDefaultConfigPath {
if len(fOtelConfigs) == 1 {
_, err = os.Stat(fOtelConfigs[0])
if errors.Is(err, os.ErrNotExist) {
useragent.Get().SetComponents(&otelcol.Config{}, c)
Expand All @@ -333,20 +331,20 @@ func runAgent(ctx context.Context,
level := cwaLogger.ConvertToAtomicLevel(wlog.LogLevel())
logger, loggerOptions := cwaLogger.NewLogger(writer, level)

uris := fOtelConfigs
// merge configs together
if len(uris) > 1 {
result, err := mergeConfigs(uris, isOnlyDefaultConfigPath)
if err != nil {
return err
}
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(result.ToStringMap()))
uris = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
otelConfigs := fOtelConfigs
// try merging configs together, will return nil if nothing to merge
merged, err := mergeConfigs(otelConfigs)
if err != nil {
return err
}
if merged != nil {
_ = os.Setenv(envconfig.CWAgentMergedOtelConfig, toyamlconfig.ToYamlConfig(merged.ToStringMap()))
otelConfigs = []string{"env:" + envconfig.CWAgentMergedOtelConfig}
} else {
_ = os.Unsetenv(envconfig.CWAgentMergedOtelConfig)
}

providerSettings := configprovider.GetSettings(uris, logger)
providerSettings := configprovider.GetSettings(otelConfigs, logger)
provider, err := otelcol.NewConfigProvider(providerSettings)
if err != nil {
return fmt.Errorf("error while initializing config provider: %v", err)
Expand Down Expand Up @@ -382,7 +380,7 @@ func runAgent(ctx context.Context,
// docs: https://github.com/open-telemetry/opentelemetry-collector/blob/93cbae436ae61b832279dbbb18a0d99214b7d305/otelcol/command.go#L63
// *************************************************************************************************
var e []string
for _, uri := range uris {
for _, uri := range otelConfigs {
e = append(e, "--config="+uri)
}
cmd.SetArgs(e)
Expand All @@ -405,32 +403,37 @@ func getCollectorParams(factories otelcol.Factories, providerSettings otelcol.Co
}
}

func mergeConfigs(configPaths []string, isOnlyDefaultConfigPath bool) (*confmap.Conf, error) {
result := confmap.New()
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
// mergeConfigs tries to merge configurations together. If nothing to merge, returns nil without an error.
func mergeConfigs(configPaths []string) (*confmap.Conf, error) {
// Similar to translator, for containerized agent, try to use env variable if no other supplemental config paths
// are provided.
if ok && len(content) > 0 && isOnlyDefaultConfigPath && envconfig.IsRunningInContainer() {
log.Printf("D! Merging OTEL configuration from: %s", envconfig.CWOtelConfigContent)
conf, err := confmap.LoadFromBytes([]byte(content))
if err != nil {
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
var loaders []confmap.Loader
if envconfig.IsRunningInContainer() && len(configPaths) == 1 {
content, ok := os.LookupEnv(envconfig.CWOtelConfigContent)
if ok && len(content) > 0 {
log.Printf("D! Merging OTEL configuration from: %s, %s", envconfig.CWOtelConfigContent, configPaths[0])
loaders = append(loaders, confmap.NewByteLoader(envconfig.CWOtelConfigContent, []byte(content)), confmap.NewFileLoader(configPaths[0]))
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
} else if len(configPaths) > 1 {
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
loaders = append(loaders, confmap.NewFileLoader(configPath))
}
}
log.Printf("D! Merging OTEL configurations from: %s", configPaths)
for _, configPath := range configPaths {
conf, err := confmap.LoadFromFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
if len(loaders) > 0 {
result := confmap.New()
for _, loader := range loaders {
conf, err := loader.Load()
if err != nil {
return nil, fmt.Errorf("failed to load OTEL configs: %w", err)
}
if err = result.Merge(conf); err != nil {
return nil, fmt.Errorf("failed to merge OTEL configs: %w", err)
}
}
return result, nil
}
return result, nil
return nil, nil
}

func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
Expand Down
79 changes: 40 additions & 39 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,57 +81,55 @@ service:
exporters: [nop]
`
testCases := map[string]struct {
input []string
isContainer bool
isOnlyDefaultConfigPath bool
envValue string
want *confmap.Conf
wantErr bool
input []string
isContainer bool
envValue string
want *confmap.Conf
wantErr bool
}{
"WithoutInvalidFile": {
input: []string{filepath.Join("not", "a", "file")},
"WithInvalidFile": {
input: []string{filepath.Join("not", "a", "file"), filepath.Join("testdata", "base.yaml")},
wantErr: true,
},
"WithNoMerge": {
input: []string{filepath.Join("testdata", "base.yaml")},
wantErr: false,
},
"WithoutEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
isOnlyDefaultConfigPath: true,
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/NonContainer": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: false,
isOnlyDefaultConfigPath: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: false,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
"WithEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
isOnlyDefaultConfigPath: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+env.yaml")),
},
"WithEmptyEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
isOnlyDefaultConfigPath: true,
envValue: "",
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "",
want: nil,
wantErr: false,
},
"WithInvalidEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
isOnlyDefaultConfigPath: true,
envValue: "test",
wantErr: true,
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
envValue: "test",
wantErr: true,
},
"WithIgnoredEnv/Container": {
input: []string{filepath.Join("testdata", "base.yaml")},
isContainer: true,
isOnlyDefaultConfigPath: false,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base.yaml")),
input: []string{filepath.Join("testdata", "base.yaml"), filepath.Join("testdata", "merge.yaml")},
isContainer: true,
envValue: testEnvValue,
want: mustLoadFromFile(t, filepath.Join("testdata", "base+merge.yaml")),
},
}
for name, testCase := range testCases {
Expand All @@ -140,10 +138,13 @@ service:
t.Setenv(envconfig.RunInContainer, envconfig.TrueValue)
}
t.Setenv(envconfig.CWOtelConfigContent, testCase.envValue)
got, err := mergeConfigs(testCase.input, testCase.isOnlyDefaultConfigPath)
got, err := mergeConfigs(testCase.input)
if testCase.wantErr {
assert.Error(t, err)
assert.Nil(t, got)
} else if testCase.want == nil {
assert.NoError(t, err)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
Expand All @@ -154,7 +155,7 @@ service:
}

func mustLoadFromFile(t *testing.T, path string) *confmap.Conf {
conf, err := confmap.LoadFromFile(path)
conf, err := confmap.NewFileLoader(path).Load()
require.NoError(t, err)
return conf
}
14 changes: 14 additions & 0 deletions cmd/amazon-cloudwatch-agent/testdata/base+merge.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
receivers:
nop:

exporters:
nop:

service:
pipelines:
metrics:
receivers: [nop]
exporters: [nop]
traces:
receivers: [nop]
exporters: [nop]
11 changes: 11 additions & 0 deletions cmd/amazon-cloudwatch-agent/testdata/merge.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
receivers:
nop:

exporters:
nop:

service:
pipelines:
traces:
receivers: [nop]
exporters: [nop]
22 changes: 0 additions & 22 deletions internal/merge/confmap/confmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@
package confmap

import (
"fmt"
"os"
"path/filepath"

"github.com/knadh/koanf/maps"
"github.com/knadh/koanf/providers/confmap"
"github.com/knadh/koanf/v2"
otelconfmap "go.opentelemetry.io/collector/confmap"
"gopkg.in/yaml.v3"
)

const (
Expand Down Expand Up @@ -48,20 +43,3 @@ func (c *Conf) mergeFromStringMap(data map[string]any) error {
func (c *Conf) ToStringMap() map[string]any {
return maps.Unflatten(c.k.All(), KeyDelimiter)
}

func LoadFromFile(path string) (*Conf, error) {
// Clean the path before using it.
content, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return nil, fmt.Errorf("unable to read the file %v: %w", path, err)
}
return LoadFromBytes(content)
}

func LoadFromBytes(content []byte) (*Conf, error) {
var rawConf map[string]any
if err := yaml.Unmarshal(content, &rawConf); err != nil {
return nil, err
}
return NewFromStringMap(rawConf), nil
}
2 changes: 1 addition & 1 deletion internal/merge/confmap/confmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestMerge(t *testing.T) {
}

func mustLoadFromFile(t *testing.T, path string) *Conf {
conf, err := LoadFromFile(path)
conf, err := NewFileLoader(path).Load()
require.NoError(t, err)
return conf
}
50 changes: 50 additions & 0 deletions internal/merge/confmap/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package confmap

import (
"fmt"
"os"
"path/filepath"

"gopkg.in/yaml.v3"
)

type Loader interface {
Load() (*Conf, error)
}

type FileLoader struct {
path string
}

func NewFileLoader(path string) *FileLoader {
return &FileLoader{path: path}
}

func (f *FileLoader) Load() (*Conf, error) {
// Clean the path before using it.
content, err := os.ReadFile(filepath.Clean(f.path))
if err != nil {
return nil, fmt.Errorf("unable to read the file %v: %w", f.path, err)
}
return NewByteLoader(f.path, content).Load()
}

type ByteLoader struct {
id string
content []byte
}

func NewByteLoader(id string, content []byte) *ByteLoader {
return &ByteLoader{id: id, content: content}
}

func (b *ByteLoader) Load() (*Conf, error) {
var rawConf map[string]any
if err := yaml.Unmarshal(b.content, &rawConf); err != nil {
return nil, fmt.Errorf("unable to unmarshal contents: %v: %w", b.id, err)
}
return NewFromStringMap(rawConf), nil
}
36 changes: 36 additions & 0 deletions internal/merge/confmap/load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package confmap

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFileLoader(t *testing.T) {
loader := NewFileLoader(filepath.Join("not", "a", "file"))
got, err := loader.Load()
assert.Error(t, err)
assert.Nil(t, got)
loader = NewFileLoader(filepath.Join("testdata", "base.yaml"))
got, err = loader.Load()
assert.NoError(t, err)
assert.NotNil(t, got)
}

func TestByteLoader(t *testing.T) {
testCase := `receivers:
nop/1:
`
loader := NewByteLoader("invalid-yaml", []byte("string"))
got, err := loader.Load()
assert.Error(t, err)
assert.Nil(t, got)
loader = NewByteLoader("valid-yaml", []byte(testCase))
got, err = loader.Load()
assert.NoError(t, err)
assert.NotNil(t, got)
}

0 comments on commit 795473e

Please sign in to comment.