From 61559f0a1443a61765d8f2241806fb512f4d0033 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Tue, 12 Dec 2023 07:20:40 +0100 Subject: [PATCH] refactor(BUX-357): cronJobs simplification according to suggestions --- bux_suite_mocks_test.go | 2 +- client.go | 12 +++--- client_internal.go | 16 ++++++++ client_options.go | 10 +++-- cron_job_declarations.go | 48 ++++++++++++---------- examples/client/custom_cron/custom_cron.go | 27 ++---------- taskmanager/cron_jobs.go | 6 +-- taskmanager/cron_jobs_test.go | 19 ++++----- taskmanager/interface.go | 2 +- 9 files changed, 72 insertions(+), 70 deletions(-) diff --git a/bux_suite_mocks_test.go b/bux_suite_mocks_test.go index 4a7faf01..ddbfbf56 100644 --- a/bux_suite_mocks_test.go +++ b/bux_suite_mocks_test.go @@ -52,6 +52,6 @@ func (tm *taskManagerMockBase) IsNewRelicEnabled() bool { return false } -func (tm *taskManagerMockBase) CronJobsInit(target interface{}, cronJobsMap taskmanager.CronJobs) error { +func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) error { return nil } diff --git a/client.go b/client.go index bf76a932..b834469b 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package bux import ( "context" "fmt" + "time" "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/cluster" @@ -111,9 +112,10 @@ type ( // taskManagerOptions holds the configuration for taskmanager taskManagerOptions struct { - taskmanager.ClientInterface // Client for TaskManager - cronJobs taskmanager.CronJobs // List of cron jobs - options []taskmanager.ClientOps // List of options + taskmanager.ClientInterface // Client for TaskManager + cronJobs taskmanager.CronJobs // List of cron jobs + options []taskmanager.ClientOps // List of options + cronCustomPeriods map[string]time.Duration // will override the default period of cronJob } ) @@ -181,8 +183,8 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error) return nil, err } - // Register all model tasks & custom tasks - if err = client.Taskmanager().CronJobsInit(client, client.options.taskManager.cronJobs); err != nil { + // Register all cron jobs + if err = client.registerCronJobs(); err != nil { return nil, err } diff --git a/client_internal.go b/client_internal.go index fe893062..399519f7 100644 --- a/client_internal.go +++ b/client_internal.go @@ -196,6 +196,22 @@ func (c *Client) runModelMigrations(models ...interface{}) (err error) { return } +func (c *Client) registerCronJobs() error { + cronJobs := c.cronJobs() + + if c.options.taskManager.cronCustomPeriods != nil { + // override the default periods + for name, job := range cronJobs { + if custom, ok := c.options.taskManager.cronCustomPeriods[name]; ok { + job.Period = custom + cronJobs[name] = job + } + } + } + + return c.Taskmanager().CronJobsInit(cronJobs) +} + // loadDefaultPaymailConfig will load the default paymail server configuration func (c *Client) loadDefaultPaymailConfig() (err error) { // Default FROM paymail diff --git a/client_options.go b/client_options.go index b487f69e..616c4148 100644 --- a/client_options.go +++ b/client_options.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/cluster" @@ -101,8 +102,8 @@ func defaultClientOptions() *clientOptions { // Blank TaskManager config taskManager: &taskManagerOptions{ - ClientInterface: nil, - cronJobs: defaultCronJobs, + ClientInterface: nil, + cronCustomPeriods: map[string]time.Duration{}, }, // Default user agent @@ -572,10 +573,11 @@ func WithCronService(cronService taskmanager.CronService) ClientOps { } } -func WithCustomCronJobs(modifier func(cronJobs taskmanager.CronJobs) taskmanager.CronJobs) ClientOps { +// WithCronJobs will set the custom cron jobs period which will override the default +func WithCronCustmPeriod(cronJobName string, period time.Duration) ClientOps { return func(c *clientOptions) { if c.taskManager != nil { - c.taskManager.cronJobs = modifier(c.taskManager.cronJobs) + c.taskManager.cronCustomPeriods[cronJobName] = period } } } diff --git a/cron_job_declarations.go b/cron_job_declarations.go index f4af2f57..a637e007 100644 --- a/cron_job_declarations.go +++ b/cron_job_declarations.go @@ -14,29 +14,33 @@ const ( CronJobNameSyncTransactionSync = "sync_transaction_sync" ) +type cronJobHandler func(ctx context.Context, client *Client) error + // here is where we define all the cron jobs for the client -var defaultCronJobs = taskmanager.CronJobs{ - CronJobNameDraftTransactionCleanUp: { - Period: defaultMonitorHeartbeat * time.Second, - Handler: BuxClientHandler(taskCleanupDraftTransactions), - }, - CronJobNameIncomingTransaction: { - Period: 30 * time.Second, - Handler: BuxClientHandler(taskProcessIncomingTransactions), - }, - CronJobNameSyncTransactionBroadcast: { - Period: 30 * time.Second, - Handler: BuxClientHandler(taskBroadcastTransactions), - }, - CronJobNameSyncTransactionSync: { - Period: 120 * time.Second, - Handler: BuxClientHandler(taskSyncTransactions), - }, -} +func (c *Client) cronJobs() taskmanager.CronJobs { + // handler adds the client pointer to the cron job handler by using a closure + handler := func(cronJobTask cronJobHandler) taskmanager.CronJobHandler { + return func(ctx context.Context) error { + return cronJobTask(ctx, c) + } + } -// utility function - converts a handler with the *Client target to a generic taskmanager.CronJobHandler -func BuxClientHandler(handler func(ctx context.Context, client *Client) error) taskmanager.CronJobHandler { - return func(ctx context.Context, target interface{}) error { - return handler(ctx, target.(*Client)) + return taskmanager.CronJobs{ + CronJobNameDraftTransactionCleanUp: { + Period: defaultMonitorHeartbeat * time.Second, + Handler: handler(taskCleanupDraftTransactions), + }, + CronJobNameIncomingTransaction: { + Period: 30 * time.Second, + Handler: handler(taskProcessIncomingTransactions), + }, + CronJobNameSyncTransactionBroadcast: { + Period: 30 * time.Second, + Handler: handler(taskBroadcastTransactions), + }, + CronJobNameSyncTransactionSync: { + Period: 120 * time.Second, + Handler: handler(taskSyncTransactions), + }, } } diff --git a/examples/client/custom_cron/custom_cron.go b/examples/client/custom_cron/custom_cron.go index 08ada653..daadd693 100644 --- a/examples/client/custom_cron/custom_cron.go +++ b/examples/client/custom_cron/custom_cron.go @@ -13,27 +13,8 @@ func main() { client, err := bux.NewClient( context.Background(), // Set context bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithCustomCronJobs(func(jobs taskmanager.CronJobs) taskmanager.CronJobs { - // update the period of the incoming transaction job - if incommingTransactionJob, ok := jobs[bux.CronJobNameIncomingTransaction]; ok { - incommingTransactionJob.Period = 10 * time.Second - jobs[bux.CronJobNameIncomingTransaction] = incommingTransactionJob - } - - // remove the sync transaction job - delete(jobs, bux.CronJobNameSyncTransactionSync) - - // add custom job - jobs["custom_job"] = taskmanager.CronJob{ - Period: 2 * time.Second, - Handler: bux.BuxClientHandler(func(ctx context.Context, client *bux.Client) error { - log.Println("custom job!") - return nil - }), - } - - return jobs - }), + bux.WithCronCustmPeriod(bux.CronJobNameDraftTransactionCleanUp, 2*time.Second), + bux.WithCronCustmPeriod(bux.CronJobNameIncomingTransaction, 4*time.Second), ) if err != nil { log.Fatalln("error: " + err.Error()) @@ -43,8 +24,8 @@ func main() { _ = client.Close(context.Background()) }() - // wait for the custom cron job to run at least once - time.Sleep(4 * time.Second) + // wait for the customized cron jobs to run at least once + time.Sleep(8 * time.Second) log.Println("client loaded!", client.UserAgent()) } diff --git a/taskmanager/cron_jobs.go b/taskmanager/cron_jobs.go index f3396b64..32a169e5 100644 --- a/taskmanager/cron_jobs.go +++ b/taskmanager/cron_jobs.go @@ -6,7 +6,7 @@ import ( ) // CronJobHandler is the handler for a cron job -type CronJobHandler func(ctx context.Context, target interface{}) error +type CronJobHandler func(ctx context.Context) error // CronJob definition, params reduced to the minimum, all required type CronJob struct { @@ -18,7 +18,7 @@ type CronJob struct { type CronJobs map[string]CronJob // CronJobsInit registers and runs the cron jobs -func (tm *Client) CronJobsInit(target interface{}, cronJobsMap CronJobs) (err error) { +func (tm *Client) CronJobsInit(cronJobsMap CronJobs) (err error) { tm.ResetCron() defer func() { // stop other, already registered tasks if the func fails @@ -35,7 +35,7 @@ func (tm *Client) CronJobsInit(target interface{}, cronJobsMap CronJobs) (err er Name: name, RetryLimit: 1, Handler: func() error { - if taskErr := handler(ctx, target); taskErr != nil { + if taskErr := handler(ctx); taskErr != nil { if tm.options.logger != nil { tm.options.logger.Error(ctx, "error running %v task: %v", name, taskErr.Error()) } diff --git a/taskmanager/cron_jobs_test.go b/taskmanager/cron_jobs_test.go index 5887dc46..7191014d 100644 --- a/taskmanager/cron_jobs_test.go +++ b/taskmanager/cron_jobs_test.go @@ -27,12 +27,11 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan bool, desiredExecutions)} - err := client.CronJobsInit(target, CronJobs{ + err := client.CronJobsInit(CronJobs{ "test": { Period: 100 * time.Millisecond, - Handler: func(ctx context.Context, target interface{}) error { - t := target.(*mockTarget) - t.times <- true + Handler: func(ctx context.Context) error { + target.times <- true return nil }, }, @@ -61,20 +60,18 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan int, desiredExecutions)} - err := client.CronJobsInit(target, CronJobs{ + err := client.CronJobsInit(CronJobs{ "test1": { Period: 100 * time.Millisecond, - Handler: func(ctx context.Context, target interface{}) error { - t := target.(*mockTarget) - t.times <- 1 + Handler: func(ctx context.Context) error { + target.times <- 1 return nil }, }, "test2": { Period: 100 * time.Millisecond, - Handler: func(ctx context.Context, target interface{}) error { - t := target.(*mockTarget) - t.times <- 2 + Handler: func(ctx context.Context) error { + target.times <- 2 return nil }, }, diff --git a/taskmanager/interface.go b/taskmanager/interface.go index bc567944..6c94a4ee 100644 --- a/taskmanager/interface.go +++ b/taskmanager/interface.go @@ -12,7 +12,7 @@ type TaskService interface { ResetCron() RunTask(ctx context.Context, options *TaskOptions) error Tasks() map[string]*taskq.Task - CronJobsInit(target interface{}, cronJobsMap CronJobs) error + CronJobsInit(cronJobsMap CronJobs) error } // CronService is the cron service provider