
Substantially (~20-45x) faster unique insertion using unique index #451

Merged · 1 commit · Jul 19, 2024
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,20 @@ go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

The migration **includes a new index**. Users with a very large job table may want to consider building the index separately using `CREATE INDEX CONCURRENTLY` (which must be run outside of a transaction), then running `river migrate-up` to finalize the process (it tolerates an index that already exists):

```sql
ALTER TABLE river_job
ADD COLUMN unique_key bytea;

CREATE UNIQUE INDEX CONCURRENTLY river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL;
```
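When building the index out of band like this, it's worth confirming that the concurrent build actually succeeded before finalizing, since a failed `CREATE INDEX CONCURRENTLY` can leave an invalid index behind. A hypothetical verification query (not part of the official instructions) against the system catalogs:

```sql
-- Check that the new unique index exists and is valid. CREATE INDEX
-- CONCURRENTLY that fails partway leaves an index with indisvalid = false,
-- which should be dropped and rebuilt before running `river migrate-up`.
SELECT i.indexrelid::regclass AS index_name, i.indisvalid
FROM pg_index i
WHERE i.indexrelid::regclass::text = 'river_job_kind_unique_key_idx';
```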
Comment on lines +17 to +24

Contributor:
I'm inclined to say this should be a separate migration just so we can add the index concurrently. I don't love that the default of our own tooling is to do a thing which is likely dangerous for large installations.

Thoughts?

Contributor Author:

Dang it, I'm not sure we have a choice except this for the time being. The migration framework automatically uses transactions so that in case a statement within a migration fails, the whole thing can be rolled back without tainting the database.

We'd have to add some new kind of "no transaction" mode to the migrator. Definitely possible, but would take a little more work right now.

Comment:

Might be a silly idea, but I felt the need to throw it out there:

Inside the migration you could detect whether the index has not already been created manually and the jobs table is above a certain (unfortunately arbitrary) threshold of rows, and fail the migration with an error indicating that the index should be created concurrently by hand?

@irreal commented Jul 14, 2024:

@brandur btw, creating a new type of migration job that runs without a tx would also not help with users who export sql to run in their own migration frameworks, while my proposal would

Contributor Author:

Yeah that's a good thought.

It's still a little difficult because given a very large table, even doing a `SELECT count(*)` can sometimes be a bit of a debacle. Still though, cheaper than building an index.
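As a cheap alternative to a full `count(*)`, such a threshold check could lean on the planner's row estimate (a sketch of the idea discussed above, not code from this PR; the estimate is only as fresh as the last `ANALYZE`/autovacuum):

```sql
-- Approximate row count for river_job from planner statistics. This reads a
-- single catalog row instead of scanning the table, so it's near-free even on
-- very large tables, at the cost of being an estimate rather than an exact count.
SELECT reltuples::bigint AS approx_rows
FROM pg_class
WHERE relname = 'river_job';
```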



### Added

- Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351).
4 changes: 4 additions & 0 deletions client.go
@@ -65,6 +65,10 @@ type Config struct {
// option then it's recommended to leave it unset because the prefix leaves
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
2 changes: 2 additions & 0 deletions cmd/river/go.mod
@@ -10,6 +10,8 @@ replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverd

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5

replace github.com/riverqueue/river/rivertype => ../../rivertype
Contributor Author:

Need to remember to re-comment all these replaces in the CLI module before final release.


require (
github.com/jackc/pgx/v5 v5.6.0
github.com/lmittmann/tint v1.0.4
59 changes: 59 additions & 0 deletions driver_test.go
@@ -192,3 +192,62 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
})
})
}

func BenchmarkDriverRiverPgxV5Insert(b *testing.B) {
ctx := context.Background()

type testBundle struct {
exec riverdriver.Executor
tx pgx.Tx
}

setup := func(b *testing.B) (*riverpgxv5.Driver, *testBundle) {
b.Helper()

var (
driver = riverpgxv5.New(nil)
tx = riverinternaltest.TestTx(ctx, b)
)

bundle := &testBundle{
exec: driver.UnwrapExecutor(tx),
tx: tx,
}

return driver, bundle
}

b.Run("InsertFast", func(b *testing.B) {
_, bundle := setup(b)

for n := 0; n < b.N; n++ {
_, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
})
require.NoError(b, err)
}
})

b.Run("InsertUnique", func(b *testing.B) {
_, bundle := setup(b)

for n := 0; n < b.N; n++ {
_, err := bundle.exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{
JobInsertFastParams: &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
},
})
require.NoError(b, err)
}
})
}
5 changes: 5 additions & 0 deletions insert_opts.go
@@ -136,6 +136,11 @@ type UniqueOpts struct {
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, won't count towards the
// uniqueness of a new insert.
//
// Warning: A non-default slice of states in ByState will force the unique
// inserter to fall back to a slower insertion path that takes an advisory
// lock and performs a lookup before insertion. For best performance, it's
// recommended that the default set of states is used.
ByState []rivertype.JobState
}
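To make the fast/slow distinction in the warning above concrete, here is a small self-contained Go sketch (illustrative logic only, not River's actual implementation: the type and function names are invented) of the rule as documented: an unset `ByState` keeps the fast unique-index path, while any custom slice forces the advisory-lock fallback.

```go
package main

import "fmt"

// JobState stands in for rivertype.JobState for illustration purposes only.
type JobState string

// usesFastUniquePath sketches the documented rule: only the default (unset)
// ByState allows the fast unique-index insertion path; supplying any custom
// slice of states forces the slower advisory-lock fallback path.
func usesFastUniquePath(byState []JobState) bool {
	return byState == nil
}

func main() {
	// Default states: fast path.
	fmt.Println(usesFastUniquePath(nil))
	// Custom states: slow advisory-lock path.
	fmt.Println(usesFastUniquePath([]JobState{"available", "running"}))
}
```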
