Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
refactor(BUX-357): cronJobs simplification according to suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Dec 12, 2023
1 parent 29ad371 commit 61559f0
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 70 deletions.
2 changes: 1 addition & 1 deletion bux_suite_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ func (tm *taskManagerMockBase) IsNewRelicEnabled() bool {
return false
}

func (tm *taskManagerMockBase) CronJobsInit(target interface{}, cronJobsMap taskmanager.CronJobs) error {
func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) error {
return nil
}
12 changes: 7 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bux
import (
"context"
"fmt"
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/cluster"
Expand Down Expand Up @@ -111,9 +112,10 @@ type (

// taskManagerOptions holds the configuration for taskmanager
taskManagerOptions struct {
taskmanager.ClientInterface // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
taskmanager.ClientInterface // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
}
)

Expand Down Expand Up @@ -181,8 +183,8 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error)
return nil, err
}

// Register all model tasks & custom tasks
if err = client.Taskmanager().CronJobsInit(client, client.options.taskManager.cronJobs); err != nil {
// Register all cron jobs
if err = client.registerCronJobs(); err != nil {
return nil, err
}

Expand Down
16 changes: 16 additions & 0 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ func (c *Client) runModelMigrations(models ...interface{}) (err error) {
return
}

func (c *Client) registerCronJobs() error {
cronJobs := c.cronJobs()

if c.options.taskManager.cronCustomPeriods != nil {
// override the default periods
for name, job := range cronJobs {
if custom, ok := c.options.taskManager.cronCustomPeriods[name]; ok {
job.Period = custom
cronJobs[name] = job
}
}
}

return c.Taskmanager().CronJobsInit(cronJobs)
}

// loadDefaultPaymailConfig will load the default paymail server configuration
func (c *Client) loadDefaultPaymailConfig() (err error) {
// Default FROM paymail
Expand Down
10 changes: 6 additions & 4 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/cluster"
Expand Down Expand Up @@ -101,8 +102,8 @@ func defaultClientOptions() *clientOptions {

// Blank TaskManager config
taskManager: &taskManagerOptions{
ClientInterface: nil,
cronJobs: defaultCronJobs,
ClientInterface: nil,
cronCustomPeriods: map[string]time.Duration{},
},

// Default user agent
Expand Down Expand Up @@ -572,10 +573,11 @@ func WithCronService(cronService taskmanager.CronService) ClientOps {
}
}

func WithCustomCronJobs(modifier func(cronJobs taskmanager.CronJobs) taskmanager.CronJobs) ClientOps {
// WithCronJobs will set the custom cron jobs period which will override the default
func WithCronCustmPeriod(cronJobName string, period time.Duration) ClientOps {
return func(c *clientOptions) {
if c.taskManager != nil {
c.taskManager.cronJobs = modifier(c.taskManager.cronJobs)
c.taskManager.cronCustomPeriods[cronJobName] = period
}
}
}
Expand Down
48 changes: 26 additions & 22 deletions cron_job_declarations.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,33 @@ const (
CronJobNameSyncTransactionSync = "sync_transaction_sync"
)

type cronJobHandler func(ctx context.Context, client *Client) error

// here is where we define all the cron jobs for the client
var defaultCronJobs = taskmanager.CronJobs{
CronJobNameDraftTransactionCleanUp: {
Period: defaultMonitorHeartbeat * time.Second,
Handler: BuxClientHandler(taskCleanupDraftTransactions),
},
CronJobNameIncomingTransaction: {
Period: 30 * time.Second,
Handler: BuxClientHandler(taskProcessIncomingTransactions),
},
CronJobNameSyncTransactionBroadcast: {
Period: 30 * time.Second,
Handler: BuxClientHandler(taskBroadcastTransactions),
},
CronJobNameSyncTransactionSync: {
Period: 120 * time.Second,
Handler: BuxClientHandler(taskSyncTransactions),
},
}
func (c *Client) cronJobs() taskmanager.CronJobs {
// handler adds the client pointer to the cron job handler by using a closure
handler := func(cronJobTask cronJobHandler) taskmanager.CronJobHandler {
return func(ctx context.Context) error {
return cronJobTask(ctx, c)
}
}

// utility function - converts a handler with the *Client target to a generic taskmanager.CronJobHandler
func BuxClientHandler(handler func(ctx context.Context, client *Client) error) taskmanager.CronJobHandler {
return func(ctx context.Context, target interface{}) error {
return handler(ctx, target.(*Client))
return taskmanager.CronJobs{
CronJobNameDraftTransactionCleanUp: {
Period: defaultMonitorHeartbeat * time.Second,
Handler: handler(taskCleanupDraftTransactions),
},
CronJobNameIncomingTransaction: {
Period: 30 * time.Second,
Handler: handler(taskProcessIncomingTransactions),
},
CronJobNameSyncTransactionBroadcast: {
Period: 30 * time.Second,
Handler: handler(taskBroadcastTransactions),
},
CronJobNameSyncTransactionSync: {
Period: 120 * time.Second,
Handler: handler(taskSyncTransactions),
},
}
}
27 changes: 4 additions & 23 deletions examples/client/custom_cron/custom_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,8 @@ func main() {
client, err := bux.NewClient(
context.Background(), // Set context
bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks
bux.WithCustomCronJobs(func(jobs taskmanager.CronJobs) taskmanager.CronJobs {
// update the period of the incoming transaction job
if incommingTransactionJob, ok := jobs[bux.CronJobNameIncomingTransaction]; ok {
incommingTransactionJob.Period = 10 * time.Second
jobs[bux.CronJobNameIncomingTransaction] = incommingTransactionJob
}

// remove the sync transaction job
delete(jobs, bux.CronJobNameSyncTransactionSync)

// add custom job
jobs["custom_job"] = taskmanager.CronJob{
Period: 2 * time.Second,
Handler: bux.BuxClientHandler(func(ctx context.Context, client *bux.Client) error {
log.Println("custom job!")
return nil
}),
}

return jobs
}),
bux.WithCronCustmPeriod(bux.CronJobNameDraftTransactionCleanUp, 2*time.Second),
bux.WithCronCustmPeriod(bux.CronJobNameIncomingTransaction, 4*time.Second),
)
if err != nil {
log.Fatalln("error: " + err.Error())
Expand All @@ -43,8 +24,8 @@ func main() {
_ = client.Close(context.Background())
}()

// wait for the custom cron job to run at least once
time.Sleep(4 * time.Second)
// wait for the customized cron jobs to run at least once
time.Sleep(8 * time.Second)

log.Println("client loaded!", client.UserAgent())
}
6 changes: 3 additions & 3 deletions taskmanager/cron_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

// CronJobHandler is the handler for a cron job
type CronJobHandler func(ctx context.Context, target interface{}) error
type CronJobHandler func(ctx context.Context) error

// CronJob definition, params reduced to the minimum, all required
type CronJob struct {
Expand All @@ -18,7 +18,7 @@ type CronJob struct {
type CronJobs map[string]CronJob

// CronJobsInit registers and runs the cron jobs
func (tm *Client) CronJobsInit(target interface{}, cronJobsMap CronJobs) (err error) {
func (tm *Client) CronJobsInit(cronJobsMap CronJobs) (err error) {
tm.ResetCron()
defer func() {
// stop other, already registered tasks if the func fails
Expand All @@ -35,7 +35,7 @@ func (tm *Client) CronJobsInit(target interface{}, cronJobsMap CronJobs) (err er
Name: name,
RetryLimit: 1,
Handler: func() error {
if taskErr := handler(ctx, target); taskErr != nil {
if taskErr := handler(ctx); taskErr != nil {
if tm.options.logger != nil {
tm.options.logger.Error(ctx, "error running %v task: %v", name, taskErr.Error())
}
Expand Down
19 changes: 8 additions & 11 deletions taskmanager/cron_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ func TestCronTasks(t *testing.T) {
}
target := &mockTarget{make(chan bool, desiredExecutions)}

err := client.CronJobsInit(target, CronJobs{
err := client.CronJobsInit(CronJobs{
"test": {
Period: 100 * time.Millisecond,
Handler: func(ctx context.Context, target interface{}) error {
t := target.(*mockTarget)
t.times <- true
Handler: func(ctx context.Context) error {
target.times <- true
return nil
},
},
Expand Down Expand Up @@ -61,20 +60,18 @@ func TestCronTasks(t *testing.T) {
}
target := &mockTarget{make(chan int, desiredExecutions)}

err := client.CronJobsInit(target, CronJobs{
err := client.CronJobsInit(CronJobs{
"test1": {
Period: 100 * time.Millisecond,
Handler: func(ctx context.Context, target interface{}) error {
t := target.(*mockTarget)
t.times <- 1
Handler: func(ctx context.Context) error {
target.times <- 1
return nil
},
},
"test2": {
Period: 100 * time.Millisecond,
Handler: func(ctx context.Context, target interface{}) error {
t := target.(*mockTarget)
t.times <- 2
Handler: func(ctx context.Context) error {
target.times <- 2
return nil
},
},
Expand Down
2 changes: 1 addition & 1 deletion taskmanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type TaskService interface {
ResetCron()
RunTask(ctx context.Context, options *TaskOptions) error
Tasks() map[string]*taskq.Task
CronJobsInit(target interface{}, cronJobsMap CronJobs) error
CronJobsInit(cronJobsMap CronJobs) error
}

// CronService is the cron service provider
Expand Down

0 comments on commit 61559f0

Please sign in to comment.