Substantially (~20-45x) faster unique insertion using unique index
Here, rebuild the unique job insertion infrastructure so that insertions
become substantially faster, in the range of 20 to 45x.

    $ go test -bench=. ./internal/dbunique
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/dbunique
    BenchmarkUniqueInserter/FastPathEmptyDatabase-8                     9632            126446 ns/op
    BenchmarkUniqueInserter/FastPathManyExistingJobs-8                  9718            127795 ns/op
    BenchmarkUniqueInserter/SlowPathEmptyDatabase-8                      468           3008752 ns/op
    BenchmarkUniqueInserter/SlowPathManyExistingJobs-8                   214           6197776 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/dbunique   13.558s

The speedup is accomplished by mostly abandoning the old methodology
that took an advisory lock, did a job lookup, and then did an insertion
if no equivalent unique job was found. Instead, we add a new
`unique_key` field to the jobs table, put a partial index on it, and
use it in conjunction with `kind` to do upserts for unique insertions.
Its value is similar to what we used for advisory locks -- a hash of a
string representing the unique opts in question.
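As a sketch of the idea (not River's exact implementation -- the field names and
serialization format here are made up for illustration), the key can be derived
by serializing the unique dimensions into a canonical string and hashing it down
to a fixed-size value suitable for a `bytea` column:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"strings"
)

// buildUniqueKey is a hypothetical illustration of deriving a unique key:
// concatenate the dimensions that uniqueness is scoped to into a canonical
// string, then hash it so it fits a fixed-size bytea column regardless of
// how large the job arguments are.
func buildUniqueKey(kind, encodedArgs, queue string) []byte {
	var sb strings.Builder
	sb.WriteString("&kind=" + kind)
	sb.WriteString("&args=" + encodedArgs)
	sb.WriteString("&queue=" + queue)

	hash := sha256.Sum256([]byte(sb.String()))
	return hash[:]
}

func main() {
	key := buildUniqueKey("email_send", `{"to":"user@example.com"}`, "default")
	fmt.Printf("unique key is %d bytes\n", len(key))
}
```

Because the hash is deterministic, two insertions with the same unique opts
produce the same `unique_key`, and the partial unique index on `(kind,
unique_key)` turns the second insertion into a no-op upsert.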

There is, however, a downside. `unique_key` is easy when uniqueness is
based only on something immutable like arguments or queue, but more
difficult when we have to factor in job state, which may change over the
lifetime of a job.

To compensate for this, we clear `unique_key` on a job when setting it
to states not included in the default unique state list, like when it's
being cancelled or discarded. This allows a new job with the same unique
properties to be inserted again.
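That clearing rule can be sketched as a tiny predicate. The state names below
mirror River's, but the default unique state list is reproduced from memory and
is illustrative only -- the authoritative list lives in the `dbunique` package:

```go
package main

import "fmt"

// defaultUniqueStates approximates the default set of job states that count
// toward uniqueness. A job in any state outside this set no longer blocks a
// new insertion, so its unique_key can be cleared.
var defaultUniqueStates = map[string]bool{
	"available": true,
	"completed": true,
	"retryable": true,
	"running":   true,
	"scheduled": true,
}

// shouldClearUniqueKey is a hypothetical helper invoked on state transition:
// moving to a state like cancelled or discarded clears unique_key so an
// equivalent job can be inserted again.
func shouldClearUniqueKey(newState string) bool {
	return !defaultUniqueStates[newState]
}

func main() {
	fmt.Println(shouldClearUniqueKey("cancelled")) // true: key is cleared
	fmt.Println(shouldClearUniqueKey("running"))   // false: key is kept
}
```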

But the corollary of this technique is that if a state like `cancelled`
or `discarded` is included in the `ByState` property, the technique
obviously doesn't work anymore. So instead, in these cases we _keep_ the
old insertion technique involving advisory locks, and fall back to this
slower insertion path when we have to. So while we get the benefits of
substantial performance improvements, we have the downside of more
complex code -- there are now two paths to think about and test.
Overall though, I think the benefit is worth it.
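The dispatch between the two paths might look roughly like this. This is a
sketch only: the function name, the `ByState` representation, and the default
state list are assumptions about the shape of the real code, not quotes from it:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// defaultUniqueStates is an illustrative stand-in for the default ByState
// list (already in sorted order); the authoritative list lives in River's
// dbunique package.
var defaultUniqueStates = []string{
	"available", "completed", "retryable", "running", "scheduled",
}

// canUseFastPath sketches the rule described above: the unique-index upsert
// only works when ByState is unset or equal to the defaults. Any custom list
// (e.g. one including cancelled or discarded) must fall back to the slower
// advisory-lock path, because those states clear unique_key.
func canUseFastPath(byState []string) bool {
	if len(byState) == 0 {
		return true // defaults apply; the index technique is safe
	}
	states := append([]string(nil), byState...)
	sort.Strings(states)
	return strings.Join(states, ",") == strings.Join(defaultUniqueStates, ",")
}

func main() {
	fmt.Println(canUseFastPath(nil))                                // true
	fmt.Println(canUseFastPath([]string{"cancelled", "completed"})) // false
}
```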

The addition does require a new index. Luckily it's a partial index, so
it only gets used on unique inserts, and I benchmarked before/after and
found no degradation in non-unique insert performance. I added
instructions to the CHANGELOG for building the index with `CONCURRENTLY`
for any users who may already have a large jobs table, giving them an
operationally safer alternative to use.
brandur committed Jul 15, 2024
1 parent 0f5e131 commit 6f18ea8
Showing 25 changed files with 1,349 additions and 291 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,20 @@ go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

The migration **includes a new index**. Users with a very large job table may want to consider raising the index separately using `CONCURRENTLY` (which must be run outside of a transaction), then run `river migrate-up` to finalize the process (it will tolerate an index that already exists):

```sql
ALTER TABLE river_job
ADD COLUMN unique_key bytea;

CREATE UNIQUE INDEX CONCURRENTLY river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL;
```

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

### Added

- Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351).
4 changes: 4 additions & 0 deletions client.go
@@ -65,6 +65,10 @@ type Config struct {
// option then it's recommended to leave it unset because the prefix leaves
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
2 changes: 2 additions & 0 deletions cmd/river/go.mod
@@ -10,6 +10,8 @@ replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverd

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5

replace github.com/riverqueue/river/rivertype => ../../rivertype

require (
github.com/jackc/pgx/v5 v5.6.0
github.com/lmittmann/tint v1.0.4
59 changes: 59 additions & 0 deletions driver_test.go
@@ -192,3 +192,62 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
})
})
}

func BenchmarkDriverRiverPgxV5Insert(b *testing.B) {
ctx := context.Background()

type testBundle struct {
exec riverdriver.Executor
tx pgx.Tx
}

setup := func(b *testing.B) (*riverpgxv5.Driver, *testBundle) {
b.Helper()

var (
driver = riverpgxv5.New(nil)
tx = riverinternaltest.TestTx(ctx, b)
)

bundle := &testBundle{
exec: driver.UnwrapExecutor(tx),
tx: tx,
}

return driver, bundle
}

b.Run("InsertFast", func(b *testing.B) {
_, bundle := setup(b)

for n := 0; n < b.N; n++ {
_, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
})
require.NoError(b, err)
}
})

b.Run("InsertUnique", func(b *testing.B) {
_, bundle := setup(b)

for n := 0; n < b.N; n++ {
_, err := bundle.exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{
JobInsertFastParams: &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
},
})
require.NoError(b, err)
}
})
}
5 changes: 5 additions & 0 deletions insert_opts.go
@@ -136,6 +136,11 @@ type UniqueOpts struct {
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, won't count towards the
// uniqueness of a new insert.
//
// Warning: A non-default slice of states in ByState will force the unique
// inserter to fall back to a slower insertion path that takes an advisory
// lock and performs a lookup before insertion. For best performance, it's
// recommended that the default set of states be used.
ByState []rivertype.JobState
}
