Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

refactor(BUX-411): taskmanager simplification & tasq with redis fixes #510

Merged
merged 20 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
fd65853
refactor(BUX-411): remove empty taskmanager files
chris-4chain Dec 15, 2023
95e1e6d
refactor(BUX-411): rename files
chris-4chain Dec 15, 2023
26d7c36
refactor(BUX-411): taskq as the only task queue
chris-4chain Dec 15, 2023
60a3fd5
refactor(BUX-411): localCron as the only cron service
chris-4chain Dec 15, 2023
940725b
refactor(BUX-411): unnecessary With* func options - partially removed
chris-4chain Dec 15, 2023
7b94981
refactor(BUX-411): WithTaskQ to WithTaskqConfig
chris-4chain Dec 15, 2023
4d63b33
refactor(BUX-411): redis options by WithRedis
chris-4chain Dec 15, 2023
40b4e33
refactor(BUX-411): rename Client to TaskManager
chris-4chain Dec 15, 2023
51ca270
refactor(BUX-411): taskmanager with redis - tests
chris-4chain Dec 15, 2023
432149f
fix(BUX-411): distributed locking when using taskq-redis with multipl…
chris-4chain Dec 15, 2023
0a04ed1
refactor(BUX-411): defaultOptions moved right into NewTaskManager
chris-4chain Dec 15, 2023
584d09d
refactor(BUX-411): additional redis version comment note
chris-4chain Dec 19, 2023
51318af
refactor(BUX-411): Update version number to v0.9.0
chris-4chain Dec 19, 2023
01d3d8e
refactor(BUX-411): apply suggested comment
chris-4chain Dec 20, 2023
15aec9a
refactor(BUX-411): apply suggested comment
chris-4chain Dec 20, 2023
bb55a2f
refactor(BUX-411): redundant comments & log error if RegisterTask panic
chris-4chain Dec 20, 2023
13d9ba1
Merge branch 'refactor-411-taskmanager-simplification' of https://git…
chris-4chain Dec 20, 2023
e830802
refactor(BUX-411): taskq comments & error if runEveryPeriod < 1sec
chris-4chain Dec 20, 2023
04e02c9
refactor(BUX-411): taskmanager logger
chris-4chain Dec 20, 2023
12a41f8
refactor(BUX-411): Tasker to TaskEngine
chris-4chain Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion action_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func BenchmarkAction_Transaction_newTransaction(b *testing.B) {

func initBenchmarkData(b *testing.B) (context.Context, ClientInterface, *Xpub, *TransactionConfig, error) {
ctx, client, _ := CreateBenchmarkSQLiteClient(b, false, true,
WithCustomTaskManager(&taskManagerMockBase{}),
withTaskManagerMockup(),
WithFreeCache(),
WithIUCDisabled(),
)
Expand Down
15 changes: 9 additions & 6 deletions bux_suite_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ type taskManagerMockBase struct{}

func (tm *taskManagerMockBase) Info(context.Context, string, ...interface{}) {}

func (tm *taskManagerMockBase) RegisterTask(*taskmanager.Task) error {
func (tm *taskManagerMockBase) RegisterTask(string, interface{}) error {
return nil
}

func (tm *taskManagerMockBase) ResetCron() {}

func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskOptions) error {
func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskRunOptions) error {
return nil
}

Expand All @@ -32,10 +32,6 @@ func (tm *taskManagerMockBase) Close(context.Context) error {

func (tm *taskManagerMockBase) Debug(bool) {}

func (tm *taskManagerMockBase) Engine() taskmanager.Engine {
return taskmanager.Empty
}

func (tm *taskManagerMockBase) Factory() taskmanager.Factory {
return taskmanager.FactoryEmpty
}
Expand All @@ -55,3 +51,10 @@ func (tm *taskManagerMockBase) IsNewRelicEnabled() bool {
func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) error {
return nil
}

// Sets custom task manager only for testing
func withTaskManagerMockup() ClientOps {
return func(c *clientOptions) {
c.taskManager.TaskEngine = &taskManagerMockBase{}
}
}
13 changes: 5 additions & 8 deletions bux_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (ts *EmbeddedDBTestSuite) serveMySQL() {

// SetupSuite runs at the start of the suite
func (ts *EmbeddedDBTestSuite) SetupSuite() {

var err error

// Create the MySQL server
Expand Down Expand Up @@ -120,7 +119,6 @@ func (ts *EmbeddedDBTestSuite) SetupSuite() {

// TearDownSuite runs after the suite finishes
func (ts *EmbeddedDBTestSuite) TearDownSuite() {

// Stop the Mongo server
if ts.MongoServer != nil {
ts.MongoServer.Stop()
Expand Down Expand Up @@ -157,8 +155,8 @@ func (ts *EmbeddedDBTestSuite) TearDownTest() {
//
// NOTE: you need to close the client: ts.Close()
func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database datastore.Engine,
tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps) (*TestingClient, error) {

tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps,
) (*TestingClient, error) {
var err error

// Start the suite
Expand Down Expand Up @@ -201,7 +199,6 @@ func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database da
}

} else {

// Load the in-memory version of the database
if database == datastore.SQLite {
opts = append(opts, WithSQLite(&datastore.SQLiteConfig{
Expand Down Expand Up @@ -329,9 +326,9 @@ func (ts *EmbeddedDBTestSuite) genericDBClient(t *testing.T, database datastore.
WithAutoMigrate(&PaymailAddress{}),
)
if taskManagerEnabled {
opts = append(opts, WithTaskQ(taskmanager.DefaultTaskQConfig(prefix+"_queue"), taskmanager.FactoryMemory))
opts = append(opts, WithTaskqConfig(taskmanager.DefaultTaskQConfig(prefix+"_queue")))
} else {
opts = append(opts, WithCustomTaskManager(&taskManagerMockBase{}))
opts = append(opts, withTaskManagerMockup())
}

tc, err := ts.createTestClient(
Expand All @@ -358,7 +355,7 @@ func (ts *EmbeddedDBTestSuite) genericMockedDBClient(t *testing.T, database data
),
database, prefix,
true, true, WithDebugging(),
WithCustomTaskManager(&taskManagerMockBase{}),
withTaskManagerMockup(),
)
require.NoError(t, err)
require.NotNil(t, tc)
Expand Down
10 changes: 5 additions & 5 deletions bux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func DefaultClientOpts(debug, shared bool) []ClientOps {
opts := make([]ClientOps, 0)
opts = append(
opts,
WithTaskQ(tqc, taskmanager.FactoryMemory),
WithTaskqConfig(tqc),
WithSQLite(tester.SQLiteTestConfig(debug, shared)),
WithChainstateOptions(false, false, false, false),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand Down Expand Up @@ -150,8 +150,8 @@ func (a *account) Unlocker(context.Context, *bscript.Script) (bt.Unlocker, error

// CreateFakeFundingTransaction will create a valid (fake) transaction for funding
func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey,
destinations []*Destination, satoshis uint64) string {

destinations []*Destination, satoshis uint64,
) string {
// Create new tx
rawTx := bt.NewTx()
txErr := rawTx.From(testTxScriptSigID, 0, testTxScriptSigOut, satoshis+354)
Expand Down Expand Up @@ -185,8 +185,8 @@ func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey,

// CreateNewXPub will create a new xPub and return all the information to use the xPub
func CreateNewXPub(ctx context.Context, t *testing.T, buxClient ClientInterface,
opts ...ModelOps) (*bip32.ExtendedKey, *Xpub, string) {

opts ...ModelOps,
) (*bip32.ExtendedKey, *Xpub, string) {
// Generate a key pair
masterKey, err := bitcoin.GenerateHDKey(bitcoin.SecureSeedLength)
require.NoError(t, err)
Expand Down
21 changes: 8 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ type (

// taskManagerOptions holds the configuration for taskmanager
taskManagerOptions struct {
taskmanager.ClientInterface // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
taskmanager.TaskEngine // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.TaskManagerOptions // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
}
)

Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *Client) Close(ctx context.Context) error {
if err := tm.Close(ctx); err != nil {
return err
}
c.options.taskManager.ClientInterface = nil
c.options.taskManager.TaskEngine = nil
}
return nil
}
Expand Down Expand Up @@ -340,11 +340,6 @@ func (c *Client) Debug(on bool) {
if n := c.Notifications(); n != nil {
n.Debug(on)
}

// Set debugging on the Taskmanager
if tm := c.Taskmanager(); tm != nil {
tm.Debug(on)
}
}

// DefaultSyncConfig will return the default sync config from the client defaults (for chainstate)
Expand Down Expand Up @@ -445,9 +440,9 @@ func (c *Client) SetNotificationsClient(client notifications.ClientInterface) {
}

// Taskmanager will return the Taskmanager if it exists
func (c *Client) Taskmanager() taskmanager.ClientInterface {
if c.options.taskManager != nil && c.options.taskManager.ClientInterface != nil {
return c.options.taskManager.ClientInterface
func (c *Client) Taskmanager() taskmanager.TaskEngine {
if c.options.taskManager != nil && c.options.taskManager.TaskEngine != nil {
return c.options.taskManager.TaskEngine
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (c *Client) loadPaymailClient() (err error) {
// loadTaskmanager will load the TaskManager and start the TaskManager client
func (c *Client) loadTaskmanager(ctx context.Context) (err error) {
// Load if a custom interface was NOT provided
if c.options.taskManager.ClientInterface == nil {
c.options.taskManager.ClientInterface, err = taskmanager.NewClient(
if c.options.taskManager.TaskEngine == nil {
c.options.taskManager.TaskEngine, err = taskmanager.NewTaskManager(
ctx, c.options.taskManager.options...,
)
}
Expand Down
48 changes: 4 additions & 44 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func defaultClientOptions() *clientOptions {

// Blank TaskManager config
taskManager: &taskManagerOptions{
ClientInterface: nil,
TaskEngine: nil,
cronCustomPeriods: map[string]time.Duration{},
},

Expand Down Expand Up @@ -215,7 +215,6 @@ func WithDebugging() ClientOps {
c.chainstate.options = append(c.chainstate.options, chainstate.WithDebugging())
c.dataStore.options = append(c.dataStore.options, datastore.WithDebugging())
c.notifications.options = append(c.notifications.options, notifications.WithDebugging())
c.taskManager.options = append(c.taskManager.options, taskmanager.WithDebugging())
}
}

Expand Down Expand Up @@ -532,57 +531,18 @@ func WithPaymailServerConfig(config *server.Configuration, defaultFromPaymail, d
// TASK MANAGER
// -----------------------------------------------------------------

// WithCustomTaskManager will set the taskmanager
func WithCustomTaskManager(taskManager taskmanager.ClientInterface) ClientOps {
return func(c *clientOptions) {
if taskManager != nil {
c.taskManager.ClientInterface = taskManager
}
}
}

// WithTaskQ will set the task manager to use TaskQ & in-memory
func WithTaskQ(config *taskq.QueueOptions, factory taskmanager.Factory) ClientOps {
return func(c *clientOptions) {
if config != nil {
c.taskManager.options = append(
c.taskManager.options,
taskmanager.WithTaskQ(config, factory),
)
}
}
}

// WithTaskQUsingRedis will set the task manager to use TaskQ & Redis
func WithTaskQUsingRedis(config *taskq.QueueOptions, redisOptions *redis.Options) ClientOps {
// WithTaskqConfig will set the task manager to use TaskQ & in-memory
func WithTaskqConfig(config *taskq.QueueOptions) ClientOps {
return func(c *clientOptions) {
if config != nil {

// Create a new redis client
if config.Redis == nil {

// Remove prefix if found
redisOptions.Addr = strings.Replace(redisOptions.Addr, cachestore.RedisPrefix, "", -1)
config.Redis = redis.NewClient(redisOptions)
}

c.taskManager.options = append(
c.taskManager.options,
taskmanager.WithTaskQ(config, taskmanager.FactoryRedis),
taskmanager.WithTaskqConfig(config),
)
}
}
}

// WithCronService will set the custom cron service provider
func WithCronService(cronService taskmanager.CronService) ClientOps {
return func(c *clientOptions) {
if cronService != nil && c.taskManager != nil {
c.taskManager.options = append(c.taskManager.options, taskmanager.WithCronService(cronService))
}
}
}

// WithCronCustmPeriod will set the custom cron jobs period which will override the default
func WithCronCustmPeriod(cronJobName string, period time.Duration) ClientOps {
return func(c *clientOptions) {
Expand Down
26 changes: 9 additions & 17 deletions client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-paymail"
"github.com/coocood/freecache"
"github.com/go-redis/redis/v8"
"github.com/mrz1836/go-cachestore"
"github.com/mrz1836/go-datastore"
"github.com/newrelic/go-agent/v3/newrelic"
Expand Down Expand Up @@ -212,7 +211,6 @@ func TestWithDebugging(t *testing.T) {
assert.Equal(t, true, tc.IsDebug())
assert.Equal(t, true, tc.Cachestore().IsDebug())
assert.Equal(t, true, tc.Datastore().IsDebug())
assert.Equal(t, true, tc.Taskmanager().IsDebug())
})
}

Expand Down Expand Up @@ -265,7 +263,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
}),
Expand All @@ -288,7 +286,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: "localhost:6379",
}),
Expand All @@ -315,7 +313,7 @@ func TestWithRedisConnection(t *testing.T) {
t.Run("using a nil connection", func(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(nil),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand All @@ -336,7 +334,7 @@ func TestWithRedisConnection(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(client),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand Down Expand Up @@ -364,7 +362,7 @@ func TestWithFreeCache(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCache(),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}))
require.NoError(t, err)
Expand Down Expand Up @@ -392,7 +390,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(nil),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand All @@ -413,7 +411,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(fc),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand Down Expand Up @@ -470,7 +468,6 @@ func TestWithTaskQ(t *testing.T) {
// todo: test cases where config is nil, or cannot load TaskQ

t.Run("using taskq using memory", func(t *testing.T) {

logger := zerolog.Nop()
tcOpts := DefaultClientOpts(true, true)
tcOpts = append(tcOpts, WithLogger(&logger))
Expand All @@ -485,7 +482,6 @@ func TestWithTaskQ(t *testing.T) {

tm := tc.Taskmanager()
require.NotNil(t, tm)
assert.Equal(t, taskmanager.TaskQ, tm.Engine())
assert.Equal(t, taskmanager.FactoryMemory, tm.Factory())
})

Expand All @@ -498,11 +494,8 @@ func TestWithTaskQ(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQUsingRedis(
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()),
&redis.Options{
Addr: "localhost:6379",
},
WithTaskqConfig(
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), taskmanager.WithRedis("localhost:6379")),
),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
Expand All @@ -517,7 +510,6 @@ func TestWithTaskQ(t *testing.T) {

tm := tc.Taskmanager()
require.NotNil(t, tm)
assert.Equal(t, taskmanager.TaskQ, tm.Engine())
assert.Equal(t, taskmanager.FactoryRedis, tm.Factory())
})
}
Expand Down
1 change: 0 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestClient_Debug(t *testing.T) {
assert.Equal(t, true, tc.Cachestore().IsDebug())
assert.Equal(t, true, tc.Datastore().IsDebug())
assert.Equal(t, true, tc.Notifications().IsDebug())
assert.Equal(t, true, tc.Taskmanager().IsDebug())
})
}

Expand Down
Loading
Loading