Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
refactor(BUX-411): taskmanager with redis - tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Dec 15, 2023
1 parent 40b4e33 commit 51ca270
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 55 deletions.
14 changes: 0 additions & 14 deletions taskmanager/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package taskmanager

import (
"context"

"github.com/newrelic/go-agent/v3/newrelic"
"github.com/rs/zerolog"
taskq "github.com/vmihailenco/taskq/v3"
)
Expand All @@ -27,17 +24,6 @@ func defaultClientOptions() *options {
}
}

// GetTxnCtx will check for an existing transaction
func (c *TaskManager) GetTxnCtx(ctx context.Context) context.Context {
if c.options.newRelicEnabled {
txn := newrelic.FromContext(ctx)
if txn != nil {
ctx = newrelic.NewContext(ctx, txn)
}
}
return ctx
}

// WithNewRelic will enable the NewRelic wrapper
func WithNewRelic() ClientOps {
return func(c *options) {
Expand Down
2 changes: 1 addition & 1 deletion taskmanager/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWithTaskQ(t *testing.T) {
options := &options{
taskq: &taskqOptions{},
}
opt := WithTaskqConfig(DefaultTaskQConfig(testQueueName))
opt := WithTaskqConfig(DefaultTaskQConfig("test-queue"))
opt(options)
assert.NotNil(t, options.taskq.config)
})
Expand Down
17 changes: 14 additions & 3 deletions taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type (
//
// If no options are given, it will use the defaultClientOptions()
// ctx may contain a NewRelic txn (or one will be created)
func NewTaskManager(_ context.Context, opts ...ClientOps) (Tasker, error) {
func NewTaskManager(ctx context.Context, opts ...ClientOps) (Tasker, error) {
// Create a new tm with defaults
tm := &TaskManager{options: defaultClientOptions()}

Expand All @@ -55,10 +55,10 @@ func NewTaskManager(_ context.Context, opts ...ClientOps) (Tasker, error) {
}

// Use NewRelic if it's enabled (use existing txn if found on ctx)
// ctx = client.options.getTxnCtx(ctx)
// ctx = tm.options.getTxnCtx(ctx)

// Load the TaskQ engine
if err := tm.loadTaskQ(); err != nil {
if err := tm.loadTaskQ(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -141,3 +141,14 @@ func (tm *TaskManager) Factory() Factory {
}
return FactoryMemory
}

// GetTxnCtx will check for an existing transaction
func (tm *TaskManager) GetTxnCtx(ctx context.Context) context.Context {
if tm.options.newRelicEnabled {
txn := newrelic.FromContext(ctx)
if txn != nil {
ctx = newrelic.NewContext(ctx, txn)
}
}
return ctx
}
13 changes: 9 additions & 4 deletions taskmanager/taskq.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type TaskRunOptions struct {
}

// loadTaskQ will load TaskQ based on the Factory Type and configuration set by the client loading
func (c *TaskManager) loadTaskQ() error {
func (c *TaskManager) loadTaskQ(ctx context.Context) error {
// Check for a valid config (set on client creation)
factoryType := c.Factory()
if factoryType == FactoryEmpty {
Expand All @@ -82,7 +82,13 @@ func (c *TaskManager) loadTaskQ() error {
}

// Set the queue
c.options.taskq.queue = factory.RegisterQueue(c.options.taskq.config)
q := factory.RegisterQueue(c.options.taskq.config)
c.options.taskq.queue = q
if factoryType == FactoryRedis {
if err := q.Consumer().Start(ctx); err != nil {
return err
}
}

// turn off logger for now
// NOTE: having issues with logger with system resources
Expand All @@ -103,8 +109,7 @@ func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error)
defer mutex.Unlock()

if t := taskq.Tasks.Get(name); t != nil {
// if already registered - register the task locally
c.options.taskq.tasks[name] = t
return fmt.Errorf("task %s already registered", name)
} else {
// Register and store the task
c.options.taskq.tasks[name] = taskq.RegisterTask(&taskq.TaskOptions{
Expand Down
180 changes: 147 additions & 33 deletions taskmanager/taskq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,184 @@ package taskmanager

import (
"context"
"fmt"
"testing"
"time"

"github.com/BuxOrg/bux/utils"
"github.com/stretchr/testify/require"
)

const (
testQueueName = "test_queue"
)

// todo: finish unit tests!
// NOTE: these are just tests to get the library built ;)

func TestNewClient(t *testing.T) {
c, err := NewTaskManager(
context.Background(),
WithTaskqConfig(DefaultTaskQConfig(testQueueName)),
)
// NOTE: because of the taskq package has global state, the names of tasks must be unique
func TestNewTaskManager_Single(t *testing.T) {
c, err := NewTaskManager(context.Background())
require.NoError(t, err)
require.NotNil(t, c)
defer func() {
_ = c.Close(context.Background())
}()

ctx := c.GetTxnCtx(context.Background())

err = c.RegisterTask("task-1", func(name string) error {
fmt.Println("TSK1 ran: " + name)
task1Chan := make(chan string, 1)
task1Arg := "task a"

err = c.RegisterTask(task1Arg, func(name string) error {
task1Chan <- name
return nil
})
require.NoError(t, err)

err = c.RegisterTask("task-2", func(name string) error {
fmt.Println("TSK2 ran: " + name)
return nil
require.Equal(t, 1, len(c.Tasks()))

// Run single task
err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{task1Arg},
TaskName: task1Arg,
})
require.NoError(t, err)

t.Log("tasks: ", len(c.Tasks()))
require.Equal(t, task1Arg, <-task1Chan)

// Close the client
err = c.Close(context.Background())
require.NoError(t, err)
}

func TestNewTaskManager_Multiple(t *testing.T) {
ctx := context.Background()
c, _ := NewTaskManager(ctx)

task1Chan := make(chan string, 1)
task2Chan := make(chan string, 1)

time.Sleep(2 * time.Second)
task1Arg := "task b"
task2Arg := "task c"

err := c.RegisterTask(task1Arg, func(name string) error {
task1Chan <- name
return nil
})
require.NoError(t, err)

err = c.RegisterTask(task2Arg, func(name string) error {
task2Chan <- name
return nil
})
require.NoError(t, err)
require.Equal(t, 2, len(c.Tasks()))

// Run tasks
err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{"task #1"},
TaskName: "task-1",
Arguments: []interface{}{task1Arg},
TaskName: task1Arg,
})
require.NoError(t, err)

err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{task2Arg},
TaskName: task2Arg,
})
require.NoError(t, err)

require.Equal(t, task1Arg, <-task1Chan)
require.Equal(t, task2Arg, <-task2Chan)

// Close the client
err = c.Close(context.Background())
require.NoError(t, err)
}

func TestNewTaskManager_RegisterTwice(t *testing.T) {
ctx := context.Background()
c, _ := NewTaskManager(ctx)

task1Arg := "task d"
resultChan := make(chan int, 1)

err := c.RegisterTask(task1Arg, func(name string) error {
resultChan <- 1
return nil
})
require.NoError(t, err)

err = c.RegisterTask(task1Arg, func(name string) error {
resultChan <- 2
return nil
})
require.Error(t, err)

err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{task1Arg},
TaskName: task1Arg,
})
require.NoError(t, err)

require.Equal(t, 1, <-resultChan)
}

func TestNewTaskManager_RunTwice(t *testing.T) {
ctx := context.Background()
c, _ := NewTaskManager(ctx)

task1Arg := "task e"

err := c.RegisterTask(task1Arg, func(name string) error {
return nil
})
require.NoError(t, err)

err = c.RunTask(ctx, &TaskRunOptions{
TaskName: task1Arg,
})
require.NoError(t, err)

err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{"task #2"},
TaskName: "task-2",
TaskName: task1Arg,
})
require.NoError(t, err)
}

func TestNewTaskManager_NotRegistered(t *testing.T) {
ctx := context.Background()
c, _ := NewTaskManager(ctx)

task1Arg := "task f"

err := c.RunTask(ctx, &TaskRunOptions{
TaskName: task1Arg,
})
require.Error(t, err)
}

func TestNewTaskManager_WithRedis(t *testing.T) {
if testing.Short() {
t.Skip("skipping live local redis tests")
}

queueName, _ := utils.RandomHex(8)
c, err := NewTaskManager(context.Background(), WithTaskqConfig(DefaultTaskQConfig(queueName, WithRedis("redis://localhost:6379"))))
require.NoError(t, err)
require.NotNil(t, c)

ctx := c.GetTxnCtx(context.Background())

task1Chan := make(chan string, 1)
task1Arg := "task redis"

err = c.RegisterTask(task1Arg, func(name string) error {
task1Chan <- name
return nil
})
require.NoError(t, err)

require.Equal(t, 1, len(c.Tasks()))

// Run single task
err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{"task #2 with delay"},
Delay: time.Second,
TaskName: "task-2",
Arguments: []interface{}{task1Arg},
TaskName: task1Arg,
})
require.NoError(t, err)

t.Log("ran all tasks...")
require.Equal(t, task1Arg, <-task1Chan)

t.Log("closing...")
// Close the client
err = c.Close(context.Background())
require.NoError(t, err)
}

0 comments on commit 51ca270

Please sign in to comment.