diff --git a/action_transaction_test.go b/action_transaction_test.go index 7fea5d6d..e4b05114 100644 --- a/action_transaction_test.go +++ b/action_transaction_test.go @@ -325,7 +325,7 @@ func BenchmarkAction_Transaction_newTransaction(b *testing.B) { func initBenchmarkData(b *testing.B) (context.Context, ClientInterface, *Xpub, *TransactionConfig, error) { ctx, client, _ := CreateBenchmarkSQLiteClient(b, false, true, - WithCustomTaskManager(&taskManagerMockBase{}), + withTaskManagerMockup(), WithFreeCache(), WithIUCDisabled(), ) diff --git a/bux_suite_mocks_test.go b/bux_suite_mocks_test.go index ddbfbf56..1dfada21 100644 --- a/bux_suite_mocks_test.go +++ b/bux_suite_mocks_test.go @@ -12,13 +12,13 @@ type taskManagerMockBase struct{} func (tm *taskManagerMockBase) Info(context.Context, string, ...interface{}) {} -func (tm *taskManagerMockBase) RegisterTask(*taskmanager.Task) error { +func (tm *taskManagerMockBase) RegisterTask(string, interface{}) error { return nil } func (tm *taskManagerMockBase) ResetCron() {} -func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskOptions) error { +func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskRunOptions) error { return nil } @@ -32,10 +32,6 @@ func (tm *taskManagerMockBase) Close(context.Context) error { func (tm *taskManagerMockBase) Debug(bool) {} -func (tm *taskManagerMockBase) Engine() taskmanager.Engine { - return taskmanager.Empty -} - func (tm *taskManagerMockBase) Factory() taskmanager.Factory { return taskmanager.FactoryEmpty } @@ -55,3 +51,10 @@ func (tm *taskManagerMockBase) IsNewRelicEnabled() bool { func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) error { return nil } + +// Sets custom task manager only for testing +func withTaskManagerMockup() ClientOps { + return func(c *clientOptions) { + c.taskManager.TaskEngine = &taskManagerMockBase{} + } +} diff --git a/bux_suite_test.go b/bux_suite_test.go index 0070d850..aac0aaf9 100644 --- a/bux_suite_test.go +++ b/bux_suite_test.go @@ -86,7 +86,6 @@ func (ts *EmbeddedDBTestSuite) serveMySQL() { // SetupSuite runs at the start of the suite func (ts *EmbeddedDBTestSuite) SetupSuite() { - var err error // Create the MySQL server @@ -120,7 +119,6 @@ func (ts *EmbeddedDBTestSuite) SetupSuite() { // TearDownSuite runs after the suite finishes func (ts *EmbeddedDBTestSuite) TearDownSuite() { - // Stop the Mongo server if ts.MongoServer != nil { ts.MongoServer.Stop() @@ -157,8 +155,8 @@ func (ts *EmbeddedDBTestSuite) TearDownTest() { // // NOTE: you need to close the client: ts.Close() func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database datastore.Engine, - tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps) (*TestingClient, error) { - + tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps, +) (*TestingClient, error) { var err error // Start the suite @@ -201,7 +199,6 @@ func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database da } } else { - // Load the in-memory version of the database if database == datastore.SQLite { opts = append(opts, WithSQLite(&datastore.SQLiteConfig{ @@ -329,9 +326,9 @@ func (ts *EmbeddedDBTestSuite) genericDBClient(t *testing.T, database datastore. WithAutoMigrate(&PaymailAddress{}), ) if taskManagerEnabled { - opts = append(opts, WithTaskQ(taskmanager.DefaultTaskQConfig(prefix+"_queue"), taskmanager.FactoryMemory)) + opts = append(opts, WithTaskqConfig(taskmanager.DefaultTaskQConfig(prefix+"_queue"))) } else { - opts = append(opts, WithCustomTaskManager(&taskManagerMockBase{})) + opts = append(opts, withTaskManagerMockup()) } tc, err := ts.createTestClient( @@ -358,7 +355,7 @@ func (ts *EmbeddedDBTestSuite) genericMockedDBClient(t *testing.T, database data ), database, prefix, true, true, WithDebugging(), - WithCustomTaskManager(&taskManagerMockBase{}), + withTaskManagerMockup(), ) require.NoError(t, err) require.NotNil(t, tc) diff --git a/bux_test.go b/bux_test.go index 34d3aeb7..f9e0432a 100644 --- a/bux_test.go +++ b/bux_test.go @@ -66,7 +66,7 @@ func DefaultClientOpts(debug, shared bool) []ClientOps { opts := make([]ClientOps, 0) opts = append( opts, - WithTaskQ(tqc, taskmanager.FactoryMemory), + WithTaskqConfig(tqc), WithSQLite(tester.SQLiteTestConfig(debug, shared)), WithChainstateOptions(false, false, false, false), WithMinercraft(&chainstate.MinerCraftBase{}), @@ -150,8 +150,8 @@ func (a *account) Unlocker(context.Context, *bscript.Script) (bt.Unlocker, error // CreateFakeFundingTransaction will create a valid (fake) transaction for funding func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey, - destinations []*Destination, satoshis uint64) string { - + destinations []*Destination, satoshis uint64, +) string { // Create new tx rawTx := bt.NewTx() txErr := rawTx.From(testTxScriptSigID, 0, testTxScriptSigOut, satoshis+354) @@ -185,8 +185,8 @@ func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey, // CreateNewXPub will create a new xPub and return all the information to use the xPub func CreateNewXPub(ctx context.Context, t *testing.T, buxClient ClientInterface, - opts ...ModelOps) (*bip32.ExtendedKey, *Xpub, string) { - + opts ...ModelOps, +) (*bip32.ExtendedKey, *Xpub, string) { // Generate a key pair masterKey, err := bitcoin.GenerateHDKey(bitcoin.SecureSeedLength) require.NoError(t, err) diff --git a/client.go b/client.go index f4bb5710..bf595545 100644 --- a/client.go +++ b/client.go @@ -113,10 +113,10 @@ type ( // taskManagerOptions holds the configuration for taskmanager taskManagerOptions struct { - taskmanager.ClientInterface // Client for TaskManager - cronJobs taskmanager.CronJobs // List of cron jobs - options []taskmanager.ClientOps // List of options - cronCustomPeriods map[string]time.Duration // will override the default period of cronJob + taskmanager.TaskEngine // Client for TaskManager + cronJobs taskmanager.CronJobs // List of cron jobs + options []taskmanager.TaskManagerOptions // List of options + cronCustomPeriods map[string]time.Duration // will override the default period of cronJob } ) @@ -303,7 +303,7 @@ func (c *Client) Close(ctx context.Context) error { if err := tm.Close(ctx); err != nil { return err } - c.options.taskManager.ClientInterface = nil + c.options.taskManager.TaskEngine = nil } return nil } @@ -340,11 +340,6 @@ func (c *Client) Debug(on bool) { if n := c.Notifications(); n != nil { n.Debug(on) } - - // Set debugging on the Taskmanager - if tm := c.Taskmanager(); tm != nil { - tm.Debug(on) - } } // DefaultSyncConfig will return the default sync config from the client defaults (for chainstate) @@ -445,9 +440,9 @@ func (c *Client) SetNotificationsClient(client notifications.ClientInterface) { } // Taskmanager will return the Taskmanager if it exists -func (c *Client) Taskmanager() taskmanager.ClientInterface { - if c.options.taskManager != nil && c.options.taskManager.ClientInterface != nil { - return c.options.taskManager.ClientInterface +func (c *Client) Taskmanager() taskmanager.TaskEngine { + if c.options.taskManager != nil && c.options.taskManager.TaskEngine != nil { + return c.options.taskManager.TaskEngine } return nil } diff --git a/client_internal.go b/client_internal.go index c43c2a57..41748d5e 100644 --- a/client_internal.go +++ b/client_internal.go @@ -114,8 +114,8 @@ func (c *Client) loadPaymailClient() (err error) { // loadTaskmanager will load the TaskManager and start the TaskManager client func (c *Client) loadTaskmanager(ctx context.Context) (err error) { // Load if a custom interface was NOT provided - if c.options.taskManager.ClientInterface == nil { - c.options.taskManager.ClientInterface, err = taskmanager.NewClient( + if c.options.taskManager.TaskEngine == nil { + c.options.taskManager.TaskEngine, err = taskmanager.NewTaskManager( ctx, c.options.taskManager.options..., ) } diff --git a/client_options.go b/client_options.go index d0a16a16..b9c07d3f 100644 --- a/client_options.go +++ b/client_options.go @@ -105,7 +105,7 @@ func defaultClientOptions() *clientOptions { // Blank TaskManager config taskManager: &taskManagerOptions{ - ClientInterface: nil, + TaskEngine: nil, cronCustomPeriods: map[string]time.Duration{}, }, @@ -215,7 +215,6 @@ func WithDebugging() ClientOps { c.chainstate.options = append(c.chainstate.options, chainstate.WithDebugging()) c.dataStore.options = append(c.dataStore.options, datastore.WithDebugging()) c.notifications.options = append(c.notifications.options, notifications.WithDebugging()) - c.taskManager.options = append(c.taskManager.options, taskmanager.WithDebugging()) } } @@ -532,57 +531,18 @@ func WithPaymailServerConfig(config *server.Configuration, defaultFromPaymail, d // TASK MANAGER // ----------------------------------------------------------------- -// WithCustomTaskManager will set the taskmanager -func WithCustomTaskManager(taskManager taskmanager.ClientInterface) ClientOps { - return func(c *clientOptions) { - if taskManager != nil { - c.taskManager.ClientInterface = taskManager - } - } -} - -// WithTaskQ will set the task manager to use TaskQ & in-memory -func WithTaskQ(config *taskq.QueueOptions, factory taskmanager.Factory) ClientOps { - return func(c *clientOptions) { - if config != nil { - c.taskManager.options = append( - c.taskManager.options, - taskmanager.WithTaskQ(config, factory), - ) - } - } -} - -// WithTaskQUsingRedis will set the task manager to use TaskQ & Redis -func WithTaskQUsingRedis(config *taskq.QueueOptions, redisOptions *redis.Options) ClientOps { +// WithTaskqConfig will set the task manager to use TaskQ & in-memory +func WithTaskqConfig(config *taskq.QueueOptions) ClientOps { return func(c *clientOptions) { if config != nil { - - // Create a new redis client - if config.Redis == nil { - - // Remove prefix if found - redisOptions.Addr = strings.Replace(redisOptions.Addr, cachestore.RedisPrefix, "", -1) - config.Redis = redis.NewClient(redisOptions) - } - c.taskManager.options = append( c.taskManager.options, - taskmanager.WithTaskQ(config, taskmanager.FactoryRedis), + taskmanager.WithTaskqConfig(config), ) } } } -// WithCronService will set the custom cron service provider -func WithCronService(cronService taskmanager.CronService) ClientOps { - return func(c *clientOptions) { - if cronService != nil && c.taskManager != nil { - c.taskManager.options = append(c.taskManager.options, taskmanager.WithCronService(cronService)) - } - } -} - // WithCronCustmPeriod will set the custom cron jobs period which will override the default func WithCronCustmPeriod(cronJobName string, period time.Duration) ClientOps { return func(c *clientOptions) { diff --git a/client_options_test.go b/client_options_test.go index d2e08869..d31e3133 100644 --- a/client_options_test.go +++ b/client_options_test.go @@ -14,7 +14,6 @@ import ( "github.com/BuxOrg/bux/utils" "github.com/bitcoin-sv/go-paymail" "github.com/coocood/freecache" - "github.com/go-redis/redis/v8" "github.com/mrz1836/go-cachestore" "github.com/mrz1836/go-datastore" "github.com/newrelic/go-agent/v3/newrelic" @@ -212,7 +211,6 @@ func TestWithDebugging(t *testing.T) { assert.Equal(t, true, tc.IsDebug()) assert.Equal(t, true, tc.Cachestore().IsDebug()) assert.Equal(t, true, tc.Datastore().IsDebug()) - assert.Equal(t, true, tc.Taskmanager().IsDebug()) }) } @@ -265,7 +263,7 @@ func TestWithRedis(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), - WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())), WithRedis(&cachestore.RedisConfig{ URL: cachestore.RedisPrefix + "localhost:6379", }), @@ -288,7 +286,7 @@ func TestWithRedis(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), - WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())), WithRedis(&cachestore.RedisConfig{ URL: "localhost:6379", }), @@ -315,7 +313,7 @@ func TestWithRedisConnection(t *testing.T) { t.Run("using a nil connection", func(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), - WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())), WithRedisConnection(nil), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), @@ -336,7 +334,7 @@ func TestWithRedisConnection(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), - WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())), WithRedisConnection(client), WithSQLite(tester.SQLiteTestConfig(false, true)), WithMinercraft(&chainstate.MinerCraftBase{}), @@ -364,7 +362,7 @@ func TestWithFreeCache(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCache(), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithMinercraft(&chainstate.MinerCraftBase{})) require.NoError(t, err) @@ -392,7 +390,7 @@ func TestWithFreeCacheConnection(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCacheConnection(nil), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithMinercraft(&chainstate.MinerCraftBase{}), WithLogger(&logger), @@ -413,7 +411,7 @@ func TestWithFreeCacheConnection(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), WithFreeCacheConnection(fc), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithMinercraft(&chainstate.MinerCraftBase{}), WithLogger(&logger), @@ -470,7 +468,6 @@ func TestWithTaskQ(t *testing.T) { // todo: test cases where config is nil, or cannot load TaskQ t.Run("using taskq using memory", func(t *testing.T) { - logger := zerolog.Nop() tcOpts := DefaultClientOpts(true, true) tcOpts = append(tcOpts, WithLogger(&logger)) @@ -485,7 +482,6 @@ func TestWithTaskQ(t *testing.T) { tm := tc.Taskmanager() require.NotNil(t, tm) - assert.Equal(t, taskmanager.TaskQ, tm.Engine()) assert.Equal(t, taskmanager.FactoryMemory, tm.Factory()) }) @@ -498,11 +494,8 @@ func TestWithTaskQ(t *testing.T) { tc, err := NewClient( tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx), - WithTaskQUsingRedis( - taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), - &redis.Options{ - Addr: "localhost:6379", - }, + WithTaskqConfig( + taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), taskmanager.WithRedis("localhost:6379")), ), WithRedis(&cachestore.RedisConfig{ URL: cachestore.RedisPrefix + "localhost:6379", @@ -517,7 +510,6 @@ func TestWithTaskQ(t *testing.T) { tm := tc.Taskmanager() require.NotNil(t, tm) - assert.Equal(t, taskmanager.TaskQ, tm.Engine()) assert.Equal(t, taskmanager.FactoryRedis, tm.Factory()) }) } diff --git a/client_test.go b/client_test.go index 0a12bd5c..28ef625b 100644 --- a/client_test.go +++ b/client_test.go @@ -36,7 +36,6 @@ func TestClient_Debug(t *testing.T) { assert.Equal(t, true, tc.Cachestore().IsDebug()) assert.Equal(t, true, tc.Datastore().IsDebug()) assert.Equal(t, true, tc.Notifications().IsDebug()) - assert.Equal(t, true, tc.Taskmanager().IsDebug()) }) } diff --git a/definitions.go b/definitions.go index 66ad0a39..aec72e2c 100644 --- a/definitions.go +++ b/definitions.go @@ -24,7 +24,7 @@ const ( dustLimit = uint64(1) // Dust limit mongoTestVersion = "6.0.4" // Mongo Testing Version sqliteTestVersion = "3.37.0" // SQLite Testing Version (dummy version for now) - version = "v0.8.1" // bux version + version = "v0.9.0" // bux version ) // All the base models diff --git a/examples/client/broadcast_miners/broadcast_miners.go b/examples/client/broadcast_miners/broadcast_miners.go index 3ec6a97e..bcb09187 100644 --- a/examples/client/broadcast_miners/broadcast_miners.go +++ b/examples/client/broadcast_miners/broadcast_miners.go @@ -7,12 +7,10 @@ import ( "github.com/BuxOrg/bux" "github.com/BuxOrg/bux/chainstate" - "github.com/BuxOrg/bux/taskmanager" "github.com/tonicpow/go-minercraft/v2" ) func main() { - // Create a custom miner (using your api key for custom rates) miners, _ := minercraft.DefaultMiners() minerTaal := minercraft.MinerByName(miners, minercraft.MinerTaal) @@ -22,8 +20,8 @@ func main() { APIs: []minercraft.API{ { Token: os.Getenv("BUX_TAAL_API_KEY"), - URL: "https://tapi.taal.com/arc", - Type: minercraft.Arc, + URL: "https://tapi.taal.com/arc", + Type: minercraft.Arc, }, }, }, @@ -32,9 +30,8 @@ func main() { // Create the client client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithBroadcastMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will auto-fetch a policy using the token (api key) - bux.WithQueryMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will only use this as a query provider + bux.WithBroadcastMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will auto-fetch a policy using the token (api key) + bux.WithQueryMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will only use this as a query provider bux.WithMinercraftAPIs(minerCraftApis), bux.WithArc(), ) diff --git a/examples/client/chainstate/chainstate.go b/examples/client/chainstate/chainstate.go index 262ee0ac..7a29bdcc 100644 --- a/examples/client/chainstate/chainstate.go +++ b/examples/client/chainstate/chainstate.go @@ -5,14 +5,12 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithDebugging(), // Enable debugging (verbose logs) + bux.WithDebugging(), // Enable debugging (verbose logs) bux.WithChainstateOptions(true, true, true, true), // Broadcasting enabled by default ) if err != nil { diff --git a/examples/client/custom_cron/custom_cron.go b/examples/client/custom_cron/custom_cron.go index daadd693..acf5baaa 100644 --- a/examples/client/custom_cron/custom_cron.go +++ b/examples/client/custom_cron/custom_cron.go @@ -6,13 +6,11 @@ import ( "time" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks bux.WithCronCustmPeriod(bux.CronJobNameDraftTransactionCleanUp, 2*time.Second), bux.WithCronCustmPeriod(bux.CronJobNameIncomingTransaction, 4*time.Second), ) diff --git a/examples/client/custom_models/custom_models.go b/examples/client/custom_models/custom_models.go index b01bdbdd..b50347f8 100644 --- a/examples/client/custom_models/custom_models.go +++ b/examples/client/custom_models/custom_models.go @@ -5,7 +5,6 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { @@ -13,8 +12,7 @@ func main() { context.Background(), // Set context bux.WithDebugging(), bux.WithAutoMigrate(bux.BaseModels...), - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithModels(NewExample("example-field")), // Add additional custom models to Bux + bux.WithModels(NewExample("example-field")), // Add additional custom models to Bux ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/examples/client/custom_rates/custom_rates.go b/examples/client/custom_rates/custom_rates.go index b7a638b7..9593fc3d 100644 --- a/examples/client/custom_rates/custom_rates.go +++ b/examples/client/custom_rates/custom_rates.go @@ -8,12 +8,10 @@ import ( "github.com/BuxOrg/bux" "github.com/BuxOrg/bux/chainstate" - "github.com/BuxOrg/bux/taskmanager" "github.com/tonicpow/go-minercraft/v2" ) func main() { - const testXPub = "xpub661MyMwAqRbcFrBJbKwBGCB7d3fr2SaAuXGM95BA62X41m6eW2ehRQGW4xLi9wkEXUGnQZYxVVj4PxXnyrLk7jdqvBAs1Qq9gf6ykMvjR7J" // Create a custom miner (using your api key for custom rates) @@ -25,8 +23,8 @@ func main() { APIs: []minercraft.API{ { Token: os.Getenv("BUX_TAAL_API_KEY"), - URL: "https://tapi.taal.com/arc", - Type: minercraft.Arc, + URL: "https://tapi.taal.com/arc", + Type: minercraft.Arc, }, }, }, @@ -34,10 +32,9 @@ func main() { // Create the client client, err := bux.NewClient( - context.Background(), // Set context - bux.WithAutoMigrate(bux.BaseModels...), // All models - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithBroadcastMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will auto-fetch a policy using the token (api key) + context.Background(), // Set context + bux.WithAutoMigrate(bux.BaseModels...), // All models + bux.WithBroadcastMiners([]*chainstate.Miner{{Miner: minerTaal}}), // This will auto-fetch a policy using the token (api key) bux.WithMinercraftAPIs(minerCraftApis), bux.WithArc(), ) diff --git a/examples/client/custom_user_agent/custom_user_agent.go b/examples/client/custom_user_agent/custom_user_agent.go index b0949d0f..3a0cb084 100644 --- a/examples/client/custom_user_agent/custom_user_agent.go +++ b/examples/client/custom_user_agent/custom_user_agent.go @@ -5,14 +5,12 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( - context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithUserAgent("my-custom-user-agent"), // Custom user agent + context.Background(), // Set context + bux.WithUserAgent("my-custom-user-agent"), // Custom user agent ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/examples/client/debugging/debugging.go b/examples/client/debugging/debugging.go index 30818b0b..9b74082b 100644 --- a/examples/client/debugging/debugging.go +++ b/examples/client/debugging/debugging.go @@ -5,14 +5,12 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithDebugging(), // Enable debugging (verbose logs) + bux.WithDebugging(), // Enable debugging (verbose logs) ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/examples/client/encryption/encryption.go b/examples/client/encryption/encryption.go index fdd20dce..92cdaeed 100644 --- a/examples/client/encryption/encryption.go +++ b/examples/client/encryption/encryption.go @@ -6,14 +6,12 @@ import ( "os" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithDebugging(), // Enable debugging (verbose logs) + bux.WithDebugging(), // Enable debugging (verbose logs) bux.WithEncryption(os.Getenv("BUX_ENCRYPTION_KEY")), // Encryption key for external public keys (paymail) ) if err != nil { diff --git a/examples/client/logging/logging.go b/examples/client/logging/logging.go index c1824118..92ec76b2 100644 --- a/examples/client/logging/logging.go +++ b/examples/client/logging/logging.go @@ -2,18 +2,17 @@ package main import ( "context" - "github.com/BuxOrg/bux/logging" "log" + "github.com/BuxOrg/bux/logging" + "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( - context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks - bux.WithLogger(logging.GetDefaultLogger()), // Example of using a custom logger + context.Background(), // Set context + bux.WithLogger(logging.GetDefaultLogger()), // Example of using a custom logger ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/examples/client/monitor/monitor.go b/examples/client/monitor/monitor.go index aaa17e65..bdfe02d0 100644 --- a/examples/client/monitor/monitor.go +++ b/examples/client/monitor/monitor.go @@ -10,7 +10,6 @@ import ( "github.com/BuxOrg/bux" "github.com/BuxOrg/bux/chainstate" - "github.com/BuxOrg/bux/taskmanager" ) func main() { @@ -38,7 +37,7 @@ func main() { ProcessorType: chainstate.FilterRegex, SaveTransactionDestinations: false, }), - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks + bux.WithDebugging(), // Enable debugging (verbose logs) bux.WithChainstateOptions(true, true, true, true), // Broadcasting enabled by default bux.WithAutoMigrate(bux.BaseModels...), diff --git a/examples/client/mysql/mysql.go b/examples/client/mysql/mysql.go index 69e01b78..bd65566b 100644 --- a/examples/client/mysql/mysql.go +++ b/examples/client/mysql/mysql.go @@ -7,7 +7,6 @@ import ( "time" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" "github.com/mrz1836/go-datastore" ) @@ -35,7 +34,6 @@ func main() { User: os.Getenv("DB_USER"), }), bux.WithPaymailSupport([]string{"test.com"}, "example@test.com", "Example note", false, false), - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks bux.WithAutoMigrate(bux.BaseModels...), ) if err != nil { diff --git a/examples/client/new/new.go b/examples/client/new/new.go index 0083657a..8255e24b 100644 --- a/examples/client/new/new.go +++ b/examples/client/new/new.go @@ -5,13 +5,11 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/examples/client/new_relic/new_relic.go b/examples/client/new_relic/new_relic.go index 6cb13d8e..d20c8b1f 100644 --- a/examples/client/new_relic/new_relic.go +++ b/examples/client/new_relic/new_relic.go @@ -5,13 +5,11 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/tester" "github.com/newrelic/go-agent/v3/newrelic" ) func main() { - // EXAMPLE: new relic application // replace this with your ALREADY EXISTING new relic application app, err := tester.GetNewRelicApp("test-app") @@ -21,8 +19,7 @@ func main() { var client bux.ClientInterface client, err = bux.NewClient( - newrelic.NewContext(context.Background(), app.StartTransaction("test-txn")), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks + newrelic.NewContext(context.Background(), app.StartTransaction("test-txn")), // Set context bux.WithNewRelic(app), // New relic application (from your own application or server) ) if err != nil { diff --git a/examples/client/paymail_support/paymail_support.go b/examples/client/paymail_support/paymail_support.go index d1dab884..11379cdc 100644 --- a/examples/client/paymail_support/paymail_support.go +++ b/examples/client/paymail_support/paymail_support.go @@ -5,13 +5,11 @@ import ( "log" "github.com/BuxOrg/bux" - "github.com/BuxOrg/bux/taskmanager" ) func main() { client, err := bux.NewClient( context.Background(), // Set context - bux.WithTaskQ(taskmanager.DefaultTaskQConfig("test_queue"), taskmanager.FactoryMemory), // Tasks bux.WithPaymailSupport( []string{"test.com"}, "from@test.com", diff --git a/examples/client/redis/redis.go b/examples/client/redis/redis.go index ee425594..6f979787 100644 --- a/examples/client/redis/redis.go +++ b/examples/client/redis/redis.go @@ -6,7 +6,6 @@ import ( "github.com/BuxOrg/bux" "github.com/BuxOrg/bux/taskmanager" - "github.com/go-redis/redis/v8" "github.com/mrz1836/go-cachestore" ) @@ -15,9 +14,9 @@ func main() { client, err := bux.NewClient( context.Background(), // Set context bux.WithRedis(&cachestore.RedisConfig{URL: redisURL}), // Cache - bux.WithTaskQUsingRedis( // Tasks - taskmanager.DefaultTaskQConfig("example_queue"), - &redis.Options{Addr: redisURL}), + bux.WithTaskqConfig( // Tasks + taskmanager.DefaultTaskQConfig("example_queue", taskmanager.WithRedis(redisURL)), + ), ) if err != nil { log.Fatalln("error: " + err.Error()) diff --git a/interface.go b/interface.go index 53a85d14..5e661208 100644 --- a/interface.go +++ b/interface.go @@ -66,7 +66,7 @@ type ClientService interface { Logger() *zerolog.Logger Notifications() notifications.ClientInterface PaymailClient() paymail.ClientInterface - Taskmanager() taskmanager.ClientInterface + Taskmanager() taskmanager.TaskEngine } // DestinationService is the destination actions diff --git a/model_access_keys_test.go b/model_access_keys_test.go index 9f0e7a6e..0f80dc97 100644 --- a/model_access_keys_test.go +++ b/model_access_keys_test.go @@ -30,7 +30,7 @@ func Test_newAccessKey(t *testing.T) { }) t.Run("save", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() key := newAccessKey(testXPubID, append( @@ -56,7 +56,7 @@ func Test_newAccessKey(t *testing.T) { }) t.Run("revoke", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() key := newAccessKey(testXPubID, append(client.DefaultModelOptions(), New())...) @@ -88,7 +88,7 @@ func Test_newAccessKey(t *testing.T) { // TestAccessKey_GetAccessKey will test the method getAccessKey() func TestAccessKey_GetAccessKey(t *testing.T) { t.Run("not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() accessKey, err := getAccessKey(ctx, testXPubID, client.DefaultModelOptions()...) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestAccessKey_GetAccessKey(t *testing.T) { // TestAccessKey_GetAccessKeys will test the method getAccessKeysByXPubID() func TestAccessKey_GetAccessKeys(t *testing.T) { t.Run("not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() accessKey, err := getAccessKeysByXPubID(ctx, testXPubID, nil, nil, nil, client.DefaultModelOptions()...) require.NoError(t, err) diff --git a/model_destinations_test.go b/model_destinations_test.go index 9f4cde1e..baa1fe84 100644 --- a/model_destinations_test.go +++ b/model_destinations_test.go @@ -15,10 +15,12 @@ import ( // todo: finish unit tests! -var testLockingScript = "76a9147ff514e6ae3deb46e6644caac5cdd0bf2388906588ac" -var testAddressID = "fc1e635d98151c6008f29908ee2928c60c745266f9853e945c917b1baa05973e" -var testDestinationID = "c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646" -var stasHex = "76a9146d3562a8ec96bcb3b2253fd34f38a556fb66733d88ac6976aa607f5f7f7c5e7f7c5d7f7c5c7f7c5b7f7c5a7f7c597f7c587f7c577f7c567f7c557f7c547f7c537f7c527f7c517f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7c5f7f7c5e7f7c5d7f7c5c7f7c5b7f7c5a7f7c597f7c587f7c577f7c567f7c557f7c547f7c537f7c527f7c517f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e01007e818b21414136d08c5ed2bf3ba048afe6dcaebafeffffffffffffffffffffffffffffff007d976e7c5296a06394677768827601249301307c7e23022079be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798027e7c7e7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e01417e21038ff83d8cf12121491609c4939dc11c4aa35503508fe432dc5a5c1905608b9218ad547f7701207f01207f7701247f517f7801007e8102fd00a063546752687f7801007e817f727e7b01177f777b557a766471567a577a786354807e7e676d68aa880067765158a569765187645294567a5379587a7e7e78637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6867567a6876aa587a7d54807e577a597a5a7a786354807e6f7e7eaa727c7e676d6e7eaa7c687b7eaa587a7d877663516752687c72879b69537a647500687c7b547f77517f7853a0916901247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f77788c6301247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f777852946301247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f77686877517f7c52797d8b9f7c53a09b91697c76638c7c587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f777c6876638c7c587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f777c6863587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f7768587f517f7801007e817602fc00a06302fd00a063546752687f7801007e81727e7b7b687f75537f7c0376a9148801147f775379645579887567726881766968789263556753687a76026c057f7701147f8263517f7c766301007e817f7c6775006877686b537992635379528763547a6b547a6b677c6b567a6b537a7c717c71716868547a587f7c81547a557964936755795187637c686b687c547f7701207f75748c7a7669765880748c7a76567a876457790376a9147e7c7e557967041976a9147c7e0288ac687e7e5579636c766976748c7a9d58807e6c0376a9147e748c7a7e6c7e7e676c766b8263828c007c80517e846864745aa0637c748c7a76697d937b7b58807e56790376a9147e748c7a7e55797e7e6868686c567a5187637500678263828c007c80517e846868647459a0637c748c7a76697d937b7b58807e55790376a9147e748c7a7e55797e7e687459a0637c748c7a76697d937b7b58807e55790376a9147e748c7a7e55797e7e68687c537a9d547963557958807e041976a91455797e0288ac7e7e68aa87726d77776a14f566909f378788e61108d619e40df2757455d14c010005546f6b656e" +var ( + testLockingScript = "76a9147ff514e6ae3deb46e6644caac5cdd0bf2388906588ac" + testAddressID = "fc1e635d98151c6008f29908ee2928c60c745266f9853e945c917b1baa05973e" + testDestinationID = "c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646" + stasHex = "76a9146d3562a8ec96bcb3b2253fd34f38a556fb66733d88ac6976aa607f5f7f7c5e7f7c5d7f7c5c7f7c5b7f7c5a7f7c597f7c587f7c577f7c567f7c557f7c547f7c537f7c527f7c517f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7c5f7f7c5e7f7c5d7f7c5c7f7c5b7f7c5a7f7c597f7c587f7c577f7c567f7c557f7c547f7c537f7c527f7c517f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e01007e818b21414136d08c5ed2bf3ba048afe6dcaebafeffffffffffffffffffffffffffffff007d976e7c5296a06394677768827601249301307c7e23022079be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798027e7c7e7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c8276638c687f7c7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e7e01417e21038ff83d8cf12121491609c4939dc11c4aa35503508fe432dc5a5c1905608b9218ad547f7701207f01207f7701247f517f7801007e8102fd00a063546752687f7801007e817f727e7b01177f777b557a766471567a577a786354807e7e676d68aa880067765158a569765187645294567a5379587a7e7e78637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6878637c8c7c53797e577a7e6867567a6876aa587a7d54807e577a597a5a7a786354807e6f7e7eaa727c7e676d6e7eaa7c687b7eaa587a7d877663516752687c72879b69537a647500687c7b547f77517f7853a0916901247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f77788c6301247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f777852946301247f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e816854937f77686877517f7c52797d8b9f7c53a09b91697c76638c7c587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f777c6876638c7c587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f777c6863587f77517f7c01007e817602fc00a06302fd00a063546752687f7c01007e81687f7768587f517f7801007e817602fc00a06302fd00a063546752687f7801007e81727e7b7b687f75537f7c0376a9148801147f775379645579887567726881766968789263556753687a76026c057f7701147f8263517f7c766301007e817f7c6775006877686b537992635379528763547a6b547a6b677c6b567a6b537a7c717c71716868547a587f7c81547a557964936755795187637c686b687c547f7701207f75748c7a7669765880748c7a76567a876457790376a9147e7c7e557967041976a9147c7e0288ac687e7e5579636c766976748c7a9d58807e6c0376a9147e748c7a7e6c7e7e676c766b8263828c007c80517e846864745aa0637c748c7a76697d937b7b58807e56790376a9147e748c7a7e55797e7e6868686c567a5187637500678263828c007c80517e846868647459a0637c748c7a76697d937b7b58807e55790376a9147e748c7a7e55797e7e687459a0637c748c7a76697d937b7b58807e55790376a9147e748c7a7e55797e7e68687c537a9d547963557958807e041976a91455797e0288ac7e7e68aa87726d77776a14f566909f378788e61108d619e40df2757455d14c010005546f6b656e" +) // TestDestination_newDestination will test the method newDestination() func TestDestination_newDestination(t *testing.T) { @@ -86,7 +88,6 @@ func TestDestination_newAddress(t *testing.T) { assert.Equal(t, bscript2.ScriptTypePubKeyHash, address.Type) assert.Equal(t, testAddressID, address.GetID()) }) - } // TestDestination_GetModelName will test the method GetModelName() @@ -126,7 +127,6 @@ func TestDestination_GetID(t *testing.T) { // TestDestination_setAddress will test the method setAddress() func TestDestination_setAddress(t *testing.T) { - t.Run("internal 1", func(t *testing.T) { destination := newDestination(testXPubID, testLockingScript) destination.Chain = utils.ChainInternal @@ -166,9 +166,8 @@ func TestDestination_setAddress(t *testing.T) { // TestDestination_getDestinationByID will test the method getDestinationByID() func TestDestination_getDestinationByID(t *testing.T) { - t.Run("does not exist", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub, err := getDestinationByID(ctx, testDestinationID, client.DefaultModelOptions()...) @@ -177,7 +176,7 @@ func TestDestination_getDestinationByID(t *testing.T) { }) t.Run("get", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() destination := newDestination(testXPubID, testLockingScript, client.DefaultModelOptions()...) @@ -198,9 +197,8 @@ func TestDestination_getDestinationByID(t *testing.T) { // TestDestination_getDestinationByAddress will test the method getDestinationByAddress() func TestDestination_getDestinationByAddress(t *testing.T) { - t.Run("does not exist", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub, err := getDestinationByAddress(ctx, testExternalAddress, client.DefaultModelOptions()...) @@ -209,7 +207,7 @@ func TestDestination_getDestinationByAddress(t *testing.T) { }) t.Run("get", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() destination := newDestination(testXPubID, testLockingScript, client.DefaultModelOptions()...) @@ -230,9 +228,8 @@ func TestDestination_getDestinationByAddress(t *testing.T) { // TestDestination_getDestinationByLockingScript will test the method getDestinationByLockingScript() func TestDestination_getDestinationByLockingScript(t *testing.T) { - t.Run("does not exist", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub, err := getDestinationByLockingScript(ctx, testLockingScript, client.DefaultModelOptions()...) @@ -241,7 +238,7 @@ func TestDestination_getDestinationByLockingScript(t *testing.T) { }) t.Run("get destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() destination := newDestination(testXPubID, testLockingScript, client.DefaultModelOptions()...) @@ -269,9 +266,8 @@ func BenchmarkDestination_newAddress(b *testing.B) { // TestClient_NewDestination will test the method NewDestination() func TestClient_NewDestination(t *testing.T) { - t.Run("valid - simple destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() // Get new random key @@ -304,7 +300,7 @@ func TestClient_NewDestination(t *testing.T) { }) t.Run("error - invalid xPub", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() opts := append( @@ -324,7 +320,7 @@ func TestClient_NewDestination(t *testing.T) { }) t.Run("error - xPub not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() opts := append( @@ -345,7 +341,7 @@ func TestClient_NewDestination(t *testing.T) { }) t.Run("error - unsupported destination type", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() // Get new random key @@ -369,7 +365,7 @@ func TestClient_NewDestination(t *testing.T) { }) t.Run("stas token", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() // Get new random key diff --git a/model_draft_transactions_test.go b/model_draft_transactions_test.go index f64cb0a2..2f64f914 100644 --- a/model_draft_transactions_test.go +++ b/model_draft_transactions_test.go @@ -144,7 +144,7 @@ func TestDraftTransaction_getOutputSatoshis(t *testing.T) { // TestDraftTransaction_setChangeDestinations sets the given of change destinations on the draft transaction func TestDraftTransaction_setChangeDestinations(t *testing.T) { t.Run("1 change destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -163,7 +163,7 @@ func TestDraftTransaction_setChangeDestinations(t *testing.T) { }) t.Run("5 change destinations", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -185,7 +185,7 @@ func TestDraftTransaction_setChangeDestinations(t *testing.T) { // TestDraftTransaction_getDraftTransactionID tests getting the draft transaction by draft id func TestDraftTransaction_getDraftTransactionID(t *testing.T) { t.Run("not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() draftTx, err := getDraftTransactionID(ctx, testXPubID, testDraftID, client.DefaultModelOptions()...) require.NoError(t, err) @@ -193,7 +193,7 @@ func TestDraftTransaction_getDraftTransactionID(t *testing.T) { }) t.Run("found by draft id", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() draftTransaction := newDraftTransaction(testXPub, &TransactionConfig{}, client.DefaultModelOptions()...) err := draftTransaction.Save(ctx) @@ -210,7 +210,7 @@ func TestDraftTransaction_getDraftTransactionID(t *testing.T) { // TestDraftTransaction_createTransaction create a transaction hex func TestDraftTransaction_createTransaction(t *testing.T) { t.Run("empty transaction", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() draftTransaction := newDraftTransaction(testXPub, &TransactionConfig{}, append(client.DefaultModelOptions(), New())...) @@ -219,7 +219,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("transaction with no utxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() draftTransaction := newDraftTransaction(testXPub, &TransactionConfig{ Outputs: []*TransactionOutput{{ @@ -233,7 +233,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("transaction with utxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -307,7 +307,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("send to all", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -349,7 +349,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("fee calculation - MAP", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -405,7 +405,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("fee calculation - MAP 2", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -469,7 +469,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("fee calculation - tonicpow", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -526,7 +526,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("send to all - multiple utxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -576,7 +576,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("send to all - selected utxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -633,7 +633,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { }) t.Run("include utxos - tokens", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -804,7 +804,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { t.Run("SendAllTo + 2 utxos", func(t *testing.T) { p := newTestPaymailClient(t, []string{"handcash.io"}) ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, - WithCustomTaskManager(&taskManagerMockBase{}), + withTaskManagerMockup(), WithPaymailClient(p), ) defer deferMe() @@ -911,7 +911,7 @@ func TestDraftTransaction_createTransaction(t *testing.T) { // TestDraftTransaction_setChangeDestination setting the change destination func TestDraftTransaction_setChangeDestination(t *testing.T) { t.Run("missing xpub", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() draftTransaction := &DraftTransaction{ Model: *NewBaseModel( @@ -932,7 +932,7 @@ func TestDraftTransaction_setChangeDestination(t *testing.T) { }) t.Run("set valid destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) xPub.NextExternalNum = 121 @@ -967,7 +967,7 @@ func TestDraftTransaction_setChangeDestination(t *testing.T) { }) t.Run("use existing output", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -1001,7 +1001,7 @@ func TestDraftTransaction_setChangeDestination(t *testing.T) { }) t.Run("use existing outputs", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) err := xPub.Save(ctx) @@ -1544,7 +1544,7 @@ func TestDraftTransaction_SignInputs(t *testing.T) { } func initSimpleTestCase(t *testing.T) (context.Context, ClientInterface, func()) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) xPub := newXpub(testXPub, append(client.DefaultModelOptions(), New())...) xPub.CurrentBalance = 100000 diff --git a/model_sync_transactions_test.go b/model_sync_transactions_test.go index 34cfe5ca..e57044e4 100644 --- a/model_sync_transactions_test.go +++ b/model_sync_transactions_test.go @@ -25,7 +25,7 @@ func TestSyncTransaction_GetModelName(t *testing.T) { } func Test_areParentsBroadcast(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() opts := []ModelOps{WithClient(client)} @@ -113,7 +113,7 @@ func TestSyncTransaction_SaveHook(t *testing.T) { t.Run("trim Results to last 20 messages", func(t *testing.T) { // Given - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() opts := []ModelOps{WithClient(client), New()} @@ -133,5 +133,4 @@ func TestSyncTransaction_SaveHook(t *testing.T) { resultsLen := len(syncTx.Results.Results) require.Equal(t, 20, resultsLen) }) - } diff --git a/model_transactions_test.go b/model_transactions_test.go index c99ce134..09113837 100644 --- a/model_transactions_test.go +++ b/model_transactions_test.go @@ -105,7 +105,7 @@ func TestTransaction_newTransactionWithDraftID(t *testing.T) { // TestTransaction_getTransactionByID will test the method getTransactionByID() func TestTransaction_getTransactionByID(t *testing.T) { t.Run("not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() transaction, err := getTransactionByID(ctx, testXPubID, testTxID, client.DefaultModelOptions()...) require.NoError(t, err) @@ -135,7 +135,7 @@ func TestTransaction_getTransactionByID(t *testing.T) { // TestTransaction_getTransactionsByXpubID will test the method getTransactionsByXpubID() func TestTransaction_getTransactionsByXpubID(t *testing.T) { t.Run("tx not found", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() transactions, err := getTransactionsByXpubID(ctx, testXPub, nil, nil, nil, client.DefaultModelOptions()...) require.NoError(t, err) @@ -517,7 +517,7 @@ func TestTransaction_processInputs(t *testing.T) { }) t.Run("inputUtxoChecksOff", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{}), WithIUCDisabled()) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup(), WithIUCDisabled()) defer deferMe() transaction, err := txFromHex(testTxHex, append(client.DefaultModelOptions(), New())...) diff --git a/model_utxos_test.go b/model_utxos_test.go index 0b24fc04..a30b3733 100644 --- a/model_utxos_test.go +++ b/model_utxos_test.go @@ -90,7 +90,7 @@ func TestUtxo_getUtxo(t *testing.T) { // t.Parallel() t.Run("getUtxo empty", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() utxo, err := getUtxo(ctx, testTxID, 12, client.DefaultModelOptions()...) assert.NoError(t, err) @@ -98,7 +98,7 @@ func TestUtxo_getUtxo(t *testing.T) { }) t.Run("getUtxo", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() _utxo := newUtxo(testXPubID, testTxID, testLockingScript, 12, 1225, append(client.DefaultModelOptions(), New())...) _ = _utxo.Save(ctx) @@ -112,7 +112,7 @@ func TestUtxo_getUtxo(t *testing.T) { // TestUtxo_getUtxosByXpubID will test the method getUtxosByXpubID() func TestUtxo_getUtxosByXpubID(t *testing.T) { t.Run("getUtxos empty", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() utxos, err := getUtxosByXpubID( @@ -127,7 +127,7 @@ func TestUtxo_getUtxosByXpubID(t *testing.T) { }) t.Run("getUtxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -156,7 +156,7 @@ func TestUtxo_GetModelName(t *testing.T) { // TestUtxo_UnReserveUtxos un-reserve utxos func TestUtxo_UnReserveUtxos(t *testing.T) { t.Run("un-reserve 2000", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) @@ -188,7 +188,7 @@ func TestUtxo_UnReserveUtxos(t *testing.T) { // TestUtxo_ReserveUtxos reserve utxos func TestUtxo_ReserveUtxos(t *testing.T) { t.Run("reserve 1000", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -202,7 +202,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve 2000", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -218,7 +218,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve 20000", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -228,7 +228,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve fromUtxos", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -249,7 +249,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve fromUtxos 2", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -277,7 +277,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve fromUtxos err", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -291,7 +291,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("reserve utxos paginated", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -303,7 +303,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { }) t.Run("duplicate inputs", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() opts := append(client.DefaultModelOptions(), New()) @@ -327,7 +327,7 @@ func TestUtxo_ReserveUtxos(t *testing.T) { // TestUtxo_GetSpendableUtxos get spendable utxos func TestUtxo_GetSpendableUtxos(t *testing.T) { t.Run("spendable", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -362,7 +362,7 @@ func TestUtxo_GetSpendableUtxos(t *testing.T) { }) t.Run("paginated spendable", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() err := createTestUtxos(ctx, client) require.NoError(t, err) @@ -398,7 +398,7 @@ func TestUtxo_Save(t *testing.T) { // t.Parallel() t.Run("Save empty", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() _utxo := newUtxo("", "", "", 0, 0, append(client.DefaultModelOptions(), New())...) err := _utxo.Save(ctx) @@ -406,7 +406,7 @@ func TestUtxo_Save(t *testing.T) { }) t.Run("Save", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() index := uint32(12) satoshis := uint64(1225) diff --git a/model_xpubs_test.go b/model_xpubs_test.go index 1afbef17..5edfa869 100644 --- a/model_xpubs_test.go +++ b/model_xpubs_test.go @@ -38,9 +38,8 @@ func TestXpub_newXpub(t *testing.T) { // TestXpub_getXpub will test the method getXpub() func TestXpub_getXpub(t *testing.T) { - t.Run("get xpub - does not exist", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub, err := getXpub(ctx, testXPub, client.DefaultModelOptions()...) @@ -49,7 +48,7 @@ func TestXpub_getXpub(t *testing.T) { }) t.Run("get xpub", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, client.DefaultModelOptions()...) @@ -80,9 +79,8 @@ func TestXpub_GetID(t *testing.T) { // TestXpub_getNewDestination will test the method GetNewDestination() func TestXpub_getNewDestination(t *testing.T) { - t.Run("err destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub("test", client.DefaultModelOptions()...) err := xPub.Save(ctx) @@ -96,7 +94,7 @@ func TestXpub_getNewDestination(t *testing.T) { }) t.Run("new internal destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, client.DefaultModelOptions()...) err := xPub.Save(ctx) @@ -119,7 +117,7 @@ func TestXpub_getNewDestination(t *testing.T) { }) t.Run("new external destination", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, client.DefaultModelOptions()...) err := xPub.Save(ctx) @@ -145,7 +143,7 @@ func TestXpub_getNewDestination(t *testing.T) { // TestXpub_childModels will test the method ChildModels() func TestXpub_childModels(t *testing.T) { t.Run("with 1 child model", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, client.DefaultModelOptions()...) err := xPub.Save(ctx) @@ -160,7 +158,7 @@ func TestXpub_childModels(t *testing.T) { }) t.Run("with 2 child model", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, false, withTaskManagerMockup()) defer deferMe() xPub := newXpub(testXPub, client.DefaultModelOptions()...) err := xPub.Save(ctx) @@ -196,7 +194,7 @@ func TestXpub_BeforeCreating(t *testing.T) { }) t.Run("valid random xpub", func(t *testing.T) { - ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, WithCustomTaskManager(&taskManagerMockBase{})) + ctx, client, deferMe := CreateTestSQLiteClient(t, false, true, withTaskManagerMockup()) defer deferMe() _, xPub, _ := CreateNewXPub(ctx, t, client) @@ -251,6 +249,7 @@ func TestXpub_AfterUpdated(t *testing.T) { opts := DefaultClientOpts(false, false) client, err := NewClient(context.Background(), opts...) + require.NoError(t, err) xPub.client = client err = xPub.BeforeUpdating(context.Background()) @@ -264,7 +263,6 @@ func TestXpub_AfterUpdated(t *testing.T) { // TestXpub_RemovePrivateData will test the method RemovePrivateData() func TestXpub_RemovePrivateData(t *testing.T) { - t.Run("remove private data", func(t *testing.T) { xPub := newXpub(testXPub, New()) require.NotNil(t, xPub) @@ -289,7 +287,6 @@ func TestXpub_RemovePrivateData(t *testing.T) { // TestXpub_Save will test the method Save() func (ts *EmbeddedDBTestSuite) TestXpub_Save() { - for _, testCase := range dbTestCases { ts.T().Run(testCase.name+" - valid Save (basic)", func(t *testing.T) { tc := ts.genericDBClient(t, testCase.database, false) diff --git a/paymail_test.go b/paymail_test.go index 1bc0b0eb..9bf0081b 100644 --- a/paymail_test.go +++ b/paymail_test.go @@ -219,7 +219,7 @@ func Test_getCapabilities(t *testing.T) { tc, err := NewClient(context.Background(), WithRedisConnection(redisClient), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithChainstateOptions(false, false, false, false), WithDebugging(), @@ -261,7 +261,7 @@ func Test_getCapabilities(t *testing.T) { tc, err := NewClient(context.Background(), WithRedisConnection(redisClient), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithChainstateOptions(false, false, false, false), WithDebugging(), @@ -374,7 +374,7 @@ func Test_resolvePaymailAddress(t *testing.T) { tc, err := NewClient(context.Background(), WithRedisConnection(redisClient), - WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory), + WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)), WithSQLite(&datastore.SQLiteConfig{Shared: true}), WithChainstateOptions(false, false, false, false), WithDebugging(), diff --git a/taskmanager/client.go b/taskmanager/client.go deleted file mode 100644 index 945d2d67..00000000 --- a/taskmanager/client.go +++ /dev/null @@ -1,166 +0,0 @@ -package taskmanager - -import ( - "context" - "errors" - - "github.com/BuxOrg/bux/logging" - "github.com/newrelic/go-agent/v3/newrelic" - "github.com/rs/zerolog" - taskq "github.com/vmihailenco/taskq/v3" -) - -type ( - - // Client is the taskmanager client (configuration) - Client struct { - options *clientOptions - } - - // clientOptions holds all the configuration for the client - clientOptions struct { - cronService CronService // Internal cron job client - debug bool // For extra logs and additional debug information - engine Engine // Taskmanager engine (taskq or machinery) - logger *zerolog.Logger // Internal logging - newRelicEnabled bool // If NewRelic is enabled (parent application) - taskq *taskqOptions // All configuration and options for using TaskQ - } - - // taskqOptions holds all the configuration for the TaskQ engine - taskqOptions struct { - config *taskq.QueueOptions // Configuration for the TaskQ engine - factory taskq.Factory // Factory for TaskQ (in-memory or Redis) - factoryType Factory // Type of factory to use (in-memory or Redis) - queue taskq.Queue // Queue for TaskQ - tasks map[string]*taskq.Task // Registered tasks - } -) - -// NewClient creates a new client for all TaskManager functionality -// -// If no options are given, it will use the defaultClientOptions() -// ctx may contain a NewRelic txn (or one will be created) -func NewClient(_ context.Context, opts ...ClientOps) (ClientInterface, error) { - // Create a new client with defaults - client := &Client{options: defaultClientOptions()} - - // Overwrite defaults with any set by user - for _, opt := range opts { - opt(client.options) - } - - // Set logger if not set - if client.options.logger == nil { - client.options.logger = logging.GetDefaultLogger() - } - - // EMPTY! Engine was NOT set - if client.Engine().IsEmpty() { - return nil, ErrNoEngine - } - - // Use NewRelic if it's enabled (use existing txn if found on ctx) - // ctx = client.options.getTxnCtx(ctx) - - // Load based on engine - if client.Engine() == Machinery { - return nil, errors.New("machinery is not implemented (yet)") - } else if client.Engine() == TaskQ { - if err := client.loadTaskQ(); err != nil { - return nil, err - } - } - - // Detect if a cron service provider was set - if client.options.cronService == nil { // Use a local cron - client.localCron() - } - - // Return the client - return client, nil -} - -// Close will close client and any open connections -func (c *Client) Close(ctx context.Context) error { - if txn := newrelic.FromContext(ctx); txn != nil { - defer txn.StartSegment("close_taskmanager").End() - } - if c != nil && c.options != nil { - - // Stop the cron scheduler - if c.options.cronService != nil { - c.options.cronService.Stop() - c.options.cronService = nil - } - - if c.options.engine == TaskQ { - - // Close the queue - if err := c.options.taskq.queue.Close(); err != nil { - return err - } - - // Empty all values and reset - c.options.taskq.factoryType = FactoryEmpty - c.options.taskq.config = nil - c.options.taskq.factory = nil - c.options.taskq.queue = nil - } else if c.options.engine == Machinery { - c.DebugLog("not implemented yet") - } - - // Empty the engine - c.options.engine = Empty - } - - return nil -} - -// ResetCron will reset the cron scheduler and all loaded tasks -func (c *Client) ResetCron() { - c.options.cronService.New() - c.options.cronService.Start() -} - -// Debug will set the debug flag -func (c *Client) Debug(on bool) { - c.options.debug = on -} - -// IsDebug will return if debugging is enabled -func (c *Client) IsDebug() bool { - return c.options.debug -} - -// DebugLog will display verbose logs -func (c *Client) DebugLog(text string) { - if c.IsDebug() { - c.options.logger.Info().Msg(text) - } -} - -// IsNewRelicEnabled will return if new relic is enabled -func (c *Client) IsNewRelicEnabled() bool { - return c.options.newRelicEnabled -} - -// Engine will return the engine that is set -func (c *Client) Engine() Engine { - return c.options.engine -} - -// Tasks will return the list of tasks -func (c *Client) Tasks() map[string]*taskq.Task { - return c.options.taskq.tasks -} - -// Factory will return the factory that is set -func (c *Client) Factory() Factory { - if c.Engine() == TaskQ { - return c.options.taskq.factoryType - } else if c.Engine() == Machinery { - return FactoryRedis - } - return FactoryEmpty -} diff --git a/taskmanager/client_options.go b/taskmanager/client_options.go deleted file mode 100644 index de5a018f..00000000 --- a/taskmanager/client_options.go +++ /dev/null @@ -1,83 +0,0 @@ -package taskmanager - -import ( - "context" - - "github.com/newrelic/go-agent/v3/newrelic" - "github.com/rs/zerolog" - taskq "github.com/vmihailenco/taskq/v3" -) - -// ClientOps allow functional options to be supplied -// that overwrite default client options. -type ClientOps func(c *clientOptions) - -// defaultClientOptions will return an clientOptions struct with the default settings -// -// Useful for starting with the default and then modifying as needed -func defaultClientOptions() *clientOptions { - - // Set the default options - return &clientOptions{ - debug: false, - engine: Empty, - newRelicEnabled: false, - taskq: &taskqOptions{ - tasks: make(map[string]*taskq.Task), - }, - } -} - -// GetTxnCtx will check for an existing transaction -func (c *Client) GetTxnCtx(ctx context.Context) context.Context { - if c.options.newRelicEnabled { - txn := newrelic.FromContext(ctx) - if txn != nil { - ctx = newrelic.NewContext(ctx, txn) - } - } - return ctx -} - -// WithNewRelic will enable the NewRelic wrapper -func WithNewRelic() ClientOps { - return func(c *clientOptions) { - c.newRelicEnabled = true - } -} - -// WithDebugging will enable debugging mode -func WithDebugging() ClientOps { - return func(c *clientOptions) { - c.debug = true - } -} - -// WithTaskQ will use the TaskQ engine -func WithTaskQ(config *taskq.QueueOptions, factory Factory) ClientOps { - return func(c *clientOptions) { - if config != nil && !factory.IsEmpty() { - c.engine = TaskQ - c.taskq.config = config - c.taskq.factoryType = factory - } - } -} - -// WithLogger will set the custom logger interface -func WithLogger(customLogger *zerolog.Logger) ClientOps { - return func(c *clientOptions) { - if customLogger != nil { - c.logger = customLogger - } - } -} - -// WithCronService will set the cron service -func WithCronService(cronService CronService) ClientOps { - return func(c *clientOptions) { - if cronService != nil { - c.cronService = cronService - } - } -} diff --git a/taskmanager/client_options_test.go b/taskmanager/client_options_test.go deleted file mode 100644 index dca6cba3..00000000 --- a/taskmanager/client_options_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package taskmanager - -import ( - "testing" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" -) - -// TestWithNewRelic will test the method WithNewRelic() -func TestWithNewRelic(t *testing.T) { - - t.Run("check type", func(t *testing.T) { - opt := WithNewRelic() - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("test applying", func(t *testing.T) { - options := &clientOptions{} - opt := WithNewRelic() - opt(options) - assert.Equal(t, true, options.newRelicEnabled) - }) -} - -// TestWithDebugging will test the method WithDebugging() -func TestWithDebugging(t *testing.T) { - - t.Run("check type", func(t *testing.T) { - opt := WithDebugging() - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("test applying", func(t *testing.T) { - options := &clientOptions{} - opt := WithDebugging() - opt(options) - assert.Equal(t, true, options.debug) - }) -} - -// TestWithTaskQ will test the method WithTaskQ() -func TestWithTaskQ(t *testing.T) { - t.Run("check type", func(t *testing.T) { - opt := WithTaskQ(nil, FactoryEmpty) - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("test applying nil config", func(t *testing.T) { - options := &clientOptions{ - taskq: &taskqOptions{ - config: nil, - factory: nil, - factoryType: "", - queue: nil, - tasks: nil, - }, - } - opt := WithTaskQ(nil, FactoryEmpty) - opt(options) - assert.Equal(t, Factory(""), options.taskq.factoryType) - assert.Nil(t, options.taskq.config) - }) - - t.Run("test applying valid config", func(t *testing.T) { - options := &clientOptions{ - taskq: &taskqOptions{}, - } - opt := WithTaskQ(DefaultTaskQConfig(testQueueName), FactoryMemory) - opt(options) - assert.Equal(t, FactoryMemory, options.taskq.factoryType) - assert.NotNil(t, options.taskq.config) - assert.Equal(t, TaskQ, options.engine) - }) -} - -// TestWithLogger will test the method WithLogger() -func TestWithLogger(t *testing.T) { - t.Parallel() - - t.Run("check type", func(t *testing.T) { - opt := WithLogger(nil) - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("test applying nil", func(t *testing.T) { - options := &clientOptions{} - opt := WithLogger(nil) - opt(options) - assert.Nil(t, options.logger) - }) - - t.Run("test applying option", func(t *testing.T) { - options := &clientOptions{} - customLogger := zerolog.Nop() - opt := WithLogger(&customLogger) - opt(options) - assert.Equal(t, &customLogger, options.logger) - }) -} - -// TestWithCronService will test the method WithCronService() -func TestWithCronService(t *testing.T) { - t.Parallel() - - t.Run("check type", func(t *testing.T) { - opt := WithCronService(nil) - assert.IsType(t, *new(ClientOps), opt) - }) - - t.Run("test applying nil", func(t *testing.T) { - options := &clientOptions{} - opt := WithCronService(nil) - opt(options) - assert.Nil(t, options.cronService) - }) - - t.Run("test applying option", func(t *testing.T) { - options := &clientOptions{} - customCron := &cronLocal{} - opt := WithCronService(customCron) - opt(options) - assert.Equal(t, customCron, options.cronService) - }) -} diff --git a/taskmanager/client_test.go b/taskmanager/client_test.go deleted file mode 100644 index 1bbb8907..00000000 --- a/taskmanager/client_test.go +++ /dev/null @@ -1 +0,0 @@ -package taskmanager diff --git a/taskmanager/cron.go b/taskmanager/cron.go deleted file mode 100644 index 78c387df..00000000 --- a/taskmanager/cron.go +++ /dev/null @@ -1,40 +0,0 @@ -package taskmanager - -import "github.com/robfig/cron/v3" - -// localCron will load a local version of cron if it was not provided by the user -func (c *Client) localCron() { - cr := &cronLocal{} - cr.New() - cr.Start() - c.options.cronService = cr -} - -// cronLocal is the interface for the "local cron" service -type cronLocal struct { - cronService *cron.Cron -} - -// New will stop any existing cron service and start a new one -func (c *cronLocal) New() { - if c.cronService != nil { - c.cronService.Stop() - } - c.cronService = cron.New() -} - -// AddFunc will add a function to the cron service -func (c *cronLocal) AddFunc(spec string, cmd func()) (int, error) { - e, err := c.cronService.AddFunc(spec, cmd) - return int(e), err -} - -// Start will start the cron service -func (c *cronLocal) Start() { - c.cronService.Start() -} - -// Stop will stop the cron service -func (c *cronLocal) Stop() { - c.cronService.Stop() -} diff --git a/taskmanager/cron_jobs.go b/taskmanager/cron_jobs.go index 85f88653..7cf467fc 100644 --- a/taskmanager/cron_jobs.go +++ b/taskmanager/cron_jobs.go @@ -18,7 +18,7 @@ type CronJob struct { type CronJobs map[string]CronJob // CronJobsInit registers and runs the cron jobs -func (tm *Client) CronJobsInit(cronJobsMap CronJobs) (err error) { +func (tm *TaskManager) CronJobsInit(cronJobsMap CronJobs) (err error) { tm.ResetCron() defer func() { // stop other, already registered tasks if the func fails @@ -31,23 +31,19 @@ func (tm *Client) CronJobsInit(cronJobsMap CronJobs) (err error) { for name, taskDef := range cronJobsMap { handler := taskDef.Handler - if err = tm.RegisterTask(&Task{ - Name: name, - RetryLimit: 1, - Handler: func() error { - if taskErr := handler(ctx); taskErr != nil { - if tm.options.logger != nil { - tm.options.logger.Error().Msgf("error running %v task: %v", name, taskErr.Error()) - } + if err = tm.RegisterTask(name, func() error { + if taskErr := handler(ctx); taskErr != nil { + if tm.options.logger != nil { + tm.options.logger.Error().Msgf("error running %v task: %v", name, taskErr.Error()) } - return nil - }, + } + return nil }); err != nil { return } // Run the task periodically - if err = tm.RunTask(ctx, &TaskOptions{ + if err = tm.RunTask(ctx, &TaskRunOptions{ RunEveryPeriod: taskDef.Period, TaskName: name, }); err != nil { diff --git a/taskmanager/cron_jobs_test.go b/taskmanager/cron_jobs_test.go index 7191014d..9addfac6 100644 --- a/taskmanager/cron_jobs_test.go +++ b/taskmanager/cron_jobs_test.go @@ -9,16 +9,15 @@ import ( ) func TestCronTasks(t *testing.T) { - makeClient := func() *Client { - client, _ := NewClient( + makeTaskManager := func() *TaskManager { + client, _ := NewTaskManager( context.Background(), - WithTaskQ(DefaultTaskQConfig(testQueueName), FactoryMemory), ) - return client.(*Client) + return client.(*TaskManager) } t.Run("register one cron job", func(t *testing.T) { - client := makeClient() + tm := makeTaskManager() desiredExecutions := 2 @@ -27,9 +26,9 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan bool, desiredExecutions)} - err := client.CronJobsInit(CronJobs{ + err := tm.CronJobsInit(CronJobs{ "test": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- true return nil @@ -51,7 +50,7 @@ func TestCronTasks(t *testing.T) { }) t.Run("register two cron jobs", func(t *testing.T) { - client := makeClient() + tm := makeTaskManager() desiredExecutions := 6 @@ -60,16 +59,16 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan int, desiredExecutions)} - err := client.CronJobsInit(CronJobs{ + err := tm.CronJobsInit(CronJobs{ "test1": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- 1 return nil }, }, "test2": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- 2 return nil diff --git a/taskmanager/engine.go b/taskmanager/engine.go deleted file mode 100644 index cbd939e2..00000000 --- a/taskmanager/engine.go +++ /dev/null @@ -1,21 +0,0 @@ -package taskmanager - -// Engine is the different types of task manager's that are supported -type Engine string - -// Supported engines -const ( - Empty Engine = "empty" - Machinery Engine = "machinery" - TaskQ Engine = "taskq" -) - -// String is the string version of engine -func (e Engine) String() string { - return string(e) -} - -// IsEmpty will return true if the engine is not set -func (e Engine) IsEmpty() bool { - return e == Empty -} diff --git a/taskmanager/engine_test.go b/taskmanager/engine_test.go deleted file mode 100644 index c1777dae..00000000 --- a/taskmanager/engine_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package taskmanager - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -// TestEngine_String will test the method String() -func TestEngine_String(t *testing.T) { - - t.Run("test all engine names", func(t *testing.T) { - assert.Equal(t, "empty", Empty.String()) - assert.Equal(t, "machinery", Machinery.String()) - assert.Equal(t, "taskq", TaskQ.String()) - }) -} - -// TestEngine_IsEmpty will test the method IsEmpty() -func TestEngine_IsEmpty(t *testing.T) { - t.Run("test empty engine", func(t *testing.T) { - e := Empty - assert.Equal(t, true, e.IsEmpty()) - }) - - t.Run("test regular engine", func(t *testing.T) { - e := Machinery - assert.Equal(t, false, e.IsEmpty()) - }) -} diff --git a/taskmanager/errors.go b/taskmanager/errors.go deleted file mode 100644 index 7d845f91..00000000 --- a/taskmanager/errors.go +++ /dev/null @@ -1,30 +0,0 @@ -package taskmanager - -import "errors" - -// ErrNoEngine is returned when there is no engine set (missing engine) -var ErrNoEngine = errors.New("task manager engine is empty: choose taskq or machinery (IE: WithTaskQ())") - -// ErrMissingTaskQConfig is when the taskq configuration is missing prior to loading taskq -var ErrMissingTaskQConfig = errors.New("missing taskq configuration") - -// ErrMissingRedis is when the Redis connection is missing prior to loading taskq -var ErrMissingRedis = errors.New("missing redis connection") - -// ErrMissingFactory is when the factory type is missing or empty -var ErrMissingFactory = errors.New("missing factory type to load taskq") - -// ErrEngineNotSupported is when a feature is not supported by another engine -var ErrEngineNotSupported = errors.New("engine not supported") - -// ErrTaskNotFound is when a task was not found -var ErrTaskNotFound = errors.New("task not found") - -// ErrMissingTaskName is when the task name is missing -var ErrMissingTaskName = errors.New("missing task name") - -// ErrInvalidTaskDuration is when the task duration is invalid -var ErrInvalidTaskDuration = errors.New("invalid duration for task") - -// ErrNoTasksFound is when there are no tasks found in the taskmanager -var ErrNoTasksFound = errors.New("no tasks found") diff --git a/taskmanager/interface.go b/taskmanager/interface.go index 6c94a4ee..4985640b 100644 --- a/taskmanager/interface.go +++ b/taskmanager/interface.go @@ -6,31 +6,15 @@ import ( taskq "github.com/vmihailenco/taskq/v3" ) -// TaskService is the task related methods -type TaskService interface { - RegisterTask(task *Task) error +// TaskEngine is the taskmanager client interface +type TaskEngine interface { + RegisterTask(name string, handler interface{}) error ResetCron() - RunTask(ctx context.Context, options *TaskOptions) error + RunTask(ctx context.Context, options *TaskRunOptions) error Tasks() map[string]*taskq.Task CronJobsInit(cronJobsMap CronJobs) error -} - -// CronService is the cron service provider -type CronService interface { - AddFunc(spec string, cmd func()) (int, error) - New() - Start() - Stop() -} - -// ClientInterface is the taskmanager client interface -type ClientInterface interface { - TaskService Close(ctx context.Context) error - Debug(on bool) - Engine() Engine Factory() Factory GetTxnCtx(ctx context.Context) context.Context - IsDebug() bool IsNewRelicEnabled() bool } diff --git a/taskmanager/options.go b/taskmanager/options.go new file mode 100644 index 00000000..f6760501 --- /dev/null +++ b/taskmanager/options.go @@ -0,0 +1,34 @@ +package taskmanager + +import ( + "github.com/rs/zerolog" + taskq "github.com/vmihailenco/taskq/v3" +) + +// TaskManagerOptions allow functional options to be supplied +type TaskManagerOptions func(c *options) + +// WithNewRelic will enable the NewRelic wrapper +func WithNewRelic() TaskManagerOptions { + return func(c *options) { + c.newRelicEnabled = true + } +} + +// WithTaskqConfig will set the taskq custom config +func WithTaskqConfig(config *taskq.QueueOptions) TaskManagerOptions { + return func(c *options) { + if config != nil { + c.taskq.config = config + } + } +} + +// WithLogger will set the custom logger interface +func WithLogger(customLogger *zerolog.Logger) TaskManagerOptions { + return func(c *options) { + if customLogger != nil { + c.logger = customLogger + } + } +} diff --git a/taskmanager/options_test.go b/taskmanager/options_test.go new file mode 100644 index 00000000..3fcfbffa --- /dev/null +++ b/taskmanager/options_test.go @@ -0,0 +1,78 @@ +package taskmanager + +import ( + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +// TestWithNewRelic will test the method WithNewRelic() +func TestWithNewRelic(t *testing.T) { + t.Run("check type", func(t *testing.T) { + opt := WithNewRelic() + assert.IsType(t, *new(TaskManagerOptions), opt) + }) + + t.Run("test applying", func(t *testing.T) { + options := &options{} + opt := WithNewRelic() + opt(options) + assert.Equal(t, true, options.newRelicEnabled) + }) +} + +// TestWithTaskQ will test the method WithTaskQ() +func TestWithTaskQ(t *testing.T) { + t.Run("check type", func(t *testing.T) { + opt := WithTaskqConfig(nil) + assert.IsType(t, *new(TaskManagerOptions), opt) + }) + + t.Run("test applying nil config", func(t *testing.T) { + options := &options{ + taskq: &taskqOptions{ + config: nil, + queue: nil, + tasks: nil, + }, + } + opt := WithTaskqConfig(nil) + opt(options) + assert.Nil(t, options.taskq.config) + }) + + t.Run("test applying valid config", func(t *testing.T) { + options := &options{ + taskq: &taskqOptions{}, + } + opt := WithTaskqConfig(DefaultTaskQConfig("test-queue")) + opt(options) + assert.NotNil(t, options.taskq.config) + }) +} + +// TestWithLogger will test the method WithLogger() +func TestWithLogger(t *testing.T) { + t.Parallel() + + t.Run("check type", func(t *testing.T) { + opt := WithLogger(nil) + assert.IsType(t, *new(TaskManagerOptions), opt) + }) + + t.Run("test applying nil", func(t *testing.T) { + options := &options{} + opt := WithLogger(nil) + opt(options) + assert.Nil(t, options.logger) + }) + + t.Run("test applying option", func(t *testing.T) { + options := &options{} + customLogger := zerolog.Nop() + opt := WithLogger(&customLogger) + opt(options) + assert.Equal(t, &customLogger, options.logger) + }) +} diff --git a/taskmanager/task.go b/taskmanager/task.go deleted file mode 100644 index 6d4146b3..00000000 --- a/taskmanager/task.go +++ /dev/null @@ -1,75 +0,0 @@ -package taskmanager - -import ( - "context" - "time" -) - -// Task is the options for a new task (mimics TaskQ) -type Task struct { - Name string // Task name. - - // Function called to process a message. - // There are three permitted types of signature: - // 1. A zero-argument function - // 2. A function whose arguments are assignable in type from those which are passed in the message - // 3. A function which takes a single `*Message` argument - // The handler function may also optionally take a Context as a first argument and may optionally return an error. - // If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to - // `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/. - Handler interface{} - // Function called to process failed message after the specified number of retries have all failed. - // The FallbackHandler accepts the same types of function as the Handler. - FallbackHandler interface{} - - // Optional function used by Consumer with defer statement to recover from panics. - DeferFunc func() - - // Number of tries/releases after which the message fails permanently and is deleted. Default is 64 retries. - RetryLimit int - - // Minimum backoff time between retries. Default is 30 seconds. - MinBackoff time.Duration - - // Maximum backoff time between retries. Default is 30 minutes. - MaxBackoff time.Duration -} - -// TaskOptions are used for running a task -type TaskOptions struct { - Arguments []interface{} `json:"arguments"` // Arguments for the task - Delay time.Duration `json:"delay"` // Run after X delay - OnceInPeriod time.Duration `json:"once_in_period"` // Run once in X period - RunEveryPeriod time.Duration `json:"run_every_period"` // Cron job! - TaskName string `json:"task_name"` // Name of the task -} - -/* -// todo: add this functionality to the task options -OnceInPeriod(period time.Duration, args ...interface{}) -OnceWithDelay(delay time.Duration) -OnceWithSchedule(tm time.Time) -*/ - -// RegisterTask is a universal method to register a task -func (c *Client) RegisterTask(task *Task) error { - - // Register using TaskQ - if c.Engine() == TaskQ { - c.registerTaskUsingTaskQ(task) - return nil - } - - return ErrEngineNotSupported -} - -// RunTask is a universal method to run a task -func (c *Client) RunTask(ctx context.Context, options *TaskOptions) error { - - // Run using TaskQ - if c.Engine() == TaskQ { - return c.runTaskUsingTaskQ(ctx, options) - } - - return ErrEngineNotSupported -} diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go index d131ae5d..d73dac58 100644 --- a/taskmanager/taskmanager.go +++ b/taskmanager/taskmanager.go @@ -1,4 +1,137 @@ /* -Package taskmanager is the task/job management service layer for concurrent and asynchronous tasks +Package taskmanager is the task/job management service layer for concurrent and asynchronous tasks with cron scheduling. */ package taskmanager + +import ( + "context" + + "github.com/BuxOrg/bux/logging" + "github.com/newrelic/go-agent/v3/newrelic" + "github.com/robfig/cron/v3" + "github.com/rs/zerolog" + taskq "github.com/vmihailenco/taskq/v3" +) + +type ( + + // TaskManager implements the TaskEngine interface + TaskManager struct { + options *options + } + + options struct { + cronService *cron.Cron // Internal cron job client + logger *zerolog.Logger // Internal logging + newRelicEnabled bool // If NewRelic is enabled (parent application) + taskq *taskqOptions // All configuration and options for using TaskQ + } + + // taskqOptions holds all the configuration for the TaskQ engine + taskqOptions struct { + config *taskq.QueueOptions // Configuration for the TaskQ engine + queue taskq.Queue // Queue for TaskQ + tasks map[string]*taskq.Task // Registered tasks + } +) + +// NewTaskManager creates a new client for all TaskManager functionality +// If no options are given, it will use local memory for the queue. +// ctx may contain a NewRelic txn (or one will be created) +func NewTaskManager(ctx context.Context, opts ...TaskManagerOptions) (TaskEngine, error) { + // Create a new tm with defaults + tm := &TaskManager{options: &options{ + newRelicEnabled: false, + taskq: &taskqOptions{ + tasks: make(map[string]*taskq.Task), + config: DefaultTaskQConfig("taskq"), + }, + }} + + // Overwrite defaults with any set by user + for _, opt := range opts { + opt(tm.options) + } + + if tm.options.logger == nil { + tm.options.logger = logging.GetDefaultLogger() + } + + // Use NewRelic if it's enabled (use existing txn if found on ctx) + // ctx = tm.options.getTxnCtx(ctx) + + if err := tm.loadTaskQ(ctx); err != nil { + return nil, err + } + + tm.ResetCron() + + return tm, nil +} + +// Close the client and any open connections +func (tm *TaskManager) Close(ctx context.Context) error { + if txn := newrelic.FromContext(ctx); txn != nil { + defer txn.StartSegment("close_taskmanager").End() + } + if tm != nil && tm.options != nil { + + // Stop the cron scheduler + if tm.options.cronService != nil { + tm.options.cronService.Stop() + tm.options.cronService = nil + } + + // Close the taskq queue + if err := tm.options.taskq.queue.Close(); err != nil { + return err + } + + // Empty all values and reset + tm.options.taskq.config = nil + tm.options.taskq.queue = nil + } + + return nil +} + +// ResetCron will reset the cron scheduler and all loaded tasks +func (tm *TaskManager) ResetCron() { + if tm.options.cronService != nil { + tm.options.cronService.Stop() + } + tm.options.cronService = cron.New() + tm.options.cronService.Start() +} + +// IsNewRelicEnabled will return if new relic is enabled +func (tm *TaskManager) IsNewRelicEnabled() bool { + return tm.options.newRelicEnabled +} + +// Tasks will return the list of tasks +func (tm *TaskManager) Tasks() map[string]*taskq.Task { + return tm.options.taskq.tasks +} + +// Factory will return the factory that is set +func (tm *TaskManager) Factory() Factory { + if tm.options == nil || tm.options.taskq == nil { + return FactoryEmpty + } + if tm.options.taskq.config.Redis != nil { + return FactoryRedis + } + return FactoryMemory +} + +// GetTxnCtx will check for an existing transaction +func (tm *TaskManager) GetTxnCtx(ctx context.Context) context.Context { + if tm.options.newRelicEnabled { + txn := newrelic.FromContext(ctx) + if txn != nil { + ctx = newrelic.NewContext(ctx, txn) + } + } + return ctx +} diff --git a/taskmanager/taskmanager_test.go b/taskmanager/taskmanager_test.go deleted file mode 100644 index 1bbb8907..00000000 --- a/taskmanager/taskmanager_test.go +++ /dev/null @@ -1 +0,0 @@ -package taskmanager diff --git a/taskmanager/taskq.go b/taskmanager/taskq.go index 8305b3f0..9a2c7eb4 100644 --- a/taskmanager/taskq.go +++ b/taskmanager/taskq.go @@ -3,22 +3,35 @@ package taskmanager import ( "context" "fmt" + "strings" "sync" "time" + "github.com/go-redis/redis/v8" "github.com/go-redis/redis_rate/v9" taskq "github.com/vmihailenco/taskq/v3" "github.com/vmihailenco/taskq/v3/memqueue" "github.com/vmihailenco/taskq/v3/redisq" ) -var ( - mutex sync.Mutex -) +var mutex sync.Mutex + +// TasqOps allow functional options to be supplied +type TasqOps func(*taskq.QueueOptions) + +// WithRedis will set the redis client for the TaskQ engine +// Note: Because we use redis/v8, we need to use Redis lower than 7.2.0 +func WithRedis(addr string) TasqOps { + return func(queueOptions *taskq.QueueOptions) { + queueOptions.Redis = redis.NewClient(&redis.Options{ + Addr: strings.Replace(addr, "redis://", "", -1), + }) + } +} -// DefaultTaskQConfig will return a default configuration that can be modified -func DefaultTaskQConfig(name string) *taskq.QueueOptions { - return &taskq.QueueOptions{ +// DefaultTaskQConfig will return a QueueOptions with specified name and functional options applied +func DefaultTaskQConfig(name string, opts ...TasqOps) *taskq.QueueOptions { + queueOptions := &taskq.QueueOptions{ BufferSize: 10, // Size of the buffer where reserved messages are stored. ConsumerIdleTimeout: 6 * time.Hour, // ConsumerIdleTimeout Time after which the consumer need to be deleted. Handler: nil, // Optional message handler. The default is the global Tasks registry. @@ -36,51 +49,48 @@ func DefaultTaskQConfig(name string) *taskq.QueueOptions { WaitTimeout: 3 * time.Second, // Time that a long polling receive call waits for a message to become available before returning an empty response. WorkerLimit: 0, // Global limit of concurrently running workers across all servers. Overrides MaxNumWorker. } -} -// convertTaskToTaskQ will convert our internal task to a TaskQ struct -func convertTaskToTaskQ(task *Task) *taskq.TaskOptions { - return &taskq.TaskOptions{ - Name: task.Name, - Handler: task.Handler, - FallbackHandler: task.FallbackHandler, - DeferFunc: task.DeferFunc, - RetryLimit: task.RetryLimit, - MinBackoff: task.MinBackoff, - MaxBackoff: task.MaxBackoff, + for _, opt := range opts { + opt(queueOptions) } + + return queueOptions } -// loadTaskQ will load TaskQ based on the Factory Type and configuration set by the client loading -func (c *Client) loadTaskQ() error { +// TaskRunOptions are the options for running a task +type TaskRunOptions struct { + Arguments []interface{} // Arguments for the task + RunEveryPeriod time.Duration // Cron job! + TaskName string // Name of the task +} +func (runOptions *TaskRunOptions) runImmediately() bool { + return runOptions.RunEveryPeriod == 0 +} + +// loadTaskQ will load TaskQ based on the Factory Type and configuration set by the client loading +func (c *TaskManager) loadTaskQ(ctx context.Context) error { // Check for a valid config (set on client creation) - if c.options.taskq.config == nil { - return ErrMissingTaskQConfig + factoryType := c.Factory() + if factoryType == FactoryEmpty { + return fmt.Errorf("missing factory type to load taskq") } - // Load using in-memory vs Redis - if c.options.taskq.factoryType == FactoryMemory { - - // Create the factory - c.options.taskq.factory = memqueue.NewFactory() - - } else if c.options.taskq.factoryType == FactoryRedis { - - // Check for a redis connection (given on taskq configuration) - if c.options.taskq.config.Redis == nil { - return ErrMissingRedis - } - - // Create the factory - c.options.taskq.factory = redisq.NewFactory() - - } else { - return ErrMissingFactory + var factory taskq.Factory + if factoryType == FactoryMemory { + factory = memqueue.NewFactory() + } else if factoryType == FactoryRedis { + factory = redisq.NewFactory() } // Set the queue - c.options.taskq.queue = c.options.taskq.factory.RegisterQueue(c.options.taskq.config) + q := factory.RegisterQueue(c.options.taskq.config) + c.options.taskq.queue = q + if factoryType == FactoryRedis { + if err := q.Consumer().Start(ctx); err != nil { + return err + } + } // turn off logger for now // NOTE: having issues with logger with system resources @@ -89,76 +99,85 @@ func (c *Client) loadTaskQ() error { return nil } -// registerTaskUsingTaskQ will register a new task using the TaskQ engine -func (c *Client) registerTaskUsingTaskQ(task *Task) { - +// RegisterTask will register a new task to handle asynchronously +func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error) { defer func() { - if err := recover(); err != nil { - c.DebugLog(fmt.Sprintf("registering task panic: %v", err)) + if panicErr := recover(); panicErr != nil { + err = fmt.Errorf(fmt.Sprintf("registering task panic: %v", panicErr)) + c.options.logger.Error().Msg(err.Error()) } }() mutex.Lock() + defer mutex.Unlock() - // Check if task is already registered - if t := taskq.Tasks.Get(task.Name); t != nil { - - // Register the task locally - c.options.taskq.tasks[task.Name] = t + if t := taskq.Tasks.Get(name); t != nil { + // if already registered - register the task locally + c.options.taskq.tasks[name] = t + } else { + // Register and store the task + c.options.taskq.tasks[name] = taskq.RegisterTask(&taskq.TaskOptions{ + Name: name, + Handler: handler, + RetryLimit: 1, + }) + } - // Task already exists! - // c.DebugLog(fmt.Sprintf("registering task: %s... task already exists!", task.Name)) + c.options.logger.Debug().Msgf("registering task: %s...", c.options.taskq.tasks[name].Name()) + return nil +} - mutex.Unlock() +// RunTask will run a task using TaskQ +func (c *TaskManager) RunTask(ctx context.Context, options *TaskRunOptions) error { + c.options.logger.Info().Msgf("executing task: %s", options.TaskName) - return + // Try to get the task + task, ok := c.options.taskq.tasks[options.TaskName] + if !ok { + return fmt.Errorf("task %s not registered", options.TaskName) } - // Register and store the task - c.options.taskq.tasks[task.Name] = taskq.RegisterTask(convertTaskToTaskQ(task)) - - mutex.Unlock() + // Task message will be used to add to the queue + taskMessage := task.WithArgs(ctx, options.Arguments...) - // Debugging - c.DebugLog(fmt.Sprintf("registering task: %s...", c.options.taskq.tasks[task.Name].Name())) + if options.runImmediately() { + return c.options.taskq.queue.Add(taskMessage) + } + // Note: The first scheduled run will be after the period has passed + return c.scheduleTaskWithCron(ctx, task, taskMessage, options.RunEveryPeriod) } -// runTaskUsingTaskQ will run a task using TaskQ -func (c *Client) runTaskUsingTaskQ(ctx context.Context, options *TaskOptions) error { +func (c *TaskManager) scheduleTaskWithCron(ctx context.Context, task *taskq.Task, taskMessage *taskq.Message, runEveryPeriod time.Duration) error { + // When using Redis, we need to use a distributed timed lock to prevent the addition of the same task to the queue by multiple instances. + // With this approach, only one instance will add the task to the queue within a given period. + var tryLock func() bool + if c.Factory() == FactoryRedis { + key := fmt.Sprintf("taskq_cronlock_%s", task.Name()) - // Starting the execution of the task - c.DebugLog(fmt.Sprintf( - "executing task: %s... delay: %s arguments: %s", - options.TaskName, - options.Delay.String(), - fmt.Sprintf("%+v", options.Arguments), - )) + // The runEveryPeriod should be greater than 1 second + if runEveryPeriod < 1*time.Second { + return fmt.Errorf("runEveryPeriod should be greater than 1 second") + } - // Try to get the task - if _, ok := c.options.taskq.tasks[options.TaskName]; !ok { - return ErrTaskNotFound - } + // Lock time is the period minus 500ms to allow for some clock drift + lockTime := runEveryPeriod - 500*time.Millisecond - // Add arguments, and delay if set - msg := c.options.taskq.tasks[options.TaskName].WithArgs(ctx, options.Arguments...) - if options.OnceInPeriod > 0 { - msg.OnceInPeriod(options.OnceInPeriod, options.Arguments...) - } else if options.Delay > 0 { - msg.SetDelay(options.Delay) + tryLock = func() bool { + boolCmd := c.options.taskq.config.Redis.SetNX(ctx, key, "1", lockTime) + return boolCmd.Val() + } } - // This is the "cron" aspect of the task - if options.RunEveryPeriod > 0 { - _, err := c.options.cronService.AddFunc( - fmt.Sprintf("@every %ds", int(options.RunEveryPeriod.Seconds())), - func() { - // todo: log the error if it occurs? Cannot pass the error back up - _ = c.options.taskq.queue.Add(msg) - }, - ) - return err + // handler will be called by cron every runEveryPeriod seconds + handler := func() { + if tryLock != nil && !tryLock() { + return + } + _ = c.options.taskq.queue.Add(taskMessage) } - - // Add to the queue - return c.options.taskq.queue.Add(msg) + _, err := c.options.cronService.AddFunc( + fmt.Sprintf("@every %ds", int(runEveryPeriod.Seconds())), + handler, + ) + return err } diff --git a/taskmanager/taskq_test.go b/taskmanager/taskq_test.go index 316ad8be..3431202d 100644 --- a/taskmanager/taskq_test.go +++ b/taskmanager/taskq_test.go @@ -2,76 +2,184 @@ package taskmanager import ( "context" - "fmt" "testing" - "time" + "github.com/BuxOrg/bux/utils" "github.com/stretchr/testify/require" ) -const ( - testQueueName = "test_queue" -) - -// todo: finish unit tests! -// NOTE: these are just tests to get the library built ;) - -func TestNewClient(t *testing.T) { - c, err := NewClient( - context.Background(), - WithTaskQ(DefaultTaskQConfig(testQueueName), FactoryMemory), - ) +// NOTE: because of the taskq package has global state, the names of tasks must be unique +func TestNewTaskManager_Single(t *testing.T) { + c, err := NewTaskManager(context.Background()) require.NoError(t, err) require.NotNil(t, c) - defer func() { - _ = c.Close(context.Background()) - }() ctx := c.GetTxnCtx(context.Background()) - err = c.RegisterTask(&Task{ - Name: "task-1", - Handler: func(name string) error { - fmt.Println("TSK1 ran: " + name) - return nil - }, + task1Chan := make(chan string, 1) + task1Arg := "task a" + + err = c.RegisterTask(task1Arg, func(name string) error { + task1Chan <- name + return nil }) require.NoError(t, err) - err = c.RegisterTask(&Task{ - Name: "task-2", - Handler: func(name string) error { - fmt.Println("TSK2 ran: " + name) - return nil - }, + require.Equal(t, 1, len(c.Tasks())) + + // Run single task + err = c.RunTask(ctx, &TaskRunOptions{ + Arguments: []interface{}{task1Arg}, + TaskName: task1Arg, }) require.NoError(t, err) - t.Log("tasks: ", len(c.Tasks())) + require.Equal(t, task1Arg, <-task1Chan) + + // Close the client + err = c.Close(context.Background()) + require.NoError(t, err) +} + +func TestNewTaskManager_Multiple(t *testing.T) { + ctx := context.Background() + c, _ := NewTaskManager(ctx) + + task1Chan := make(chan string, 1) + task2Chan := make(chan string, 1) + + task1Arg := "task b" + task2Arg := "task c" + + err := c.RegisterTask(task1Arg, func(name string) error { + task1Chan <- name + return nil + }) + require.NoError(t, err) - time.Sleep(2 * time.Second) + err = c.RegisterTask(task2Arg, func(name string) error { + task2Chan <- name + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, len(c.Tasks())) // Run tasks - err = c.RunTask(ctx, &TaskOptions{ - Arguments: []interface{}{"task #1"}, - TaskName: "task-1", + err = c.RunTask(ctx, &TaskRunOptions{ + Arguments: []interface{}{task1Arg}, + TaskName: task1Arg, + }) + require.NoError(t, err) + + err = c.RunTask(ctx, &TaskRunOptions{ + Arguments: []interface{}{task2Arg}, + TaskName: task2Arg, + }) + require.NoError(t, err) + + require.Equal(t, task1Arg, <-task1Chan) + require.Equal(t, task2Arg, <-task2Chan) + + // Close the client + err = c.Close(context.Background()) + require.NoError(t, err) +} + +func TestNewTaskManager_RegisterTwice(t *testing.T) { + ctx := context.Background() + c, _ := NewTaskManager(ctx) + + task1Arg := "task d" + resultChan := make(chan int, 1) + + err := c.RegisterTask(task1Arg, func(name string) error { + resultChan <- 1 + return nil + }) + require.NoError(t, err) + + err = c.RegisterTask(task1Arg, func(name string) error { + resultChan <- 2 + return nil }) require.NoError(t, err) - err = c.RunTask(ctx, &TaskOptions{ - Arguments: []interface{}{"task #2"}, - TaskName: "task-2", + err = c.RunTask(ctx, &TaskRunOptions{ + Arguments: []interface{}{task1Arg}, + TaskName: task1Arg, }) require.NoError(t, err) - err = c.RunTask(ctx, &TaskOptions{ - Arguments: []interface{}{"task #2 with delay"}, - Delay: time.Second, - TaskName: "task-2", + require.Equal(t, 1, <-resultChan) +} + +func TestNewTaskManager_RunTwice(t *testing.T) { + ctx := context.Background() + c, _ := NewTaskManager(ctx) + + task1Arg := "task e" + + err := c.RegisterTask(task1Arg, func(name string) error { + return nil + }) + require.NoError(t, err) + + err = c.RunTask(ctx, &TaskRunOptions{ + TaskName: task1Arg, + }) + require.NoError(t, err) + + err = c.RunTask(ctx, &TaskRunOptions{ + TaskName: task1Arg, }) require.NoError(t, err) +} + +func TestNewTaskManager_NotRegistered(t *testing.T) { + ctx := context.Background() + c, _ := NewTaskManager(ctx) + + task1Arg := "task f" + + err := c.RunTask(ctx, &TaskRunOptions{ + TaskName: task1Arg, + }) + require.Error(t, err) +} - t.Log("ran all tasks...") +func TestNewTaskManager_WithRedis(t *testing.T) { + if testing.Short() { + t.Skip("skipping live local redis tests") + } - t.Log("closing...") + queueName, _ := utils.RandomHex(8) + c, err := NewTaskManager(context.Background(), WithTaskqConfig(DefaultTaskQConfig(queueName, WithRedis("redis://localhost:6379")))) + require.NoError(t, err) + require.NotNil(t, c) + + ctx := c.GetTxnCtx(context.Background()) + + task1Chan := make(chan string, 1) + task1Arg := "task redis" + + err = c.RegisterTask(task1Arg, func(name string) error { + task1Chan <- name + return nil + }) + require.NoError(t, err) + + require.Equal(t, 1, len(c.Tasks())) + + // Run single task + err = c.RunTask(ctx, &TaskRunOptions{ + Arguments: []interface{}{task1Arg}, + TaskName: task1Arg, + }) + require.NoError(t, err) + + require.Equal(t, task1Arg, <-task1Chan) + + // Close the client + err = c.Close(context.Background()) + require.NoError(t, err) }