Skip to content

Commit

Permalink
Add ability to disable index creation on startup with an env variable
Browse files Browse the repository at this point in the history
Allow disabling the user events handler
Add error context
Fix imports
  • Loading branch information
toddkazakov authored and lostlevels committed Jun 23, 2024
1 parent 5c4bfd4 commit f577d63
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 32 deletions.
19 changes: 13 additions & 6 deletions auth/service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *Service) Initialize(provider application.Provider) error {
if err := s.initializeDeviceCheck(); err != nil {
return err
}
return s.initializeUserEventsHandler()
return s.initializeUserEventsHandler(provider)
}

func (s *Service) Terminate() {
Expand Down Expand Up @@ -400,13 +400,20 @@ func (s *Service) terminateAuthClient() {
}
}

func (s *Service) initializeUserEventsHandler() error {
func (s *Service) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize events runner")
Expand Down
19 changes: 13 additions & 6 deletions blob/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Service) Initialize(provider application.Provider) error {
if err := s.initializeBlobClient(); err != nil {
return err
}
if err := s.initializeUserEventsHandler(); err != nil {
if err := s.initializeUserEventsHandler(provider); err != nil {
return err
}
return s.initializeRouter()
Expand Down Expand Up @@ -211,13 +211,20 @@ func (s *Service) terminateDeviceLogsUnstructuredStore() {
}
}

func (s *Service) initializeUserEventsHandler() error {
func (s *Service) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize events runner")
Expand Down
3 changes: 2 additions & 1 deletion client/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"fmt"
"net/url"

"github.com/kelseyhightower/envconfig"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (c *Config) Validate() error {
if c.Address == "" {
return errors.New("address is missing")
} else if _, err := url.Parse(c.Address); err != nil {
return errors.New("address is invalid")
return fmt.Errorf("address is invalid: %w", err)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var _ = Describe("Config", func() {

It("returns an error if the address is not a parseable URL", func() {
cfg.Address = "Not%Parseable"
Expect(cfg.Validate()).To(MatchError("address is invalid"))
Expect(cfg.Validate()).To(MatchError("address is invalid: parse \"Not%Parseable\": invalid URL escape \"%Pa\""))
})

It("returns success", func() {
Expand Down
20 changes: 14 additions & 6 deletions data/service/service/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Standard) Initialize(provider application.Provider) error {
if err := s.initializeDataSourceClient(); err != nil {
return err
}
if err := s.initializeUserEventsHandler(); err != nil {
if err := s.initializeUserEventsHandler(provider); err != nil {
return err
}
if err := s.initializeAPI(); err != nil {
Expand Down Expand Up @@ -404,14 +404,22 @@ func (s *Standard) initializeServer() error {
return nil
}

func (s *Standard) initializeUserEventsHandler() error {
func (s *Standard) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")
sarama.Logger = log.New(os.Stdout, "SARAMA ", log.LstdFlags|log.Lshortfile)

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize user events handler runner")
}
Expand Down
25 changes: 25 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,28 @@ func (r *runner) Terminate() error {
}
return nil
}

type noopRunner struct {
terminate chan struct{}
}

func (n *noopRunner) Initialize() error {
n.terminate = make(chan struct{}, 0)
return nil
}

func (n *noopRunner) Run() error {
<-n.terminate
return nil
}

func (n *noopRunner) Terminate() error {
n.terminate <- struct{}{}
return nil
}

var _ Runner = &noopRunner{}

func NewNoopRunner() Runner {
return &noopRunner{}
}
2 changes: 1 addition & 1 deletion platform/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var _ = Describe("Config", func() {

It("returns an error if the address is not a parseable URL", func() {
cfg.Address = "Not%Parseable"
Expect(cfg.Validate()).To(MatchError("address is invalid"))
Expect(cfg.Validate()).To(MatchError("address is invalid: parse \"Not%Parseable\": invalid URL escape \"%Pa\""))
})

It("returns success", func() {
Expand Down
19 changes: 10 additions & 9 deletions store/structured/mongo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ func LoadConfig() (*Config, error) {

// Config describe parameters need to make a connection to a Mongo database
type Config struct {
Scheme string `json:"scheme" envconfig:"TIDEPOOL_STORE_SCHEME"`
Addresses []string `json:"addresses" envconfig:"TIDEPOOL_STORE_ADDRESSES" required:"true"`
TLS bool `json:"tls" envconfig:"TIDEPOOL_STORE_TLS" default:"true"`
Database string `json:"database" envconfig:"TIDEPOOL_STORE_DATABASE" required:"true"`
CollectionPrefix string `json:"collectionPrefix" envconfig:"TIDEPOOL_STORE_COLLECTION_PREFIX"`
Username *string `json:"-" envconfig:"TIDEPOOL_STORE_USERNAME"`
Password *string `json:"-" envconfig:"TIDEPOOL_STORE_PASSWORD"`
Timeout time.Duration `json:"timeout" envconfig:"TIDEPOOL_STORE_TIMEOUT" default:"60s"`
OptParams *string `json:"optParams" envconfig:"TIDEPOOL_STORE_OPT_PARAMS"`
Scheme string `json:"scheme" envconfig:"TIDEPOOL_STORE_SCHEME"`
Addresses []string `json:"addresses" envconfig:"TIDEPOOL_STORE_ADDRESSES" required:"true"`
TLS bool `json:"tls" envconfig:"TIDEPOOL_STORE_TLS" default:"true"`
Database string `json:"database" envconfig:"TIDEPOOL_STORE_DATABASE" required:"true"`
CollectionPrefix string `json:"collectionPrefix" envconfig:"TIDEPOOL_STORE_COLLECTION_PREFIX"`
Username *string `json:"-" envconfig:"TIDEPOOL_STORE_USERNAME"`
Password *string `json:"-" envconfig:"TIDEPOOL_STORE_PASSWORD"`
Timeout time.Duration `json:"timeout" envconfig:"TIDEPOOL_STORE_TIMEOUT" default:"60s"`
OptParams *string `json:"optParams" envconfig:"TIDEPOOL_STORE_OPT_PARAMS"`
DisableIndexCreation bool `json:"disableIndexCreation" envconfig:"TIDEPOOL_DISABLE_INDEX_CREATION"`
}

// AsConnectionString constructs a MongoDB connection string from a Config
Expand Down
12 changes: 11 additions & 1 deletion store/structured/mongo/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,25 @@ import (

type Repository struct {
*mongo.Collection
config RepositoryConfig
}

func NewRepository(collection *mongo.Collection) *Repository {
type RepositoryConfig struct {
DisableIndexCreation bool
}

func NewRepository(collection *mongo.Collection, config RepositoryConfig) *Repository {
return &Repository{
collection,
config,
}
}

func (r *Repository) CreateAllIndexes(ctx context.Context, indexes []mongo.IndexModel) error {
if r.config.DisableIndexCreation {
return nil
}

if ctx == nil {
ctx = context.Background()
}
Expand Down
5 changes: 4 additions & 1 deletion store/structured/mongo/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func AppendLifecycleHooksToStore(store *Store, lifecycle fx.Lifecycle) {
}

func (o *Store) GetRepository(collection string) *Repository {
return NewRepository(o.GetCollection(collection))
config := RepositoryConfig{
DisableIndexCreation: o.config.DisableIndexCreation,
}
return NewRepository(o.GetCollection(collection), config)
}

func (o *Store) GetCollection(collection string) *mongoDriver.Collection {
Expand Down

0 comments on commit f577d63

Please sign in to comment.