Skip to content

Commit

Permalink
Update River test helpers to allow for River drivers
Browse files Browse the repository at this point in the history
We recently added River database drivers that allow it to be more
agnostic about the database package in use with River rather than being
tightly tied to pgx v5. The test helpers were also quite a recent
addition and when the drivers came in weren't updated to use a similar
system, thereby still being coupled to pgx v5.

The purpose of this change is to get test helpers onto the drivers
system as well. Like with top-level River, it's mostly about getting the
right APIs in place -- we're still tightly coupled to pgx v5 under the
covers, and will have to refactor all of that at a later time.

The old API doesn't quite work anymore, so we bifurcate the helpers into
something more akin to what we see in `river.Client`, with a transaction
and non-transaction variant. Furthermore, because these are top-level
package helpers, we need to take a driver generic (at least for the
transaction-based variants), making them look like `JobCompleteTx`:

* `RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)`
* `RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)`
* `RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{...})`
* `RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{...})`

One approach I experimented with was to do something similar to the
`require` package by allowing a struct to be created that'd then give
you a shortcut without type parameters necessary (kind of like what
`Client` does currently):

    tRiver := rivertest.New(riverpgxv5.New(dbPool))
    tRiver.RequireInsert(ctx, t, &RequiredArgs{}, nil)
    tRiver.RequireInsertTx(ctx, t, tx, &RequiredArgs{}, nil)

The problem I ran into there is similar to the top-level insert helpers
we ended up removing from `river` -- namely that Go has the limitation
that struct-level functions can't have their own type parameters, so
going with that model would no longer allow `RequireInsert` to return
`Job[T]` (it'd have to be `JobRow` instead) which is a non-starter right
now because we want people to be able to easily assert on the return
values of these functions.

I'm still not sure this is the package's perfect form, but is pretty
goo,d and I think this is about as good as we're going to be able to for
now, and is ~identical functionality-wise to what we have before. I'm
still open to continuing to experiment with args equality assertions
being built-in, but we should try to get a reasonably quick fix for the
missing driver problem in so we don't have anyone accidentally start
using an API that's definitively outdated.
  • Loading branch information
brandur committed Nov 15, 2023
1 parent 03f52d0 commit 5d3228a
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 82 deletions.
19 changes: 16 additions & 3 deletions rivertest/example_require_inserted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ func Example_requireInserted() {
panic(err)
}

_, err = riverClient.Insert(ctx, RequiredArgs{
tx, err := dbPool.Begin(ctx)
if err != nil {
panic(err)
}
defer func() { _ = tx.Rollback(ctx) }()

_, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{
Message: "Hello.",
}, nil)
if err != nil {
Expand All @@ -65,16 +71,23 @@ func Example_requireInserted() {
// *testing.T that comes from a test's argument.
t := &testing.T{}

job := rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, nil)
job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
fmt.Printf("Test passed with message: %s\n", job.Args.Message)

// Verify the same job again, and this time that it was inserted at the
// default priority and default queue.
_ = rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
Priority: 1,
Queue: river.DefaultQueue,
})

// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)

// Output:
// Test passed with message: Hello.
}
25 changes: 20 additions & 5 deletions rivertest/example_require_many_inserted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ func Example_requireManyInserted() {
panic(err)
}

_, err = riverClient.Insert(ctx, FirstRequiredArgs{Message: "Hello from first."}, nil)
tx, err := dbPool.Begin(ctx)
if err != nil {
panic(err)
}
defer func() { _ = tx.Rollback(ctx) }()

_, err = riverClient.Insert(ctx, SecondRequiredArgs{Message: "Hello from second."}, nil)
_, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first."}, nil)
if err != nil {
panic(err)
}

_, err = riverClient.Insert(ctx, FirstRequiredArgs{Message: "Hello from first (again)."}, nil)
_, err = riverClient.InsertTx(ctx, tx, &SecondRequiredArgs{Message: "Hello from second."}, nil)
if err != nil {
panic(err)
}

_, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first (again)."}, nil)
if err != nil {
panic(err)
}
Expand All @@ -91,7 +97,7 @@ func Example_requireManyInserted() {
// *testing.T that comes from a test's argument.
t := &testing.T{}

jobs := rivertest.RequireManyInserted(ctx, t, dbPool, []rivertest.ExpectedJob{
jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}},
{Args: &SecondRequiredArgs{}},
{Args: &FirstRequiredArgs{}},
Expand All @@ -102,13 +108,22 @@ func Example_requireManyInserted() {

// Verify again, and this time that the second job was inserted at the
// default priority and default queue.
_ = rivertest.RequireManyInserted(ctx, t, dbPool, []rivertest.ExpectedJob{
_ = rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
{Args: &SecondRequiredArgs{}, Opts: &rivertest.RequireInsertedOpts{
Priority: 1,
Queue: river.DefaultQueue,
}},
})

// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}},
})

// Output:
// Job 0 args: {"message": "Hello from first."}
// Job 1 args: {"message": "Hello from second."}
Expand Down
140 changes: 108 additions & 32 deletions rivertest/rivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@ import (
"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
)

// DBTX is a database-like executor which is implemented by all of pgxpool.Pool,
type Tester[TTx any] struct {
driver riverdriver.Driver[TTx]
}

func New[TTx any](driver riverdriver.Driver[TTx]) *Tester[TTx] {
return &Tester[TTx]{driver: driver}
}

// func (t *Tester[TTx]) RequireInserted[T river.JobArgs](ctx context.Context, tb testing.TB, db dbtx, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] {
// }

// dbtx is a database-like executor which is implemented by all of pgxpool.Pool,
// pgx.Conn, and pgx.Tx. It's used to let this package's assertions be as
// flexible as possible in what database argument they can take.
type DBTX interface {
type dbtx interface {
CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
Expand Down Expand Up @@ -79,14 +91,14 @@ type RequireInsertedOpts struct {
Tags []string
}

// RequireInserted is a test helper that verifies that a job of the given kind was
// inserted for work, failing the test if it wasn't. The dbtx argument can be
// any of a Pgx connection pool, connection, or transaction. If found, the
// inserted job is returned so that further assertions can be made against it.
// RequireInserted is a test helper that verifies that a job of the given kind
// was inserted for work, failing the test if it wasn't. If found, the inserted
// job is returned so that further assertions can be made against it.
//
// job := RequireInserted(ctx, t, riverpgxv5.New(dbPool), &Job1Args{}, nil)
//
// func TestInsert(t *testing.T) {
// job := RequireInserted(ctx, t, poolOrConnOrTx, &Job1Args{}, nil)
// ...
// This variant takes a driver that wraps a database pool. See also
// RequireManyInsertedTx which takes a transaction.
//
// A RequireInsertedOpts struct can be provided as the last argument, and if it is,
// its properties (e.g. max attempts, priority, queue name) will act as required
Expand All @@ -95,26 +107,56 @@ type RequireInsertedOpts struct {
// The assertion will fail if more than one job of the given kind was found
// because at that point the job to return is ambiguous. Use RequireManyInserted
// to cover that case instead.
func RequireInserted[T river.JobArgs](ctx context.Context, tb testing.TB, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] {
func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
tb.Helper()
return requireInserted(ctx, tb, dbtx, expectedJob, opts)
return requireInserted(ctx, tb, driver, expectedJob, opts)
}

func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetDBPool(), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

// RequireInsertedTx is a test helper that verifies that a job of the given kind
// was inserted for work, failing the test if it wasn't. If found, the inserted
// job is returned so that further assertions can be made against it.
//
// job := RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &Job1Args{}, nil)
//
// This variant takes a transaction. See also RequireInserted which takes a
// driver that wraps a database pool.
//
// A RequireInsertedOpts struct can be provided as the last argument, and if it is,
// its properties (e.g. max attempts, priority, queue name) will act as required
// assertions in the inserted job row. UniqueOpts is ignored.
//
// The assertion will fail if more than one job of the given kind was found
// because at that point the job to return is ambiguous. Use RequireManyInserted
// to cover that case instead.
func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
tb.Helper()
return requireInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts)
}

// Internal function used by the tests so that the exported version can take
// `testing.TB` instead of `testing.T`.
func requireInserted[T river.JobArgs](ctx context.Context, t testingT, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] {
actualArgs, err := requireInsertedErr(ctx, t, dbtx, expectedJob, opts)
func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
var driver TDriver
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapTx(tx), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) (*river.Job[T], error) {
func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, db dbtx, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) {
queries := dbsqlc.New()

// Returned ordered by ID.
dbJobs, err := queries.JobGetByKind(ctx, dbtx, expectedJob.Kind())
dbJobs, err := queries.JobGetByKind(ctx, db, expectedJob.Kind())
if err != nil {
return nil, fmt.Errorf("error querying jobs: %w", err)
}
Expand All @@ -131,7 +173,7 @@ func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx D

jobRow := jobRowFromInternal(dbJobs[0])

var actualArgs T
var actualArgs TArgs
if err := json.Unmarshal(jobRow.EncodedArgs, &actualArgs); err != nil {
return nil, fmt.Errorf("error unmarshaling job args: %w", err)
}
Expand All @@ -142,7 +184,7 @@ func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx D
}
}

return &river.Job[T]{JobRow: jobRow, Args: actualArgs}, nil
return &river.Job[TArgs]{JobRow: jobRow, Args: actualArgs}, nil
}

// ExpectedJob is a single job to expect encapsulating job args and possible
Expand All @@ -156,17 +198,50 @@ type ExpectedJob struct {
Opts *RequireInsertedOpts
}

// RequireManyInserted is a test helper that verifies that jobs of the given kinds
// were inserted for work, failing the test if they weren't, or were inserted in
// the wrong order. The dbtx argument can be any of a Pgx connection pool,
// connection, or transaction. If found, the inserted jobs are returned so that
// RequireManyInserted is a test helper that verifies that jobs of the given
// kinds were inserted for work, failing the test if they weren't, or were
// inserted in the wrong order. If found, the inserted jobs are returned so that
// further assertions can be made against them.
//
// func TestInsertMany(t *testing.T) {
// job := RequireManyInserted(ctx, t, poolOrConnOrTx, []river.JobArgs{
// &Job1Args{},
// })
// ...
// job := RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []river.JobArgs{
// &Job1Args{},
// })
//
// This variant takes a driver that wraps a database pool. See also
// RequireManyInsertedTx which takes a transaction.
//
// A RequireInsertedOpts struct can be provided for each expected job, and if it is,
// its properties (e.g. max attempts, priority, queue name) will act as required
// assertions for the corresponding inserted job row. UniqueOpts is ignored.
//
// The assertion expects emitted jobs to have occurred exactly in the order and
// the number specified, and will fail in case this expectation isn't met. So if
// a job of a certain kind is emitted multiple times, it must be expected
// multiple times.
func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow {
tb.Helper()
return requireManyInserted(ctx, tb, driver, expectedJobs)
}

func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow {
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetDBPool(), expectedJobs)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

// RequireManyInsertedTx is a test helper that verifies that jobs of the given
// kinds were inserted for work, failing the test if they weren't, or were
// inserted in the wrong order. If found, the inserted jobs are returned so that
// further assertions can be made against them.
//
// job := RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []river.JobArgs{
// &Job1Args{},
// })
//
// This variant takes a transaction. See also RequireManyInserted which takes a
// driver that wraps a database pool.
//
// A RequireInsertedOpts struct can be provided for each expected job, and if it is,
// its properties (e.g. max attempts, priority, queue name) will act as required
Expand All @@ -176,28 +251,29 @@ type ExpectedJob struct {
// the number specified, and will fail in case this expectation isn't met. So if
// a job of a certain kind is emitted multiple times, it must be expected
// multiple times.
func RequireManyInserted(ctx context.Context, tb testing.TB, dbtx DBTX, expectedJobs []ExpectedJob) []*river.JobRow {
func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow {
tb.Helper()
return requireManyInserted(ctx, tb, dbtx, expectedJobs)
return requireManyInsertedTx[TDriver](ctx, tb, tx, expectedJobs)
}

// Internal function used by the tests so that the exported version can take
// `testing.TB` instead of `testing.T`.
func requireManyInserted(ctx context.Context, t testingT, dbtx DBTX, expectedJobs []ExpectedJob) []*river.JobRow {
actualArgs, err := requireManyInsertedErr(ctx, t, dbtx, expectedJobs)
func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow {
var driver TDriver
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapTx(tx), expectedJobs)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

func requireManyInsertedErr(ctx context.Context, t testingT, dbtx DBTX, expectedJobs []ExpectedJob) ([]*river.JobRow, error) {
func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, db dbtx, expectedJobs []ExpectedJob) ([]*river.JobRow, error) {
queries := dbsqlc.New()

expectedArgsKinds := sliceutil.Map(expectedJobs, func(j ExpectedJob) string { return j.Args.Kind() })

// Returned ordered by ID.
dbJobs, err := queries.JobGetByKindMany(ctx, dbtx, expectedArgsKinds)
dbJobs, err := queries.JobGetByKindMany(ctx, db, expectedArgsKinds)
if err != nil {
return nil, fmt.Errorf("error querying jobs: %w", err)
}
Expand Down
Loading

0 comments on commit 5d3228a

Please sign in to comment.