diff --git a/auth/service/service/service.go b/auth/service/service/service.go index abc501197..178e035c0 100644 --- a/auth/service/service/service.go +++ b/auth/service/service/service.go @@ -108,7 +108,7 @@ func (s *Service) Initialize(provider application.Provider) error { if err := s.initializeDeviceCheck(); err != nil { return err } - return s.initializeUserEventsHandler() + return s.initializeUserEventsHandler(provider) } func (s *Service) Terminate() { @@ -400,13 +400,20 @@ func (s *Service) terminateAuthClient() { } } -func (s *Service) initializeUserEventsHandler() error { +func (s *Service) initializeUserEventsHandler(provider application.Provider) error { s.Logger().Debug("Initializing user events handler") - ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) - handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient) - handlers := []eventsCommon.EventHandler{handler} - runner := events.NewRunner(handlers) + var runner events.Runner + + configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler") + if configReporter.GetWithDefault("disable", "") != "true" { + ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) + handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient) + handlers := []eventsCommon.EventHandler{handler} + runner = events.NewRunner(handlers) + } else { + runner = events.NewNoopRunner() + } if err := runner.Initialize(); err != nil { return errors.Wrap(err, "unable to initialize events runner") diff --git a/blob/service/service.go b/blob/service/service.go index a8381e2bd..ee99021c6 100644 --- a/blob/service/service.go +++ b/blob/service/service.go @@ -69,7 +69,7 @@ func (s *Service) Initialize(provider application.Provider) error { if err := s.initializeBlobClient(); err != nil { return err } - if err := s.initializeUserEventsHandler(); err != nil { + if err := s.initializeUserEventsHandler(provider); err != nil { return err } return s.initializeRouter() @@ -211,13 +211,20 @@ func (s *Service) terminateDeviceLogsUnstructuredStore() { } } -func (s *Service) initializeUserEventsHandler() error { +func (s *Service) initializeUserEventsHandler(provider application.Provider) error { s.Logger().Debug("Initializing user events handler") - ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) - handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient) - handlers := []eventsCommon.EventHandler{handler} - runner := events.NewRunner(handlers) + var runner events.Runner + + configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler") + if configReporter.GetWithDefault("disable", "") != "true" { + ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) + handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient) + handlers := []eventsCommon.EventHandler{handler} + runner = events.NewRunner(handlers) + } else { + runner = events.NewNoopRunner() + } if err := runner.Initialize(); err != nil { return errors.Wrap(err, "unable to initialize events runner") diff --git a/client/config.go b/client/config.go index 74bdd29dc..c43574fd8 100644 --- a/client/config.go +++ b/client/config.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "net/url" "github.com/kelseyhightower/envconfig" @@ -36,7 +37,7 @@ func (c *Config) Validate() error { if c.Address == "" { return errors.New("address is missing") } else if _, err := url.Parse(c.Address); err != nil { - return errors.New("address is invalid") + return fmt.Errorf("address is invalid: %w", err) } return nil diff --git a/client/config_test.go b/client/config_test.go index 4c22fbcf2..62090aca7 100644 --- a/client/config_test.go +++ b/client/config_test.go @@ -86,7 +86,7 @@ var _ = Describe("Config", func() { It("returns an error if the address is not a parseable URL", func() { cfg.Address = "Not%Parseable" - Expect(cfg.Validate()).To(MatchError("address is invalid")) + Expect(cfg.Validate()).To(MatchError("address is invalid: parse \"Not%Parseable\": invalid URL escape \"%Pa\"")) }) It("returns success", func() { diff --git a/data/service/service/standard.go b/data/service/service/standard.go index 874f25b2e..0a241a9ef 100644 --- a/data/service/service/standard.go +++ b/data/service/service/standard.go @@ -87,7 +87,7 @@ func (s *Standard) Initialize(provider application.Provider) error { if err := s.initializeDataSourceClient(); err != nil { return err } - if err := s.initializeUserEventsHandler(); err != nil { + if err := s.initializeUserEventsHandler(provider); err != nil { return err } if err := s.initializeAPI(); err != nil { @@ -404,14 +404,22 @@ func (s *Standard) initializeServer() error { return nil } -func (s *Standard) initializeUserEventsHandler() error { +func (s *Standard) initializeUserEventsHandler(provider application.Provider) error { s.Logger().Debug("Initializing user events handler") sarama.Logger = log.New(os.Stdout, "SARAMA ", log.LstdFlags|log.Lshortfile) - ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) - handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore) - handlers := []eventsCommon.EventHandler{handler} - runner := events.NewRunner(handlers) + var runner events.Runner + + configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler") + if configReporter.GetWithDefault("disable", "") != "true" { + ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger()) + handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore) + handlers := []eventsCommon.EventHandler{handler} + runner = events.NewRunner(handlers) + } else { + runner = events.NewNoopRunner() + } + if err := runner.Initialize(); err != nil { return errors.Wrap(err, "unable to initialize user events handler runner") } diff --git a/events/events.go b/events/events.go index e1d312930..240850c75 100644 --- a/events/events.go +++ b/events/events.go @@ -47,3 +47,28 @@ func (r *runner) Terminate() error { } return nil } + +type noopRunner struct { + terminate chan struct{} +} + +func (n *noopRunner) Initialize() error { + n.terminate = make(chan struct{}, 0) + return nil +} + +func (n *noopRunner) Run() error { + <-n.terminate + return nil +} + +func (n *noopRunner) Terminate() error { + n.terminate <- struct{}{} + return nil +} + +var _ Runner = &noopRunner{} + +func NewNoopRunner() Runner { + return &noopRunner{} +} diff --git a/platform/config_test.go b/platform/config_test.go index edba20a42..841e15294 100644 --- a/platform/config_test.go +++ b/platform/config_test.go @@ -110,7 +110,7 @@ var _ = Describe("Config", func() { It("returns an error if the address is not a parseable URL", func() { cfg.Address = "Not%Parseable" - Expect(cfg.Validate()).To(MatchError("address is invalid")) + Expect(cfg.Validate()).To(MatchError("address is invalid: parse \"Not%Parseable\": invalid URL escape \"%Pa\"")) }) It("returns success", func() { diff --git a/store/structured/mongo/config.go b/store/structured/mongo/config.go index 72bdf0803..8212f760d 100644 --- a/store/structured/mongo/config.go +++ b/store/structured/mongo/config.go @@ -28,15 +28,16 @@ func LoadConfig() (*Config, error) { // Config describe parameters need to make a connection to a Mongo database type Config struct { - Scheme string `json:"scheme" envconfig:"TIDEPOOL_STORE_SCHEME"` - Addresses []string `json:"addresses" envconfig:"TIDEPOOL_STORE_ADDRESSES" required:"true"` - TLS bool `json:"tls" envconfig:"TIDEPOOL_STORE_TLS" default:"true"` - Database string `json:"database" envconfig:"TIDEPOOL_STORE_DATABASE" required:"true"` - CollectionPrefix string `json:"collectionPrefix" envconfig:"TIDEPOOL_STORE_COLLECTION_PREFIX"` - Username *string `json:"-" envconfig:"TIDEPOOL_STORE_USERNAME"` - Password *string `json:"-" envconfig:"TIDEPOOL_STORE_PASSWORD"` - Timeout time.Duration `json:"timeout" envconfig:"TIDEPOOL_STORE_TIMEOUT" default:"60s"` - OptParams *string `json:"optParams" envconfig:"TIDEPOOL_STORE_OPT_PARAMS"` + Scheme string `json:"scheme" envconfig:"TIDEPOOL_STORE_SCHEME"` + Addresses []string `json:"addresses" envconfig:"TIDEPOOL_STORE_ADDRESSES" required:"true"` + TLS bool `json:"tls" envconfig:"TIDEPOOL_STORE_TLS" default:"true"` + Database string `json:"database" envconfig:"TIDEPOOL_STORE_DATABASE" required:"true"` + CollectionPrefix string `json:"collectionPrefix" envconfig:"TIDEPOOL_STORE_COLLECTION_PREFIX"` + Username *string `json:"-" envconfig:"TIDEPOOL_STORE_USERNAME"` + Password *string `json:"-" envconfig:"TIDEPOOL_STORE_PASSWORD"` + Timeout time.Duration `json:"timeout" envconfig:"TIDEPOOL_STORE_TIMEOUT" default:"60s"` + OptParams *string `json:"optParams" envconfig:"TIDEPOOL_STORE_OPT_PARAMS"` + DisableIndexCreation bool `json:"disableIndexCreation" envconfig:"TIDEPOOL_DISABLE_INDEX_CREATION"` } // AsConnectionString constructs a MongoDB connection string from a Config diff --git a/store/structured/mongo/repository.go b/store/structured/mongo/repository.go index 274d9217d..16f2ba79f 100644 --- a/store/structured/mongo/repository.go +++ b/store/structured/mongo/repository.go @@ -15,15 +15,25 @@ import ( type Repository struct { *mongo.Collection + config RepositoryConfig } -func NewRepository(collection *mongo.Collection) *Repository { +type RepositoryConfig struct { + DisableIndexCreation bool +} + +func NewRepository(collection *mongo.Collection, config RepositoryConfig) *Repository { return &Repository{ collection, + config, } } func (r *Repository) CreateAllIndexes(ctx context.Context, indexes []mongo.IndexModel) error { + if r.config.DisableIndexCreation { + return nil + } + if ctx == nil { ctx = context.Background() } diff --git a/store/structured/mongo/store.go b/store/structured/mongo/store.go index 93ef92d1e..584373bfc 100644 --- a/store/structured/mongo/store.go +++ b/store/structured/mongo/store.go @@ -62,7 +62,10 @@ func AppendLifecycleHooksToStore(store *Store, lifecycle fx.Lifecycle) { } func (o *Store) GetRepository(collection string) *Repository { - return NewRepository(o.GetCollection(collection)) + config := RepositoryConfig{ + DisableIndexCreation: o.config.DisableIndexCreation, + } + return NewRepository(o.GetCollection(collection), config) } func (o *Store) GetCollection(collection string) *mongoDriver.Collection {