Skip to content

Commit

Permalink
[BUG_FIX] Watch for changes in the configsource implementations (open…
Browse files Browse the repository at this point in the history
…-telemetry#2272)

Before, the WatchForChanges was not called, now we pass the same WatcherFunc down to the ConfigSources.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Nov 18, 2022
1 parent ff77e31 commit e0edd6b
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 366 deletions.
18 changes: 3 additions & 15 deletions internal/configprovider/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,24 @@ import (
"go.uber.org/zap"
)

// Private error types to help with testability.
type (
errConfigSourceCreation struct{ error }
errFactoryCreatedNil struct{ error }
)

// Build builds the ConfigSource objects according to the given ConfigSettings.
func Build(ctx context.Context, configSourcesSettings map[string]Source, params CreateParams, factories Factories) (map[string]ConfigSource, error) {
cfgSources := make(map[string]ConfigSource, len(configSourcesSettings))
for fullName, cfgSrcSettings := range configSourcesSettings {
// If we have the setting we also have the factory.
factory, ok := factories[cfgSrcSettings.ID().Type()]
if !ok {
return nil, &errUnknownType{
fmt.Errorf("unknown %s config source type for %s", cfgSrcSettings.ID().Type(), fullName),
}
return nil, fmt.Errorf("unknown %s config source type for %s", cfgSrcSettings.ID().Type(), fullName)
}

params.Logger = params.Logger.With(zap.String("config_source", fullName))
cfgSrc, err := factory.CreateConfigSource(ctx, params, cfgSrcSettings)
if err != nil {
return nil, &errConfigSourceCreation{
fmt.Errorf("failed to create config source %s: %w", fullName, err),
}
return nil, fmt.Errorf("failed to create config source %s: %w", fullName, err)
}

if cfgSrc == nil {
return nil, &errFactoryCreatedNil{
fmt.Errorf("factory for %q produced a nil extension", fullName),
}
return nil, fmt.Errorf("factory for %q produced a nil extension", fullName)
}

cfgSources[fullName] = cfgSrc
Expand Down
14 changes: 9 additions & 5 deletions internal/configprovider/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestConfigSourceBuild(t *testing.T) {
configSettings map[string]Source
factories Factories
expectedCfgSources map[string]ConfigSource
wantErr error
wantErr string
name string
}{
{
Expand All @@ -51,7 +51,7 @@ func TestConfigSourceBuild(t *testing.T) {
},
},
factories: testFactories,
wantErr: &errUnknownType{},
wantErr: "unknown unknown_config_source config source type for tstcfgsrc",
},
{
name: "creation_error",
Expand All @@ -65,7 +65,7 @@ func TestConfigSourceBuild(t *testing.T) {
ErrOnCreateConfigSource: errors.New("forced test error"),
},
},
wantErr: &errConfigSourceCreation{},
wantErr: "failed to create config source tstcfgsrc: forced test error",
},
{
name: "factory_return_nil",
Expand All @@ -77,7 +77,7 @@ func TestConfigSourceBuild(t *testing.T) {
factories: Factories{
"tstcfgsrc": &mockNilCfgSrcFactory{},
},
wantErr: &errFactoryCreatedNil{},
wantErr: "factory for \"tstcfgsrc\" produced a nil extension",
},
{
name: "base_case",
Expand Down Expand Up @@ -108,7 +108,11 @@ func TestConfigSourceBuild(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
builtCfgSources, err := Build(ctx, tt.configSettings, params, tt.factories)
require.IsType(t, tt.wantErr, err)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.expectedCfgSources, builtCfgSources)
})
}
Expand Down
13 changes: 0 additions & 13 deletions internal/configprovider/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,10 @@ package configprovider

import (
"context"
"errors"

"go.opentelemetry.io/collector/confmap"
)

// ErrSessionClosed is returned by WatchForUpdate functions when its parent Session
// object is closed.
// This error can be wrapped with additional information. Callers trying to identify this
// specific error must use errors.Is.
var ErrSessionClosed = errors.New("parent session was closed")

// ErrValueUpdated is returned by WatchForUpdate functions when the value being watched
// was changed or expired and needs to be retrieved again.
// This error can be wrapped with additional information. Callers trying to identify this
// specific error must use errors.Is.
var ErrValueUpdated = errors.New("configuration must retrieve the updated value")

// ConfigSource is the interface to be implemented by objects used by the collector
// to retrieve external configuration information.
//
Expand Down
57 changes: 7 additions & 50 deletions internal/configprovider/config_source_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,20 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/configconverter"
)

type errDuplicatedConfigSourceFactory struct{ error }

var (
_ confmap.Provider = (*configSourceConfigMapProvider)(nil)
)

type configSourceConfigMapProvider struct {
logger *zap.Logger
csm *Manager
configServer *configconverter.ConfigServer
wrappedProvider confmap.Provider
wrappedRetrieved *confmap.Retrieved
retrieved *confmap.Retrieved
buildInfo component.BuildInfo
factories []Factory
}
Expand All @@ -55,7 +50,6 @@ func NewConfigSourceConfigMapProvider(wrappedProvider confmap.Provider, logger *
factories: factories,
buildInfo: buildInfo,
wrappedRetrieved: &confmap.Retrieved{},
retrieved: &confmap.Retrieved{},
}
}

Expand Down Expand Up @@ -86,33 +80,6 @@ func (c *configSourceConfigMapProvider) Retrieve(ctx context.Context, uri string
}
c.wrappedRetrieved = newWrappedRetrieved

var cfg *confmap.Conf
if cfg, err = c.Get(ctx, uri); err != nil {
return nil, err
} else if cfg == nil {
cfg = &confmap.Conf{}
}

c.retrieved, err = confmap.NewRetrieved(
cfg.ToStringMap(),
confmap.WithRetrievedClose(c.wrappedRetrieved.Close),
)
return c.retrieved, err
}

func (c *configSourceConfigMapProvider) Scheme() string {
return c.wrappedProvider.Scheme()
}

func (c *configSourceConfigMapProvider) Shutdown(ctx context.Context) error {
c.configServer.Unregister()
return c.wrappedProvider.Shutdown(ctx)
}

// Get returns a config.Parser that wraps the config.Default() with a parser
// that can load and inject data from config sources. If there are no config sources
// in the configuration the returned parser behaves like the config.Default().
func (c *configSourceConfigMapProvider) Get(_ context.Context, uri string) (*confmap.Conf, error) {
factories, err := makeFactoryMap(c.factories)
if err != nil {
return nil, err
Expand All @@ -123,38 +90,28 @@ func (c *configSourceConfigMapProvider) Get(_ context.Context, uri string) (*con
return nil, err
}
c.configServer.SetForScheme(c.Scheme(), wrappedMap.ToStringMap())
csm, err := NewManager(wrappedMap, c.logger, c.buildInfo, factories)
retrieved, closeFunc, err := Resolve(ctx, wrappedMap, c.logger, c.buildInfo, factories, onChange)
if err != nil {
return nil, err
}

effectiveMap, err := csm.Resolve(context.Background(), wrappedMap)
if err != nil {
return nil, err
}

c.csm = csm
return effectiveMap, nil
return confmap.NewRetrieved(retrieved, confmap.WithRetrievedClose(mergeCloseFuncs([]confmap.CloseFunc{closeFunc, c.wrappedRetrieved.Close})))
}

// WatchForUpdate is used to monitor for updates on configuration values that
// were retrieved from config sources.
func (c *configSourceConfigMapProvider) WatchForUpdate() error {
return c.csm.WatchForUpdate()
func (c *configSourceConfigMapProvider) Scheme() string {
return c.wrappedProvider.Scheme()
}

// Close ends the watch for updates and closes the parser provider and respective
// config sources.
func (c *configSourceConfigMapProvider) Close(ctx context.Context) error {
func (c *configSourceConfigMapProvider) Shutdown(ctx context.Context) error {
c.configServer.Unregister()
return multierr.Combine(c.csm.Close(ctx), c.retrieved.Close(ctx))
return c.wrappedProvider.Shutdown(ctx)
}

func makeFactoryMap(factories []Factory) (Factories, error) {
fMap := make(Factories)
for _, f := range factories {
if _, ok := fMap[f.Type()]; ok {
return nil, &errDuplicatedConfigSourceFactory{fmt.Errorf("duplicate config source factory %q", f.Type())}
return nil, fmt.Errorf("duplicate config source factory %q", f.Type())
}
fMap[f.Type()] = f
}
Expand Down
61 changes: 17 additions & 44 deletions internal/configprovider/config_source_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ package configprovider
import (
"context"
"errors"
"fmt"
"path"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -37,7 +35,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) {
tests := []struct {
parserProvider confmap.Provider
configLocation []string
wantErr error
wantErr string
name string
factories []Factory
}{
Expand All @@ -49,15 +47,15 @@ func TestConfigSourceConfigMapProvider(t *testing.T) {
parserProvider: &mockParserProvider{
ErrOnGet: true,
},
wantErr: &errOnParserProviderGet{},
wantErr: "mockParserProvider.Get() forced test error",
},
{
name: "duplicated_factory_type",
factories: []Factory{
&mockCfgSrcFactory{},
&mockCfgSrcFactory{},
},
wantErr: &errDuplicatedConfigSourceFactory{},
wantErr: "duplicate config source factory \"tstcfgsrc\"",
},
{
name: "new_manager_builder_error",
Expand All @@ -68,13 +66,13 @@ func TestConfigSourceConfigMapProvider(t *testing.T) {
},
parserProvider: fileprovider.New(),
configLocation: []string{"file:" + path.Join("testdata", "basic_config.yaml")},
wantErr: &errConfigSourceCreation{},
wantErr: "failed to create config source tstcfgsrc",
},
{
name: "manager_resolve_error",
parserProvider: fileprovider.New(),
configLocation: []string{"file:" + path.Join("testdata", "manager_resolve_error.yaml")},
wantErr: fmt.Errorf("error not wrappedProviders by specific error type: %w", ErrSessionClosed),
wantErr: "config source \"tstcfgsrc\" failed to retrieve value: no value for selector \"selector\"",
},
{
name: "multiple_config_success",
Expand Down Expand Up @@ -118,34 +116,23 @@ func TestConfigSourceConfigMapProvider(t *testing.T) {
configLocation = ""
}
r, err := pp.Retrieve(context.Background(), configLocation, nil)
if tt.wantErr == nil {
if tt.wantErr == "" {
require.NoError(t, err)
require.NotNil(t, r)
rMap, _ := r.AsConf()
require.NotNil(t, rMap)
rMap, errAsConf := r.AsConf()
require.NoError(t, errAsConf)
assert.NotNil(t, rMap)
assert.NoError(t, r.Close(context.Background()))
} else {
assert.IsType(t, tt.wantErr, err)
assert.ErrorContains(t, err, tt.wantErr)
assert.Nil(t, r)
return
}
i++
ok = i < len(tt.configLocation)
}

var watchForUpdatedError error
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
watchForUpdatedError = cspp.WatchForUpdate()
}()
require.NotNil(t, cspp.csm)

closeErr := cspp.Close(context.Background())
assert.NoError(t, closeErr)

wg.Wait()
assert.Equal(t, ErrSessionClosed, watchForUpdatedError)
assert.NoError(t, cspp.Shutdown(context.Background()))
})
}
}
Expand All @@ -156,31 +143,17 @@ type mockParserProvider struct {

var _ confmap.Provider = (*mockParserProvider)(nil)

func (mpp *mockParserProvider) Retrieve(ctx context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
m, err := mpp.Get(ctx)
if err != nil {
return nil, err
func (mpp *mockParserProvider) Retrieve(context.Context, string, confmap.WatcherFunc) (*confmap.Retrieved, error) {
if mpp.ErrOnGet {
return nil, errors.New("mockParserProvider.Get() forced test error")
}
return confmap.NewRetrieved(m.ToStringMap())
return confmap.NewRetrieved(confmap.New().ToStringMap())
}

func (mpp *mockParserProvider) Shutdown(ctx context.Context) error {
func (mpp *mockParserProvider) Shutdown(context.Context) error {
return nil
}

func (mpp *mockParserProvider) Scheme() string {
return ""
}

func (mpp *mockParserProvider) Get(context.Context) (*confmap.Conf, error) {
if mpp.ErrOnGet {
return nil, &errOnParserProviderGet{errors.New("mockParserProvider.Get() forced test error")}
}
return confmap.New(), nil
}

func (mpp *mockParserProvider) Close(context.Context) error {
return nil
}

type errOnParserProviderGet struct{ error }
Loading

0 comments on commit e0edd6b

Please sign in to comment.