Skip to content

Commit

Permalink
Support for database/sql in migrations + framework for multi-driver…
Browse files Browse the repository at this point in the history
… River

Here, add a new minimal driver called `riverdriver/riversql` that
supports Go's built-in `database/sql` package, but only for purposes of
migrations. The idea here is to fully complete #57 by providing a way of
making `rivermigrate` interoperable with Go migration frameworks that
support Go-based migrations like Goose, which provides hooks for
`*sql.Tx` [1] rather than pgx.

`riverdriver/riversql` is not a full driver and is only meant to be used
with `rivermigrate`. We document this clearly in a number of places.

To make a multi-driver world possible with River, we have to start the
work of building a platform that does more than `riverpgxv5`'s "cheat"
workaround. This works by having each driver implement specific database
operations like `MigrationGetAll`, which target their wrapped database
package of choice.

This is accomplished by having each driver bundle in its own sqlc that
targets its package. So `riverpgxv5` has an `sqlc.yaml` that targets
`pgx/v5`, while `riversql` has one that targets `database/sql`. There's
some `sqlc.yaml` duplication involved here, but luckily both drivers can
share a `river_migration.sql` file that contains all queries involved,
so you only need to change one place. `river_migration.sql` also migrates
entirely out of the main `./internal/dbsqlc`.

The idea here is that eventually `./internal/dbsqlc` will disappear
completely, usurped entirely by driver-specific versions. As this is
done, all references to `pgx` will disappear from the top-level package.
There are some complications here to figure out like `LISTEN`/`NOTIFY`
though, and I'm not clear whether `database/sql` could ever become a
fully functional driver as it might be missing some needed functionality
(e.g. subtransactions are still not supported after talking about them
for ten f*ing years [2]. However, even if it's not, the system would let
us support other fully functional packages or future major versions of
pgx (or even past ones like `pgx/v4` if there's demand).

`river/riverdriver` becomes a package as it now has types in it that
need to be referenced by driver implementations, and this would
otherwise not be possible without introducing a circular dependency.

Notably, this development branch has to use some `go.mod` `replace`
directives to demonstrate that it works correctly. If we go this
direction, we'll need to break it into chunks to release it without
them:

1. Break out changes to `river/riverdriver`. Tag and release it.

2. Break out changes to `riverdriver/river*` drivers. Have them target
   the release in (1), comment out `replace`s, then tag and release them.

3. Target the remaining River changes to the releases in (1) and (2),
   comment out `replace`s, then tag and release the top-level driver.

Unfortunately future deep incisions to drivers will require similar
gymnastics, but I don't think there's a way around it (we already have
this process except it's currently two steps instead of three). The hope
is that these will change relatively rarely, so it won't be too painful.

[1] https://github.com/pressly/goose#go-migrations
[2] golang/go#7898
  • Loading branch information
brandur committed Dec 9, 2023
1 parent 13ecbd4 commit 6bd17d4
Show file tree
Hide file tree
Showing 36 changed files with 1,104 additions and 143 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ jobs:
sqlc-version: "1.22.0"

- name: Run sqlc diff
working-directory: ./internal/dbsqlc
run: |
echo "Please make sure that all sqlc changes are checked in!"
sqlc diff
make verify
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ linters-settings:
- Default
- Prefix(github.com/riverqueue)

gomoddirectives:
replace-local: true

gosec:
excludes:
- G404 # use of non-crypto random; overly broad for our use case
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `rivermigrate/riverdatabasesql` driver to enable River Go migrations through Go's built in `database/sql` package.

## [0.0.12] - 2023-12-02

### Added
Expand Down
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,16 @@ generate: generate/sqlc

.PHONY: generate/sqlc
generate/sqlc:
cd internal/dbsqlc && sqlc generate
cd internal/dbsqlc && sqlc generate
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate

.PHONY: verify
verify:
verify: verify/sqlc

.PHONY: verify/sqlc
verify/sqlc:
cd internal/dbsqlc && sqlc diff
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc diff
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
2 changes: 2 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ queries. After changing an sqlc `.sql` file, generate Go with:
```shell
git checkout master && git pull --rebase
VERSION=v0.0.x
git tag riverdriver/VERSION -m "release riverdriver/VERSION"
git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION"
git tag riverdriver/riverdatabasesql/$VERSION -m "release riverdriver/riverdatabasesql/$VERSION"
git tag $VERSION
git push --tags
```
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
module github.com/riverqueue/river

go 1.21.0
go 1.21.4

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5
replace github.com/riverqueue/river/riverdriver => ./riverdriver

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5

replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ./riverdriver/riverdatabasesql

require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v5 v5.5.0
github.com/jackc/puddle/v2 v2.2.1
github.com/oklog/ulid/v2 v2.1.0
github.com/riverqueue/river/riverdriver v0.0.0-00010101000000-000000000000
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.0-00010101000000-000000000000
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand All @@ -23,6 +29,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12 h1:mcDBnqwzEXY9WDOwbkd8xmFdSr/H6oHb1F3NCNCmLDY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12/go.mod h1:k6hsPkW9Fl3qURzyLHbvxUCqWDpit0WrZ3oEaKezD3E=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
6 changes: 0 additions & 6 deletions internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions internal/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ sql:
queries:
- river_job.sql
- river_leader.sql
- river_migration.sql
schema:
- river_job.sql
- river_leader.sql
- river_migration.sql
gen:
go:
package: "dbsqlc"
Expand Down
29 changes: 25 additions & 4 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log"
"log/slog"
"net/url"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -52,19 +53,39 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
}

func DatabaseConfig(databaseName string) *pgxpool.Config {
databaseURL := valutil.ValOrDefault(os.Getenv("TEST_DATABASE_URL"), "postgres:///river_testdb?sslmode=disable")

config, err := pgxpool.ParseConfig(databaseURL)
config, err := pgxpool.ParseConfig(DatabaseURL(databaseName))
if err != nil {
panic(fmt.Sprintf("error parsing database URL: %v", err))
}
config.MaxConns = dbPoolMaxConns
config.ConnConfig.ConnectTimeout = 10 * time.Second
config.ConnConfig.Database = databaseName
config.ConnConfig.RuntimeParams["timezone"] = "UTC"
return config
}

// DatabaseURL gets a test database URL from TEST_DATABASE_URL or falls back on
// a default pointing to `river_testdb`. If databaseName is set, it replaces the
// database in the URL, although the host and other parameters are preserved.
//
// Most of the time DatabaseConfig should be used instead of this function, but
// it may be useful in non-pgx situations like for examples showing the use of
// `database/sql`.
func DatabaseURL(databaseName string) string {
u, err := url.Parse(valutil.ValOrDefault(

Check failure on line 74 in internal/riverinternaltest/riverinternaltest.go

View workflow job for this annotation

GitHub Actions / lint

variable name 'u' is too short for the scope of its usage (varnamelen)
os.Getenv("TEST_DATABASE_URL"),
"postgres://localhost/river_testdb?sslmode=disable"),
)
if err != nil {
panic(err)
}

if databaseName != "" {
u.Path = databaseName
}

return u.String()
}

// DiscardContinuously drains continuously out of the given channel and discards
// anything that comes out of it. Returns a stop function that should be invoked
// to stop draining. Stop must be invoked before tests finish to stop an
Expand Down
33 changes: 33 additions & 0 deletions internal/util/dbutil/db_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/riverdriver"
)

// Executor is an interface for a type that can begin a transaction and also
Expand Down Expand Up @@ -56,3 +57,35 @@ func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(c

return res, nil
}

// WithExecutorTx starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) error) error {
_, err := WithExecutorTxV(ctx, exec, func(ctx context.Context, tx riverdriver.ExecutorTx) (struct{}, error) {
return struct{}{}, innerFunc(ctx, tx)
})
return err
}

// WithExecutorTxV starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTxV[T any](ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) (T, error)) (T, error) {
var defaultRes T

tx, err := exec.Begin(ctx)
if err != nil {
return defaultRes, fmt.Errorf("error beginning transaction: %w", err)
}
defer tx.Rollback(ctx)

res, err := innerFunc(ctx, tx)
if err != nil {
return defaultRes, err
}

if err := tx.Commit(ctx); err != nil {
return defaultRes, fmt.Errorf("error committing transaction: %w", err)
}

return res, nil
}
35 changes: 35 additions & 0 deletions internal/util/dbutil/db_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func TestWithTx(t *testing.T) {
Expand Down Expand Up @@ -40,3 +42,36 @@ func TestWithTxV(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 7, ret)
}

func TestWithExecutorTx(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

err := WithExecutorTx(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) error {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return nil
})
require.NoError(t, err)
}

func TestWithExecutorTxV(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

ret, err := WithExecutorTxV(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (int, error) {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return 7, nil
})
require.NoError(t, err)
require.Equal(t, 7, ret)
}
14 changes: 14 additions & 0 deletions riverdriver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/riverqueue/river/riverdriver

go 1.21.4

require github.com/jackc/pgx/v5 v5.5.0

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
28 changes: 28 additions & 0 deletions riverdriver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 6bd17d4

Please sign in to comment.