Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
refactor(BUX-411): redis options by WithRedis
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Dec 15, 2023
1 parent 7b94981 commit 4d63b33
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 182 deletions.
6 changes: 3 additions & 3 deletions bux_suite_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ type taskManagerMockBase struct{}

func (tm *taskManagerMockBase) Info(context.Context, string, ...interface{}) {}

func (tm *taskManagerMockBase) RegisterTask(*taskmanager.Task) error {
func (tm *taskManagerMockBase) RegisterTask(string, interface{}) error {
return nil
}

func (tm *taskManagerMockBase) ResetCron() {}

func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskOptions) error {
func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskRunOptions) error {
return nil
}

Expand Down Expand Up @@ -55,6 +55,6 @@ func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) er
// Sets custom task manager only for testing
func withTaskManagerMockup() ClientOps {
return func(c *clientOptions) {
c.taskManager.TaskManagerInterface = &taskManagerMockBase{}
c.taskManager.Tasker = &taskManagerMockBase{}
}
}
2 changes: 1 addition & 1 deletion bux_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (ts *EmbeddedDBTestSuite) genericDBClient(t *testing.T, database datastore.
WithAutoMigrate(&PaymailAddress{}),
)
if taskManagerEnabled {
opts = append(opts, WithTaskqConfig(taskmanager.DefaultTaskQConfig(prefix+"_queue", nil)))
opts = append(opts, WithTaskqConfig(taskmanager.DefaultTaskQConfig(prefix+"_queue")))
} else {
opts = append(opts, withTaskManagerMockup())
}
Expand Down
2 changes: 1 addition & 1 deletion bux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (tc *TestingClient) Close(ctx context.Context) {

// DefaultClientOpts will return a default set of client options required to load the new client
func DefaultClientOpts(debug, shared bool) []ClientOps {
tqc := taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), nil)
tqc := taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())
tqc.MaxNumWorker = 2
tqc.MaxNumFetcher = 2

Expand Down
16 changes: 8 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ type (

// taskManagerOptions holds the configuration for taskmanager
taskManagerOptions struct {
taskmanager.TaskManagerInterface // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
taskmanager.Tasker // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
}
)

Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *Client) Close(ctx context.Context) error {
if err := tm.Close(ctx); err != nil {
return err
}
c.options.taskManager.TaskManagerInterface = nil
c.options.taskManager.Tasker = nil
}
return nil
}
Expand Down Expand Up @@ -445,9 +445,9 @@ func (c *Client) SetNotificationsClient(client notifications.ClientInterface) {
}

// Taskmanager will return the Taskmanager if it exists
func (c *Client) Taskmanager() taskmanager.TaskManagerInterface {
if c.options.taskManager != nil && c.options.taskManager.TaskManagerInterface != nil {
return c.options.taskManager.TaskManagerInterface
func (c *Client) Taskmanager() taskmanager.Tasker {
if c.options.taskManager != nil && c.options.taskManager.Tasker != nil {
return c.options.taskManager.Tasker
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (c *Client) loadPaymailClient() (err error) {
// loadTaskmanager will load the TaskManager and start the TaskManager client
func (c *Client) loadTaskmanager(ctx context.Context) (err error) {
// Load if a custom interface was NOT provided
if c.options.taskManager.TaskManagerInterface == nil {
c.options.taskManager.TaskManagerInterface, err = taskmanager.NewClient(
if c.options.taskManager.Tasker == nil {
c.options.taskManager.Tasker, err = taskmanager.NewClient(
ctx, c.options.taskManager.options...,
)
}
Expand Down
4 changes: 2 additions & 2 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func defaultClientOptions() *clientOptions {

// Blank TaskManager config
taskManager: &taskManagerOptions{
TaskManagerInterface: nil,
cronCustomPeriods: map[string]time.Duration{},
Tasker: nil,
cronCustomPeriods: map[string]time.Duration{},
},

// Default user agent
Expand Down
18 changes: 8 additions & 10 deletions client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
}),
Expand All @@ -287,7 +287,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: "localhost:6379",
}),
Expand All @@ -314,7 +314,7 @@ func TestWithRedisConnection(t *testing.T) {
t.Run("using a nil connection", func(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(nil),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand All @@ -335,7 +335,7 @@ func TestWithRedisConnection(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(client),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestWithFreeCache(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCache(),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}))
require.NoError(t, err)
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(nil),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand All @@ -412,7 +412,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(fc),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand Down Expand Up @@ -497,9 +497,7 @@ func TestWithTaskQ(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskqConfig(
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), &taskmanager.SimplifiedRedisOptions{
Addr: "localhost:6379",
}),
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), taskmanager.WithRedis("localhost:6379")),
),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
Expand Down
5 changes: 2 additions & 3 deletions examples/client/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ func main() {
context.Background(), // Set context
bux.WithRedis(&cachestore.RedisConfig{URL: redisURL}), // Cache
bux.WithTaskqConfig( // Tasks
taskmanager.DefaultTaskQConfig("example_queue", &taskmanager.SimplifiedRedisOptions{
Addr: redisURL,
})),
taskmanager.DefaultTaskQConfig("example_queue", taskmanager.WithRedis(redisURL)),
),
)
if err != nil {
log.Fatalln("error: " + err.Error())
Expand Down
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ClientService interface {
Logger() *zerolog.Logger
Notifications() notifications.ClientInterface
PaymailClient() paymail.ClientInterface
Taskmanager() taskmanager.TaskManagerInterface
Taskmanager() taskmanager.Tasker
}

// DestinationService is the destination actions
Expand Down
6 changes: 3 additions & 3 deletions paymail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func Test_getCapabilities(t *testing.T) {

tc, err := NewClient(context.Background(),
WithRedisConnection(redisClient),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithChainstateOptions(false, false, false, false),
WithDebugging(),
Expand Down Expand Up @@ -261,7 +261,7 @@ func Test_getCapabilities(t *testing.T) {

tc, err := NewClient(context.Background(),
WithRedisConnection(redisClient),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithChainstateOptions(false, false, false, false),
WithDebugging(),
Expand Down Expand Up @@ -374,7 +374,7 @@ func Test_resolvePaymailAddress(t *testing.T) {

tc, err := NewClient(context.Background(),
WithRedisConnection(redisClient),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName, nil)),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithChainstateOptions(false, false, false, false),
WithDebugging(),
Expand Down
18 changes: 7 additions & 11 deletions taskmanager/cron_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,19 @@ func (tm *Client) CronJobsInit(cronJobsMap CronJobs) (err error) {

for name, taskDef := range cronJobsMap {
handler := taskDef.Handler
if err = tm.RegisterTask(&Task{
Name: name,
RetryLimit: 1,
Handler: func() error {
if taskErr := handler(ctx); taskErr != nil {
if tm.options.logger != nil {
tm.options.logger.Error().Msgf("error running %v task: %v", name, taskErr.Error())
}
if err = tm.RegisterTask(name, func() error {
if taskErr := handler(ctx); taskErr != nil {
if tm.options.logger != nil {
tm.options.logger.Error().Msgf("error running %v task: %v", name, taskErr.Error())
}
return nil
},
}
return nil
}); err != nil {
return
}

// Run the task periodically
if err = tm.RunTask(ctx, &TaskOptions{
if err = tm.RunTask(ctx, &TaskRunOptions{
RunEveryPeriod: taskDef.Period,
TaskName: name,
}); err != nil {
Expand Down
15 changes: 0 additions & 15 deletions taskmanager/errors.go

This file was deleted.

8 changes: 4 additions & 4 deletions taskmanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
taskq "github.com/vmihailenco/taskq/v3"
)

// TaskManagerInterface is the taskmanager client interface
type TaskManagerInterface interface {
RegisterTask(task *Task) error
// Tasker is the taskmanager client interface
type Tasker interface {
RegisterTask(name string, handler interface{}) error
ResetCron()
RunTask(ctx context.Context, options *TaskOptions) error
RunTask(ctx context.Context, options *TaskRunOptions) error
Tasks() map[string]*taskq.Task
CronJobsInit(cronJobsMap CronJobs) error
Close(ctx context.Context) error
Expand Down
3 changes: 2 additions & 1 deletion taskmanager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func defaultClientOptions() *clientOptions {
newRelicEnabled: false,
taskq: &taskqOptions{
tasks: make(map[string]*taskq.Task),
config: DefaultTaskQConfig("taskq", nil),
config: DefaultTaskQConfig("taskq"),
},
}
}
Expand Down Expand Up @@ -52,6 +52,7 @@ func WithDebugging() ClientOps {
}
}

// WithTaskqConfig will set the taskq custom config
func WithTaskqConfig(config *taskq.QueueOptions) ClientOps {
return func(c *clientOptions) {
if config != nil {
Expand Down
2 changes: 1 addition & 1 deletion taskmanager/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWithTaskQ(t *testing.T) {
options := &clientOptions{
taskq: &taskqOptions{},
}
opt := WithTaskqConfig(DefaultTaskQConfig(testQueueName, nil))
opt := WithTaskqConfig(DefaultTaskQConfig(testQueueName))
opt(options)
assert.NotNil(t, options.taskq.config)
})
Expand Down
51 changes: 0 additions & 51 deletions taskmanager/task.go

This file was deleted.

2 changes: 1 addition & 1 deletion taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type (
//
// If no options are given, it will use the defaultClientOptions()
// ctx may contain a NewRelic txn (or one will be created)
func NewClient(_ context.Context, opts ...ClientOps) (TaskManagerInterface, error) {
func NewClient(_ context.Context, opts ...ClientOps) (Tasker, error) {
// Create a new client with defaults
client := &Client{options: defaultClientOptions()}

Expand Down
Loading

0 comments on commit 4d63b33

Please sign in to comment.