diff --git a/internal/configprovider/builder.go b/internal/configprovider/builder.go index e2b989dceaf..aa28636d14e 100644 --- a/internal/configprovider/builder.go +++ b/internal/configprovider/builder.go @@ -22,12 +22,6 @@ import ( "go.uber.org/zap" ) -// Private error types to help with testability. -type ( - errConfigSourceCreation struct{ error } - errFactoryCreatedNil struct{ error } -) - // Build builds the ConfigSource objects according to the given ConfigSettings. func Build(ctx context.Context, configSourcesSettings map[string]Source, params CreateParams, factories Factories) (map[string]ConfigSource, error) { cfgSources := make(map[string]ConfigSource, len(configSourcesSettings)) @@ -35,23 +29,17 @@ func Build(ctx context.Context, configSourcesSettings map[string]Source, params // If we have the setting we also have the factory. factory, ok := factories[cfgSrcSettings.ID().Type()] if !ok { - return nil, &errUnknownType{ - fmt.Errorf("unknown %s config source type for %s", cfgSrcSettings.ID().Type(), fullName), - } + return nil, fmt.Errorf("unknown %s config source type for %s", cfgSrcSettings.ID().Type(), fullName) } params.Logger = params.Logger.With(zap.String("config_source", fullName)) cfgSrc, err := factory.CreateConfigSource(ctx, params, cfgSrcSettings) if err != nil { - return nil, &errConfigSourceCreation{ - fmt.Errorf("failed to create config source %s: %w", fullName, err), - } + return nil, fmt.Errorf("failed to create config source %s: %w", fullName, err) } if cfgSrc == nil { - return nil, &errFactoryCreatedNil{ - fmt.Errorf("factory for %q produced a nil extension", fullName), - } + return nil, fmt.Errorf("factory for %q produced a nil extension", fullName) } cfgSources[fullName] = cfgSrc diff --git a/internal/configprovider/builder_test.go b/internal/configprovider/builder_test.go index 5ec5d8401ed..6618fb5a7c1 100644 --- a/internal/configprovider/builder_test.go +++ b/internal/configprovider/builder_test.go @@ -40,7 +40,7 @@ func TestConfigSourceBuild(t *testing.T) { configSettings map[string]Source factories Factories expectedCfgSources map[string]ConfigSource - wantErr error + wantErr string name string }{ { @@ -51,7 +51,7 @@ func TestConfigSourceBuild(t *testing.T) { }, }, factories: testFactories, - wantErr: &errUnknownType{}, + wantErr: "unknown unknown_config_source config source type for tstcfgsrc", }, { name: "creation_error", @@ -65,7 +65,7 @@ func TestConfigSourceBuild(t *testing.T) { ErrOnCreateConfigSource: errors.New("forced test error"), }, }, - wantErr: &errConfigSourceCreation{}, + wantErr: "failed to create config source tstcfgsrc: forced test error", }, { name: "factory_return_nil", @@ -77,7 +77,7 @@ func TestConfigSourceBuild(t *testing.T) { factories: Factories{ "tstcfgsrc": &mockNilCfgSrcFactory{}, }, - wantErr: &errFactoryCreatedNil{}, + wantErr: "factory for \"tstcfgsrc\" produced a nil extension", }, { name: "base_case", @@ -108,7 +108,11 @@ func TestConfigSourceBuild(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { builtCfgSources, err := Build(ctx, tt.configSettings, params, tt.factories) - require.IsType(t, tt.wantErr, err) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } require.Equal(t, tt.expectedCfgSources, builtCfgSources) }) } diff --git a/internal/configprovider/component.go b/internal/configprovider/component.go index 33fc2f989db..94b4837db3e 100644 --- a/internal/configprovider/component.go +++ b/internal/configprovider/component.go @@ -17,23 +17,10 @@ package configprovider import ( "context" - "errors" "go.opentelemetry.io/collector/confmap" ) -// ErrSessionClosed is returned by WatchForUpdate functions when its parent Session -// object is closed. -// This error can be wrapped with additional information. Callers trying to identify this -// specific error must use errors.Is. -var ErrSessionClosed = errors.New("parent session was closed") - -// ErrValueUpdated is returned by WatchForUpdate functions when the value being watched -// was changed or expired and needs to be retrieved again. -// This error can be wrapped with additional information. Callers trying to identify this -// specific error must use errors.Is. -var ErrValueUpdated = errors.New("configuration must retrieve the updated value") - // ConfigSource is the interface to be implemented by objects used by the collector // to retrieve external configuration information. // diff --git a/internal/configprovider/config_source_provider.go b/internal/configprovider/config_source_provider.go index 923e9aa252d..aef195af29b 100644 --- a/internal/configprovider/config_source_provider.go +++ b/internal/configprovider/config_source_provider.go @@ -21,25 +21,20 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" - "go.uber.org/multierr" "go.uber.org/zap" "github.com/signalfx/splunk-otel-collector/internal/configconverter" ) -type errDuplicatedConfigSourceFactory struct{ error } - var ( _ confmap.Provider = (*configSourceConfigMapProvider)(nil) ) type configSourceConfigMapProvider struct { logger *zap.Logger - csm *Manager configServer *configconverter.ConfigServer wrappedProvider confmap.Provider wrappedRetrieved *confmap.Retrieved - retrieved *confmap.Retrieved buildInfo component.BuildInfo factories []Factory } @@ -55,7 +50,6 @@ func NewConfigSourceConfigMapProvider(wrappedProvider confmap.Provider, logger * factories: factories, buildInfo: buildInfo, wrappedRetrieved: &confmap.Retrieved{}, - retrieved: &confmap.Retrieved{}, } } @@ -86,33 +80,6 @@ func (c *configSourceConfigMapProvider) Retrieve(ctx context.Context, uri string } c.wrappedRetrieved = newWrappedRetrieved - var cfg *confmap.Conf - if cfg, err = c.Get(ctx, uri); err != nil { - return nil, err - } else if cfg == nil { - cfg = &confmap.Conf{} - } - - c.retrieved, err = confmap.NewRetrieved( - cfg.ToStringMap(), - confmap.WithRetrievedClose(c.wrappedRetrieved.Close), - ) - return c.retrieved, err -} - -func (c *configSourceConfigMapProvider) Scheme() string { - return c.wrappedProvider.Scheme() -} - -func (c *configSourceConfigMapProvider) Shutdown(ctx context.Context) error { - c.configServer.Unregister() - return c.wrappedProvider.Shutdown(ctx) -} - -// Get returns a config.Parser that wraps the config.Default() with a parser -// that can load and inject data from config sources. If there are no config sources -// in the configuration the returned parser behaves like the config.Default(). -func (c *configSourceConfigMapProvider) Get(_ context.Context, uri string) (*confmap.Conf, error) { factories, err := makeFactoryMap(c.factories) if err != nil { return nil, err @@ -123,38 +90,28 @@ func (c *configSourceConfigMapProvider) Get(_ context.Context, uri string) (*con return nil, err } c.configServer.SetForScheme(c.Scheme(), wrappedMap.ToStringMap()) - csm, err := NewManager(wrappedMap, c.logger, c.buildInfo, factories) + retrieved, closeFunc, err := Resolve(ctx, wrappedMap, c.logger, c.buildInfo, factories, onChange) if err != nil { return nil, err } - effectiveMap, err := csm.Resolve(context.Background(), wrappedMap) - if err != nil { - return nil, err - } - - c.csm = csm - return effectiveMap, nil + return confmap.NewRetrieved(retrieved, confmap.WithRetrievedClose(mergeCloseFuncs([]confmap.CloseFunc{closeFunc, c.wrappedRetrieved.Close}))) } -// WatchForUpdate is used to monitor for updates on configuration values that -// were retrieved from config sources. -func (c *configSourceConfigMapProvider) WatchForUpdate() error { - return c.csm.WatchForUpdate() +func (c *configSourceConfigMapProvider) Scheme() string { + return c.wrappedProvider.Scheme() } -// Close ends the watch for updates and closes the parser provider and respective -// config sources. -func (c *configSourceConfigMapProvider) Close(ctx context.Context) error { +func (c *configSourceConfigMapProvider) Shutdown(ctx context.Context) error { c.configServer.Unregister() - return multierr.Combine(c.csm.Close(ctx), c.retrieved.Close(ctx)) + return c.wrappedProvider.Shutdown(ctx) } func makeFactoryMap(factories []Factory) (Factories, error) { fMap := make(Factories) for _, f := range factories { if _, ok := fMap[f.Type()]; ok { - return nil, &errDuplicatedConfigSourceFactory{fmt.Errorf("duplicate config source factory %q", f.Type())} + return nil, fmt.Errorf("duplicate config source factory %q", f.Type()) } fMap[f.Type()] = f } diff --git a/internal/configprovider/config_source_provider_test.go b/internal/configprovider/config_source_provider_test.go index b3da4632f41..2ee7f9f462e 100644 --- a/internal/configprovider/config_source_provider_test.go +++ b/internal/configprovider/config_source_provider_test.go @@ -18,9 +18,7 @@ package configprovider import ( "context" "errors" - "fmt" "path" - "sync" "testing" "github.com/stretchr/testify/assert" @@ -37,7 +35,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { tests := []struct { parserProvider confmap.Provider configLocation []string - wantErr error + wantErr string name string factories []Factory }{ @@ -49,7 +47,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { parserProvider: &mockParserProvider{ ErrOnGet: true, }, - wantErr: &errOnParserProviderGet{}, + wantErr: "mockParserProvider.Get() forced test error", }, { name: "duplicated_factory_type", @@ -57,7 +55,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { &mockCfgSrcFactory{}, &mockCfgSrcFactory{}, }, - wantErr: &errDuplicatedConfigSourceFactory{}, + wantErr: "duplicate config source factory \"tstcfgsrc\"", }, { name: "new_manager_builder_error", @@ -68,13 +66,13 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { }, parserProvider: fileprovider.New(), configLocation: []string{"file:" + path.Join("testdata", "basic_config.yaml")}, - wantErr: &errConfigSourceCreation{}, + wantErr: "failed to create config source tstcfgsrc", }, { name: "manager_resolve_error", parserProvider: fileprovider.New(), configLocation: []string{"file:" + path.Join("testdata", "manager_resolve_error.yaml")}, - wantErr: fmt.Errorf("error not wrappedProviders by specific error type: %w", ErrSessionClosed), + wantErr: "config source \"tstcfgsrc\" failed to retrieve value: no value for selector \"selector\"", }, { name: "multiple_config_success", @@ -118,13 +116,15 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { configLocation = "" } r, err := pp.Retrieve(context.Background(), configLocation, nil) - if tt.wantErr == nil { + if tt.wantErr == "" { require.NoError(t, err) require.NotNil(t, r) - rMap, _ := r.AsConf() - require.NotNil(t, rMap) + rMap, errAsConf := r.AsConf() + require.NoError(t, errAsConf) + assert.NotNil(t, rMap) + assert.NoError(t, r.Close(context.Background())) } else { - assert.IsType(t, tt.wantErr, err) + assert.ErrorContains(t, err, tt.wantErr) assert.Nil(t, r) return } @@ -132,20 +132,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { ok = i < len(tt.configLocation) } - var watchForUpdatedError error - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - watchForUpdatedError = cspp.WatchForUpdate() - }() - require.NotNil(t, cspp.csm) - - closeErr := cspp.Close(context.Background()) - assert.NoError(t, closeErr) - - wg.Wait() - assert.Equal(t, ErrSessionClosed, watchForUpdatedError) + assert.NoError(t, cspp.Shutdown(context.Background())) }) } } @@ -156,31 +143,17 @@ type mockParserProvider struct { var _ confmap.Provider = (*mockParserProvider)(nil) -func (mpp *mockParserProvider) Retrieve(ctx context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { - m, err := mpp.Get(ctx) - if err != nil { - return nil, err +func (mpp *mockParserProvider) Retrieve(context.Context, string, confmap.WatcherFunc) (*confmap.Retrieved, error) { + if mpp.ErrOnGet { + return nil, errors.New("mockParserProvider.Get() forced test error") } - return confmap.NewRetrieved(m.ToStringMap()) + return confmap.NewRetrieved(confmap.New().ToStringMap()) } -func (mpp *mockParserProvider) Shutdown(ctx context.Context) error { +func (mpp *mockParserProvider) Shutdown(context.Context) error { return nil } func (mpp *mockParserProvider) Scheme() string { return "" } - -func (mpp *mockParserProvider) Get(context.Context) (*confmap.Conf, error) { - if mpp.ErrOnGet { - return nil, &errOnParserProviderGet{errors.New("mockParserProvider.Get() forced test error")} - } - return confmap.New(), nil -} - -func (mpp *mockParserProvider) Close(context.Context) error { - return nil -} - -type errOnParserProviderGet struct{ error } diff --git a/internal/configprovider/manager.go b/internal/configprovider/manager.go index bf9e714309b..69ee31e441c 100644 --- a/internal/configprovider/manager.go +++ b/internal/configprovider/manager.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" + "github.com/knadh/koanf/maps" "github.com/spf13/cast" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -57,15 +58,13 @@ var ddBackwardCompatible = func() bool { return true }() -// Manager is used to inject data from config sources into a configuration and also -// to monitor for updates on the items injected into the configuration. All methods -// of a Manager must be called only once and have an expected sequence: +// Resolve inspects the given confmap.Conf and resolves all config sources referenced +// in the configuration, returning a confmap.Conf in which all env vars and config sources on +// the given input config map are resolved to actual literal values of the env vars or config sources. // -// 1. NewManager to create a new instance; -// 2. Resolve to inject the data from config sources into a configuration; -// 3. WatchForUpdate in a goroutine to wait for configuration updates; -// 4. WaitForWatcher to wait until the watchers are in place; -// 5. Close to close the instance; +// 1. Resolve to inject the data from config sources into a configuration; +// 2. Wait for an update on "watcher" func. +// 3. Close the confmap.Retrieved instance; // // The current syntax to reference a config source in a YAML is provisional. Currently // single-line: @@ -169,22 +168,10 @@ var ddBackwardCompatible = func() bool { // results. // // For an overview about the internals of the Manager refer to the package README.md. -type Manager struct { - configSources map[string]ConfigSource - closeCh chan struct{} - // Use a channel to capture the first error returned by any watcher and another one - // to ensure completion of any remaining watcher also trying to report an error. - errChannel chan error - retrieved []*confmap.Retrieved -} - -// NewManager creates a new instance of a Manager to be used to inject data from -// ConfigSource objects into a configuration and watch for updates on the injected -// data. -func NewManager(configMap *confmap.Conf, logger *zap.Logger, buildInfo component.BuildInfo, factories Factories) (*Manager, error) { +func Resolve(ctx context.Context, configMap *confmap.Conf, logger *zap.Logger, buildInfo component.BuildInfo, factories Factories, watcher confmap.WatcherFunc) (map[string]any, confmap.CloseFunc, error) { configSourcesSettings, err := Load(context.Background(), configMap, factories) if err != nil { - return nil, err + return nil, nil, err } params := CreateParams{ @@ -193,19 +180,16 @@ func NewManager(configMap *confmap.Conf, logger *zap.Logger, buildInfo component } cfgSources, err := Build(context.Background(), configSourcesSettings, params, factories) if err != nil { - return nil, err + return nil, nil, err } - return newManager(cfgSources), nil + return resolve(ctx, cfgSources, configMap, watcher) } -// Resolve inspects the given confmap.Conf and resolves all config sources referenced -// in the configuration, returning a confmap.Conf in which all env vars and config sources on -// the given input config map are resolved to actual literal values of the env vars or config sources. -// This method must be called only once per lifetime of a Manager object. -func (m *Manager) Resolve(ctx context.Context, configMap *confmap.Conf) (*confmap.Conf, error) { +func resolve(ctx context.Context, configSources map[string]ConfigSource, configMap *confmap.Conf, watcher confmap.WatcherFunc) (map[string]any, confmap.CloseFunc, error) { res := map[string]any{} allKeys := configMap.AllKeys() + var closeFuncs []confmap.CloseFunc for _, k := range allKeys { if strings.HasPrefix(k, configSourcesKey) { // Remove everything under the config_sources section. The `config_sources` section @@ -214,64 +198,29 @@ func (m *Manager) Resolve(ctx context.Context, configMap *confmap.Conf) (*confma continue } - value, err := m.parseConfigValue(ctx, configMap.Get(k)) + value, closeFunc, err := parseConfigValue(ctx, configSources, configMap.Get(k), watcher) if err != nil { - return nil, err + return nil, nil, err } res[k] = value + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) + } } - return confmap.NewFromStringMap(res), nil -} - -// WatchForUpdate must watch for updates on any of the values retrieved from config sources -// and injected into the configuration. Typically this method is launched in a goroutine, the -// method WaitForWatcher blocks until the WatchForUpdate goroutine is running and ready. -func (m *Manager) WatchForUpdate() error { - select { - case err := <-m.errChannel: - // Return the first error that reaches the channel and ignore any other error. - return err - case <-m.closeCh: - // This covers the case that all watchers returned ErrWatcherNotSupported. - return ErrSessionClosed - } -} - -// Close terminates the WatchForUpdate function and closes all Session objects used -// in the configuration. It should be called -func (m *Manager) Close(ctx context.Context) error { - var errs error - for _, ret := range m.retrieved { - errs = multierr.Append(errs, ret.Close(ctx)) - } - - for _, source := range m.configSources { - errs = multierr.Append(errs, source.Shutdown(ctx)) - } - - close(m.closeCh) - - return errs -} - -func newManager(configSources map[string]ConfigSource) *Manager { - return &Manager{ - configSources: configSources, - closeCh: make(chan struct{}), - errChannel: make(chan error, 1), - } + maps.IntfaceKeysToStrings(res) + return res, mergeCloseFuncs(closeFuncs), nil } // parseConfigValue takes the value of a "config node" and process it recursively. The processing consists // in transforming invocations of config sources and/or environment variables into literal data that can be // used directly from a `confmap.Conf` object. -func (m *Manager) parseConfigValue(ctx context.Context, value any) (any, error) { +func parseConfigValue(ctx context.Context, configSources map[string]ConfigSource, value any, watcher confmap.WatcherFunc) (any, confmap.CloseFunc, error) { switch v := value.(type) { case string: // Only if the value of the node is a string it can contain an env var or config source // invocation that requires transformation. - return m.parseStringValue(ctx, v) + return parseStringValue(ctx, configSources, v, watcher) case []any: // The value is of type []any when an array is used in the configuration, YAML example: // @@ -286,36 +235,46 @@ func (m *Manager) parseConfigValue(ctx context.Context, value any) (any, error) // // Both "array0" and "array1" are going to be leaf config nodes hitting this case. nslice := make([]any, 0, len(v)) + var closeFuncs []confmap.CloseFunc for _, vint := range v { - value, err := m.parseConfigValue(ctx, vint) + value, closeFunc, err := parseConfigValue(ctx, configSources, vint, watcher) if err != nil { - return nil, err + return nil, nil, err + } + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) } nslice = append(nslice, value) } - return nslice, nil + return nslice, mergeCloseFuncs(closeFuncs), nil case map[string]any: // The value is of type map[string]any when an array in the configuration is populated with map // elements. From the case above (for type []any) each element of "array1" is going to hit the // the current case block. nmap := make(map[any]any, len(v)) + var closeFuncs []confmap.CloseFunc for k, vint := range v { - value, err := m.parseConfigValue(ctx, vint) + value, closeFunc, err := parseConfigValue(ctx, configSources, vint, watcher) if err != nil { - return nil, err + return nil, nil, err + } + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) } nmap[k] = value } - return nmap, nil + return nmap, mergeCloseFuncs(closeFuncs), nil default: // All other literals (int, boolean, etc) can't be further expanded so just return them as they are. - return v, nil + return v, nil, nil } } // parseStringValue transforms environment variables and config sources, if any are present, on // the given string in the configuration into an object to be inserted into the resulting configuration. -func (m *Manager) parseStringValue(ctx context.Context, s string) (any, error) { +func parseStringValue(ctx context.Context, configSources map[string]ConfigSource, s string, watcher confmap.WatcherFunc) (any, confmap.CloseFunc, error) { + var closeFuncs []confmap.CloseFunc + // Code based on os.Expand function. All delimiters that are checked against are // ASCII so bytes are fine for this operation. var buf []byte @@ -394,9 +353,12 @@ func (m *Manager) parseStringValue(ctx context.Context, s string) (any, error) { default: // A config source, retrieve and apply results. - retrieved, err := m.retrieveConfigSourceData(ctx, cfgSrcName, expandableContent) + retrieved, closeFunc, err := retrieveConfigSourceData(ctx, configSources, cfgSrcName, expandableContent, watcher) if err != nil { - return nil, err + return nil, nil, err + } + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) } consumedAll := j+w+1 == len(s) @@ -423,7 +385,7 @@ func (m *Manager) parseStringValue(ctx context.Context, s string) (any, error) { retrieved = cast.ToStringMap(mapIFace) } - return retrieved, nil + return retrieved, mergeCloseFuncs(closeFuncs), nil } // Either there was a prefix already or there are still characters to be processed. @@ -443,11 +405,11 @@ func (m *Manager) parseStringValue(ctx context.Context, s string) (any, error) { if buf == nil { // No changes to original string, just return it. - return s, nil + return s, mergeCloseFuncs(closeFuncs), nil } // Return whatever was accumulated on the buffer plus the remaining of the original string. - return string(buf) + s[i:], nil + return string(buf) + s[i:], mergeCloseFuncs(closeFuncs), nil } func getBracketedExpandableContent(s string, i int) (expandableContent string, consumed int, cfgSrcName string) { @@ -483,44 +445,50 @@ func getBareExpandableContent(s string, i int) (expandableContent string, consum // retrieveConfigSourceData retrieves data from the specified config source and injects them into // the configuration. The Manager tracks sessions and watcher objects as needed. -func (m *Manager) retrieveConfigSourceData(ctx context.Context, cfgSrcName, cfgSrcInvocation string) (any, error) { - cfgSrc, ok := m.configSources[cfgSrcName] +func retrieveConfigSourceData(ctx context.Context, configSources map[string]ConfigSource, cfgSrcName, cfgSrcInvocation string, watcher confmap.WatcherFunc) (any, confmap.CloseFunc, error) { + var closeFuncs []confmap.CloseFunc + cfgSrc, ok := configSources[cfgSrcName] if !ok { - return nil, newErrUnknownConfigSource(cfgSrcName) + return nil, nil, newErrUnknownConfigSource(cfgSrcName) } cfgSrcName, selector, paramsConfigMap, err := parseCfgSrcInvocation(cfgSrcInvocation) if err != nil { - return nil, err + return nil, nil, err } // Recursively expand the selector. - var expandedSelector any - expandedSelector, err = m.parseStringValue(ctx, selector) + expandedSelector, closeFunc, err := parseStringValue(ctx, configSources, selector, watcher) if err != nil { - return nil, fmt.Errorf("failed to process selector for config source %q selector %q: %w", cfgSrcName, selector, err) + return nil, nil, fmt.Errorf("failed to process selector for config source %q selector %q: %w", cfgSrcName, selector, err) } if selector, ok = expandedSelector.(string); !ok { - return nil, fmt.Errorf("processed selector must be a string instead got a %T %v", expandedSelector, expandedSelector) + return nil, nil, fmt.Errorf("processed selector must be a string instead got a %T %v", expandedSelector, expandedSelector) + } + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) } // Recursively resolve/parse any config source on the parameters. if paramsConfigMap != nil { - paramsConfigMap, err = m.Resolve(ctx, paramsConfigMap) - if err != nil { - return nil, fmt.Errorf("failed to process parameters for config source %q invocation %q: %w", cfgSrcName, cfgSrcInvocation, err) + paramsConfigMapRet, closeFunc, errResolve := resolve(ctx, configSources, paramsConfigMap, watcher) + if errResolve != nil { + return nil, nil, fmt.Errorf("failed to process parameters for config source %q invocation %q: %w", cfgSrcName, cfgSrcInvocation, errResolve) + } + if closeFunc != nil { + closeFuncs = append(closeFuncs, closeFunc) } + paramsConfigMap = confmap.NewFromStringMap(paramsConfigMapRet) } - retrieved, err := cfgSrc.Retrieve(ctx, selector, paramsConfigMap, func(event *confmap.ChangeEvent) { - m.errChannel <- event.Error - }) + retrieved, err := cfgSrc.Retrieve(ctx, selector, paramsConfigMap, watcher) if err != nil { - return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) + return nil, nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) } - m.retrieved = append(m.retrieved, retrieved) - return retrieved.AsRaw() + closeFuncs = append(closeFuncs, retrieved.Close) + val, err := retrieved.AsRaw() + return val, mergeCloseFuncs(closeFuncs), err } func newErrUnknownConfigSource(cfgSrcName string) error { @@ -702,3 +670,21 @@ func isShellSpecialVar(c uint8) bool { func isAlphaNum(c uint8) bool { return c == '_' || '0' <= c && c <= '9' || 'a' <= c && c <= 'z' || 'A' <= c && c <= 'Z' } + +func mergeCloseFuncs(closeFuncs []confmap.CloseFunc) confmap.CloseFunc { + if len(closeFuncs) == 0 { + return nil + } + if len(closeFuncs) == 1 { + return closeFuncs[0] + } + return func(ctx context.Context) error { + var errs error + for _, closeFunc := range closeFuncs { + if closeFunc != nil { + errs = multierr.Append(errs, closeFunc(ctx)) + } + } + return errs + } +} diff --git a/internal/configprovider/manager_test.go b/internal/configprovider/manager_test.go index 02bd4875bff..6dde76f9011 100644 --- a/internal/configprovider/manager_test.go +++ b/internal/configprovider/manager_test.go @@ -21,6 +21,7 @@ import ( "path" "testing" + "github.com/knadh/koanf/maps" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -29,10 +30,12 @@ import ( "go.uber.org/zap" ) -func TestConfigSourceManager_NewManager(t *testing.T) { +var errValueUpdated = errors.New("configuration must retrieve the updated value") + +func TestConfigSourceManagerNewManager(t *testing.T) { tests := []struct { factories Factories - wantErr error + wantErr string name string file string }{ @@ -47,7 +50,7 @@ func TestConfigSourceManager_NewManager(t *testing.T) { name: "unknown_type", file: "basic_config", factories: Factories{}, - wantErr: &errUnknownType{}, + wantErr: "unknown config_sources type \"tstcfgsrc\"", }, { name: "build_error", @@ -57,7 +60,7 @@ func TestConfigSourceManager_NewManager(t *testing.T) { ErrOnCreateConfigSource: errors.New("forced test error"), }, }, - wantErr: &errConfigSourceCreation{}, + wantErr: "failed to create config source tstcfgsrc", }, } @@ -67,27 +70,24 @@ func TestConfigSourceManager_NewManager(t *testing.T) { parser, err := confmaptest.LoadConf(filename) require.NoError(t, err) - manager, err := NewManager(parser, zap.NewNop(), component.NewDefaultBuildInfo(), tt.factories) - require.IsType(t, tt.wantErr, err) - if tt.wantErr != nil { - require.Nil(t, manager) - return + _, _, err = Resolve(context.Background(), parser, zap.NewNop(), component.NewDefaultBuildInfo(), tt.factories, nil) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) } - - require.NotNil(t, manager) }) } } -func TestConfigSourceManager_Simple(t *testing.T) { - ctx := context.Background() - manager := newManager(map[string]ConfigSource{ +func TestConfigSourceManagerSimple(t *testing.T) { + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": {Value: "test_value"}, }, }, - }) + } originalCfg := map[string]any{ "top0": map[string]any{ @@ -104,23 +104,15 @@ func TestConfigSourceManager_Simple(t *testing.T) { cp := confmap.NewFromStringMap(originalCfg) - res, err := manager.Resolve(ctx, cp) + res, closeFunc, err := resolve(context.Background(), cfgSources, cp, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) - assert.Equal(t, expectedCfg, res.ToStringMap()) - - doneCh := make(chan struct{}) - var errWatcher error - go func() { - defer close(doneCh) - errWatcher = manager.WatchForUpdate() - }() - - assert.NoError(t, manager.Close(ctx)) - <-doneCh - assert.ErrorIs(t, errWatcher, ErrSessionClosed) + assert.Equal(t, expectedCfg, maps.Unflatten(res, confmap.KeyDelimiter)) + assert.NoError(t, closeFunc(context.Background())) } -func TestConfigSourceManager_ResolveRemoveConfigSourceSection(t *testing.T) { +func TestConfigSourceManagerResolveRemoveConfigSourceSection(t *testing.T) { cfg := map[string]any{ "config_sources": map[string]any{ "testcfgsrc": nil, @@ -130,20 +122,22 @@ func TestConfigSourceManager_ResolveRemoveConfigSourceSection(t *testing.T) { }, } - manager := newManager(map[string]ConfigSource{ + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{}, - }) + } - res, err := manager.Resolve(context.Background(), confmap.NewFromStringMap(cfg)) + res, closeFunc, err := resolve(context.Background(), cfgSources, confmap.NewFromStringMap(cfg), func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) require.NotNil(t, res) delete(cfg, "config_sources") - assert.Equal(t, cfg, res.ToStringMap()) + assert.Equal(t, cfg, maps.Unflatten(res, confmap.KeyDelimiter)) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_ResolveErrors(t *testing.T) { - ctx := context.Background() +func TestConfigSourceManagerResolveErrors(t *testing.T) { testErr := errors.New("test error") tests := []struct { @@ -172,19 +166,18 @@ func TestConfigSourceManager_ResolveErrors(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - manager := newManager(tt.configSourceMap) - - res, err := manager.Resolve(ctx, confmap.NewFromStringMap(tt.config)) + res, closeFunc, err := resolve(context.Background(), tt.configSourceMap, confmap.NewFromStringMap(tt.config), func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.Error(t, err) require.Nil(t, res) - require.NoError(t, manager.Close(ctx)) + assert.NoError(t, callClose(closeFunc)) }) } } -func TestConfigSourceManager_YAMLInjection(t *testing.T) { - ctx := context.Background() - manager := newManager(map[string]ConfigSource{ +func TestConfigSourceManagerYAMLInjection(t *testing.T) { + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "valid_yaml_str": {Value: ` @@ -198,7 +191,7 @@ map: "invalid_yaml_str": {Value: ":"}, }, }, - }) + } file := path.Join("testdata", "yaml_injection.yaml") cp, err := confmaptest.LoadConf(file) @@ -209,16 +202,16 @@ map: require.NoError(t, err) expectedCfg := expectedParser.ToStringMap() - res, err := manager.Resolve(ctx, cp) + res, closeFunc, err := resolve(context.Background(), cfgSources, cp, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) - actualCfg := res.ToStringMap() - assert.Equal(t, expectedCfg, actualCfg) - assert.NoError(t, manager.Close(ctx)) + assert.Equal(t, expectedCfg, maps.Unflatten(res, confmap.KeyDelimiter)) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { - ctx := context.Background() - manager := newManager(map[string]ConfigSource{ +func TestConfigSourceManagerArraysAndMaps(t *testing.T) { + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "elem0": {Value: "elem0_value"}, @@ -227,7 +220,7 @@ func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { "k1": {Value: "k1_value"}, }, }, - }) + } file := path.Join("testdata", "arrays_and_maps.yaml") cp, err := confmaptest.LoadConf(file) @@ -237,14 +230,15 @@ func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { expectedParser, err := confmaptest.LoadConf(expectedFile) require.NoError(t, err) - res, err := manager.Resolve(ctx, cp) + res, closeFunc, err := resolve(context.Background(), cfgSources, cp, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) - assert.Equal(t, expectedParser.ToStringMap(), res.ToStringMap()) - assert.NoError(t, manager.Close(ctx)) + assert.Equal(t, expectedParser.ToStringMap(), maps.Unflatten(res, confmap.KeyDelimiter)) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_ParamsHandling(t *testing.T) { - ctx := context.Background() +func TestConfigSourceManagerParamsHandling(t *testing.T) { tstCfgSrc := testConfigSource{ ValueMap: map[string]valueEntry{ "elem0": {Value: nil}, @@ -279,10 +273,6 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) { return nil } - manager := newManager(map[string]ConfigSource{ - "tstcfgsrc": &tstCfgSrc, - }) - file := path.Join("testdata", "params_handling.yaml") cp, err := confmaptest.LoadConf(file) require.NoError(t, err) @@ -291,17 +281,18 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) { expectedParser, err := confmaptest.LoadConf(expectedFile) require.NoError(t, err) - res, err := manager.Resolve(ctx, cp) + res, closeFunc, err := resolve(context.Background(), map[string]ConfigSource{"tstcfgsrc": &tstCfgSrc}, cp, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) - assert.Equal(t, expectedParser.ToStringMap(), res.ToStringMap()) - assert.NoError(t, manager.Close(ctx)) + assert.Equal(t, expectedParser.ToStringMap(), maps.Unflatten(res, confmap.KeyDelimiter)) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_WatchForUpdate(t *testing.T) { - ctx := context.Background() +func TestConfigSourceManagerWatchForUpdate(t *testing.T) { watchForUpdateCh := make(chan error, 1) - manager := newManager(map[string]ConfigSource{ + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": { @@ -310,7 +301,7 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) { }, }, }, - }) + } originalCfg := map[string]any{ "top0": map[string]any{ @@ -319,28 +310,22 @@ func TestConfigSourceManager_WatchForUpdate(t *testing.T) { } cp := confmap.NewFromStringMap(originalCfg) - _, err := manager.Resolve(ctx, cp) + watchCh := make(chan *confmap.ChangeEvent) + _, closeFunc, err := resolve(context.Background(), cfgSources, cp, func(event *confmap.ChangeEvent) { + watchCh <- event + }) require.NoError(t, err) - doneCh := make(chan struct{}) - var errWatcher error - go func() { - defer close(doneCh) - errWatcher = manager.WatchForUpdate() - }() - - watchForUpdateCh <- ErrValueUpdated + watchForUpdateCh <- nil - <-doneCh - assert.ErrorIs(t, errWatcher, ErrValueUpdated) - assert.NoError(t, manager.Close(ctx)) + ce := <-watchCh + assert.NoError(t, ce.Error) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { - ctx := context.Background() - +func TestConfigSourceManagerMultipleWatchForUpdate(t *testing.T) { watchForUpdateCh := make(chan error, 2) - manager := newManager(map[string]ConfigSource{ + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "test_selector": { @@ -349,7 +334,7 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { }, }, }, - }) + } originalCfg := map[string]any{ "top0": map[string]any{ @@ -361,32 +346,27 @@ func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { } cp := confmap.NewFromStringMap(originalCfg) - _, err := manager.Resolve(ctx, cp) + watchCh := make(chan *confmap.ChangeEvent) + _, closeFunc, err := resolve(context.Background(), cfgSources, cp, func(event *confmap.ChangeEvent) { + watchCh <- event + }) require.NoError(t, err) - doneCh := make(chan struct{}) - var errWatcher error - go func() { - defer close(doneCh) - errWatcher = manager.WatchForUpdate() - }() - - watchForUpdateCh <- ErrValueUpdated - watchForUpdateCh <- ErrValueUpdated + watchForUpdateCh <- errValueUpdated + watchForUpdateCh <- errValueUpdated - <-doneCh - assert.ErrorIs(t, errWatcher, ErrValueUpdated) + ce := <-watchCh + assert.ErrorIs(t, ce.Error, errValueUpdated) close(watchForUpdateCh) - assert.NoError(t, manager.Close(ctx)) + assert.NoError(t, callClose(closeFunc)) } -func TestConfigSourceManager_EnvVarHandling(t *testing.T) { +func TestConfigSourceManagerEnvVarHandling(t *testing.T) { require.NoError(t, os.Setenv("envvar", "envvar_value")) defer func() { assert.NoError(t, os.Unsetenv("envvar")) }() - ctx := context.Background() tstCfgSrc := testConfigSource{ ValueMap: map[string]valueEntry{ "int_key": {Value: 42}, @@ -404,9 +384,6 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) { } return nil } - manager := newManager(map[string]ConfigSource{ - "tstcfgsrc": &tstCfgSrc, - }) file := path.Join("testdata", "envvar_cfgsrc_mix.yaml") cp, err := confmaptest.LoadConf(file) @@ -416,15 +393,17 @@ func TestConfigSourceManager_EnvVarHandling(t *testing.T) { expectedParser, err := confmaptest.LoadConf(expectedFile) require.NoError(t, err) - res, err := manager.Resolve(ctx, cp) + res, closeFunc, err := resolve(context.Background(), map[string]ConfigSource{"tstcfgsrc": &tstCfgSrc}, cp, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) require.NoError(t, err) - assert.Equal(t, expectedParser.ToStringMap(), res.ToStringMap()) - assert.NoError(t, manager.Close(ctx)) + assert.Equal(t, expectedParser.ToStringMap(), res) + assert.NoError(t, callClose(closeFunc)) } -func TestManager_expandString(t *testing.T) { +func TestManagerExpandString(t *testing.T) { ctx := context.Background() - manager := newManager(map[string]ConfigSource{ + cfgSources := map[string]ConfigSource{ "tstcfgsrc": &testConfigSource{ ValueMap: map[string]valueEntry{ "str_key": {Value: "test_value"}, @@ -437,7 +416,7 @@ func TestManager_expandString(t *testing.T) { "int_key": {Value: 42}, }, }, - }) + } require.NoError(t, os.Setenv("envvar", "envvar_value")) defer func() { @@ -556,13 +535,16 @@ func TestManager_expandString(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := manager.parseStringValue(ctx, tt.input) + got, closeFunc, err := parseStringValue(ctx, cfgSources, tt.input, func(event *confmap.ChangeEvent) { + panic("must not be called") + }) if tt.wantErr != nil { require.Error(t, err) require.IsType(t, tt.wantErr, err) } else { require.NoError(t, err) } + require.NoError(t, callClose(closeFunc)) require.Equal(t, tt.want, got) }) } @@ -658,3 +640,10 @@ func Test_parseCfgSrc(t *testing.T) { }) } } + +func callClose(closeFunc confmap.CloseFunc) error { + if closeFunc == nil { + return nil + } + return closeFunc(context.Background()) +} diff --git a/internal/configprovider/parser.go b/internal/configprovider/parser.go index 558a819835e..9883d760a14 100644 --- a/internal/configprovider/parser.go +++ b/internal/configprovider/parser.go @@ -29,14 +29,6 @@ const ( configSourcesKey = "config_sources" ) -// Private error types to help with testability. -type ( - errInvalidTypeAndNameKey struct{ error } - errUnknownType struct{ error } - errUnmarshalError struct{ error } - errDuplicateName struct{ error } -) - // Load reads the configuration for ConfigSource objects from the given parser and returns a map // from the full name of config sources to the respective ConfigSettings. func Load(ctx context.Context, v *confmap.Conf, factories Factories) (map[string]Source, error) { @@ -55,13 +47,6 @@ func Load(ctx context.Context, v *confmap.Conf, factories Factories) (map[string // processParser prepares a confmap.Conf to be used to load config source settings. func processParser(ctx context.Context, v *confmap.Conf) (*confmap.Conf, error) { - // Use a manager to resolve environment variables with a syntax consistent with - // the config source usage. - manager := newManager(make(map[string]ConfigSource)) - defer func() { - _ = manager.Close(ctx) - }() - processedParser := map[string]any{} for _, key := range v.AllKeys() { if !strings.HasPrefix(key, configSourcesKey) { @@ -69,7 +54,7 @@ func processParser(ctx context.Context, v *confmap.Conf) (*confmap.Conf, error) continue } - value, err := manager.parseConfigValue(ctx, v.Get(key)) + value, _, err := parseConfigValue(ctx, make(map[string]ConfigSource), v.Get(key), nil) if err != nil { return nil, err } @@ -90,13 +75,13 @@ func loadSettings(css map[string]any, factories Factories) (map[string]Source, e // Decode the key into type and fullName components. componentID := component.ID{} if err := componentID.UnmarshalText([]byte(key)); err != nil { - return nil, &errInvalidTypeAndNameKey{fmt.Errorf("invalid %s type and name key %q: %w", configSourcesKey, key, err)} + return nil, fmt.Errorf("invalid %s type and name key %q: %w", configSourcesKey, key, err) } // Find the factory based on "type" that we read from config source. factory := factories[componentID.Type()] if factory == nil { - return nil, &errUnknownType{fmt.Errorf("unknown %s type %q for %q", configSourcesKey, componentID.Type(), componentID)} + return nil, fmt.Errorf("unknown %s type %q for %q", configSourcesKey, componentID.Type(), componentID) } // Create the default config. @@ -106,12 +91,12 @@ func loadSettings(css map[string]any, factories Factories) (map[string]Source, e // Now that the default settings struct is created we can Unmarshal into it // and it will apply user-defined config on top of the default. if err := settingsParser.Unmarshal(&cfgSrcSettings, confmap.WithErrorUnused()); err != nil { - return nil, &errUnmarshalError{fmt.Errorf("error reading %s configuration for %q: %w", configSourcesKey, componentID, err)} + return nil, fmt.Errorf("error reading %s configuration for %q: %w", configSourcesKey, componentID, err) } fullName := componentID.String() if cfgSrcToSettings[fullName] != nil { - return nil, &errDuplicateName{fmt.Errorf("duplicate %s name %s", configSourcesKey, fullName)} + return nil, fmt.Errorf("duplicate %s name %s", configSourcesKey, fullName) } cfgSrcToSettings[fullName] = cfgSrcSettings diff --git a/internal/configprovider/parser_test.go b/internal/configprovider/parser_test.go index ae7ed2f0b8a..968ea71f41f 100644 --- a/internal/configprovider/parser_test.go +++ b/internal/configprovider/parser_test.go @@ -37,7 +37,7 @@ func TestConfigSourceParser(t *testing.T) { factories Factories expected map[string]Source envvars map[string]string - wantErr error + wantErr string name string file string }{ @@ -77,13 +77,13 @@ func TestConfigSourceParser(t *testing.T) { name: "cfgsrc_load_cannot_use_cfgsrc", file: "cfgsrc_load_use_cfgsrc", factories: testFactories, - wantErr: &errUnknownConfigSource{}, + wantErr: "config source \"tstcfgsrc\" not found if this was intended to be an environment variable use \"${tstcfgsrc}\" instead\"", }, { name: "bad_name", file: "bad_name", factories: testFactories, - wantErr: &errInvalidTypeAndNameKey{}, + wantErr: "invalid config_sources type and name key \"tstcfgsrc/\"", }, { name: "missing_factory", @@ -91,19 +91,19 @@ func TestConfigSourceParser(t *testing.T) { factories: Factories{ "not_in_basic_config": &mockCfgSrcFactory{}, }, - wantErr: &errUnknownType{}, + wantErr: "unknown config_sources type \"tstcfgsrc\"", }, { name: "unknown_field", file: "unknown_field", factories: testFactories, - wantErr: &errUnmarshalError{}, + wantErr: "error reading config_sources configuration for \"tstcfgsrc\"", }, { name: "duplicated_name", file: "duplicated_name", factories: testFactories, - wantErr: &errDuplicateName{}, + wantErr: "duplicate config_sources name tstcfgsrc", }, } for _, tt := range tests { @@ -121,7 +121,11 @@ func TestConfigSourceParser(t *testing.T) { } cfgSrcSettings, err := Load(ctx, v, tt.factories) - require.IsType(t, tt.wantErr, err) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } assert.Equal(t, tt.expected, cfgSrcSettings) }) }