Skip to content

Commit

Permalink
Substantially (~20-45x) faster unique insertion using unique index
Browse files Browse the repository at this point in the history
Here, rebuild the unique job insertion infrastructure so that insertions
become substantially faster, in the range of 20 to 45x.

    $ go test -bench=. ./internal/dbunique
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/dbunique
    BenchmarkUniqueInserter/FastPathEmptyDatabase-8                     9632            126446 ns/op
    BenchmarkUniqueInserter/FastPathManyExistingJobs-8                  9718            127795 ns/op
    BenchmarkUniqueInserter/SlowPathEmptyDatabase-8                      468           3008752 ns/op
    BenchmarkUniqueInserter/SlowPathManyExistingJobs-8                   214           6197776 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/dbunique   13.558s

The speed up is accomplished by mostly abandoning the old methodology
that took an advisory lock, did a job look up, and then did an insertion
if no equivalent unique job was found. Instead, we add a new
`unique_key` field to the jobs table, put a partial index on it, and
use it in conjunction with `kind` to do upserts for unique insertions.
Its value is similar to what we used for advisory locks -- a hash of a
string representing the unique opts in question.

There is however, a downside. `unique_key` is easy when all we need to
think about are uniqueness based on something immutable like arguments
or queue, but more difficult when we have to factor in job state, which
may change over the lifetime of a job.

To compensate for this, we clear `unique_key` on a job when setting it
to states not included in the default unique state list, like when it's
being cancelled or discarded. This allows a new job with the same unique
properties to be inserted again.

But the corollary of this technique is that if a state like `cancelled`
or `discarded` is included in the `ByState` property, the technique
obviously doesn't work anymore. So instead, in these cases we _keep_ the
old insertion technique involving advisory locks, and fall back to this
slower insertion path when we have to. So while we get the benefits of
substantial performance improvements, we have the downside of more
complex code -- there's now two paths to think about and which have to
be tested. Overall though, I think the benefit is worth it.

The addition does require a new index. Luckily it's a partial so it only
gets used on unique inserts, and I benchmarked before/after, and found
no degradation in non-unique insert performance. I added instructions to
the CHANGELOG for building the index with `CONCURRENTLY` for any users
who may already have a large jobs table, giving them an operationally
safer alternative to use.
  • Loading branch information
brandur committed Jul 13, 2024
1 parent 18fbd7d commit 66bd785
Show file tree
Hide file tree
Showing 21 changed files with 1,289 additions and 221 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

The migration **includes a new index**. Users with a very large job table may want to consider raising the index separately using `CONCURRENTLY` (which must be run outside of a transaction), then run `river migrate-up` to finalize the process (it will tolerate an index that already exists):

```sql
ALTER TABLE river_job
ADD COLUMN unique_key bytea;

CREATE UNIQUE INDEX CONCURRENTLY river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL;
```

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

### Added

- Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351).
Expand Down
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type Config struct {
// option then it's recommended to leave it unset because the prefix leaves
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
Expand Down
2 changes: 2 additions & 0 deletions cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverd

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5

replace github.com/riverqueue/river/rivertype => ../../rivertype

require (
github.com/jackc/pgx/v5 v5.6.0
github.com/lmittmann/tint v1.0.4
Expand Down
6 changes: 6 additions & 0 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ type UniqueOpts struct {
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, won't count towards the
// uniqueness of a new insert.
//
// Warning: Any use of non-default states in ByState (any not in the list
// above) will force the unique inserter to fall back to a slower insertion
// path that takes an advisory lock and performs a look up before insertion.
// For best performance, it's recommended that the default set of states is
// used, or a subset of them.
ByState []rivertype.JobState
}

Expand Down
281 changes: 177 additions & 104 deletions internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"golang.org/x/crypto/sha3"

"github.com/riverqueue/river/internal/util/hashutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
Expand All @@ -20,14 +22,19 @@ import (
// determine uniqueness. So for example, a new unique job may be inserted even
// if another job already exists, as long as that other job is set `cancelled`
// or `discarded`.
var defaultUniqueStates = []string{ //nolint:gochecknoglobals
string(rivertype.JobStateAvailable),
string(rivertype.JobStateCompleted),
string(rivertype.JobStateRunning),
string(rivertype.JobStateRetryable),
string(rivertype.JobStateScheduled),
var defaultUniqueStates = []rivertype.JobState{ //nolint:gochecknoglobals
rivertype.JobStateAvailable,
rivertype.JobStateCompleted,
rivertype.JobStateRunning,
rivertype.JobStateRetryable,
rivertype.JobStateScheduled,
}

var (
defaultUniqueStatesMap = sliceutil.KeyBy(defaultUniqueStates, func(s rivertype.JobState) (rivertype.JobState, struct{}) { return s, struct{}{} }) //nolint:gochecknoglobals
defaultUniqueStatesStrings = sliceutil.Map(defaultUniqueStates, func(s rivertype.JobState) string { return string(s) }) //nolint:gochecknoglobals
)

type UniqueOpts struct {
ByArgs bool
ByPeriod time.Duration
Expand All @@ -48,115 +55,181 @@ type UniqueInserter struct {
}

func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (*rivertype.JobInsertResult, error) {
var tx riverdriver.ExecutorTx

if uniqueOpts != nil && !uniqueOpts.IsEmpty() {
// For uniqueness checks returns an advisory lock hash to use for lock,
// parameters to check for an existing unique job with the same
// properties, and a boolean indicating whether a uniqueness check
// should be performed at all (in some cases the check can be skipped if
// we can determine ahead of time that this insert will not violate
// uniqueness conditions).
buildUniqueParams := func() (*hashutil.AdvisoryLockHash, *riverdriver.JobGetByKindAndUniquePropertiesParams, bool) {
advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix)
advisoryLockHash.Write([]byte("unique_key"))
advisoryLockHash.Write([]byte("kind=" + params.Kind))

getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{
Kind: params.Kind,
}

if uniqueOpts.ByArgs {
advisoryLockHash.Write([]byte("&args="))
advisoryLockHash.Write(params.EncodedArgs)

getParams.Args = params.EncodedArgs
getParams.ByArgs = true
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod)

advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339)))

getParams.ByCreatedAt = true
getParams.CreatedAtBegin = lowerPeriodBound
getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod)
}

if uniqueOpts.ByQueue {
advisoryLockHash.Write([]byte("&queue=" + params.Queue))

getParams.ByQueue = true
getParams.Queue = params.Queue
}

{
stateSet := defaultUniqueStates
if len(uniqueOpts.ByState) > 0 {
stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) })
}

advisoryLockHash.Write([]byte("&state=" + strings.Join(stateSet, ",")))

if !slices.Contains(stateSet, string(params.State)) {
return nil, nil, false
}

getParams.ByState = true
getParams.State = stateSet
}

return advisoryLockHash, &getParams, true
// With no unique options set, do a normal non-unique insert.
if uniqueOpts == nil || uniqueOpts.IsEmpty() {
return insertNonUnique(ctx, exec, params)
}

// Build a unique key for use in either the `(kind, unique_key)` index or in
// an advisory lock prefix if we end up taking the slow path.
uniqueKey, doUniqueInsert := i.buildUniqueKey(params, uniqueOpts)
if !doUniqueInsert {
return insertNonUnique(ctx, exec, params)
}

// Fast path: as long uniqueness is entirely contingent on non-finalized
// states (i.e. either the default set of insert states, or any subset
// thereof), we can take the fast path wherein uniqueness is determined
// based on an upsert to a unique index on `(kind, unique_key)`. This works
// because when finalizing any jobs, the client and/or competer will zero
// the job's `unique_key` field, taking it out of consideration for future
// inserts given the same unique opts.
if uniqueOpts.ByState == nil || sliceIsSubset(defaultUniqueStatesMap, uniqueOpts.ByState) {
uniqueKeyHash := sha3.Sum256([]byte(uniqueKey))

insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{
JobInsertFastParams: params,
UniqueKey: uniqueKeyHash[:],
})
if err != nil {
return nil, err
}

if advisoryLockHash, getParams, doUniquenessCheck := buildUniqueParams(); doUniquenessCheck {
// Begin a subtransaction
exec, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer exec.Rollback(ctx)

// Make the subtransaction available at function scope so it can be
// committed in cases where we insert a job.
tx = exec

// The wrapping transaction should maintain snapshot consistency
// even if we were to only have a SELECT + INSERT, but given that a
// conflict is possible, obtain an advisory lock based on the
// parameters of the unique job first, and have contending inserts
// wait for it. This is a synchronous lock so we rely on context
// timeout in case something goes wrong and it's blocking for too
// long.
if _, err := exec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil {
return nil, fmt.Errorf("error acquiring unique lock: %w", err)
}

existing, err := exec.JobGetByKindAndUniqueProperties(ctx, getParams)
if err != nil {
if !errors.Is(err, rivertype.ErrNotFound) {
return nil, fmt.Errorf("error getting unique job: %w", err)
}
}

if existing != nil {
// Insert skipped; returns an existing row.
return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil
}
return (*rivertype.JobInsertResult)(insertRes), nil
}

// Slow path: open a subtransaction, take an advisory lock, check to see if
// a job with the given criteria exists, then either return an existing row
// or insert a new one.

advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix)
advisoryLockHash.Write([]byte("unique_key"))
advisoryLockHash.Write([]byte("kind=" + params.Kind))
advisoryLockHash.Write([]byte(uniqueKey))

getParams := i.buildGetParams(params, uniqueOpts)

// Begin a subtransaction
subExec, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer subExec.Rollback(ctx)

// The wrapping transaction should maintain snapshot consistency even if we
// were to only have a SELECT + INSERT, but given that a conflict is
// possible, obtain an advisory lock based on the parameters of the unique
// job first, and have contending inserts wait for it. This is a synchronous
// lock so we rely on context timeout in case something goes wrong and it's
// blocking for too long.
if _, err := subExec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil {
return nil, fmt.Errorf("error acquiring unique lock: %w", err)
}

existing, err := subExec.JobGetByKindAndUniqueProperties(ctx, getParams)
if err != nil {
if !errors.Is(err, rivertype.ErrNotFound) {
return nil, fmt.Errorf("error getting unique job: %w", err)
}
}

jobRow, err := exec.JobInsertFast(ctx, params)
if existing != nil {
// Insert skipped; returns an existing row.
return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil
}

jobRow, err := subExec.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}

if tx != nil {
if err := tx.Commit(ctx); err != nil {
return nil, err
if err := subExec.Commit(ctx); err != nil {
return nil, err
}

return &rivertype.JobInsertResult{Job: jobRow}, nil
}

// Builds a unique key made up of the unique options in place. The key is hashed
// to become a value for `unique_key` in the fast insertion path, or hashed and
// used for an advisory lock on the slow insertion path.
func (i *UniqueInserter) buildUniqueKey(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (string, bool) {
var sb strings.Builder

if uniqueOpts.ByArgs {
sb.WriteString("&args=")
sb.Write(params.EncodedArgs)
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod)
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
}

if uniqueOpts.ByQueue {
sb.WriteString("&queue=" + params.Queue)
}

{
stateSet := defaultUniqueStatesStrings
if len(uniqueOpts.ByState) > 0 {
stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) })
slices.Sort(stateSet)
}

sb.WriteString("&state=" + strings.Join(stateSet, ","))

if !slices.Contains(stateSet, string(params.State)) {
return "", false
}
}

return sb.String(), true
}

// Builds get parameters suitable for looking up a unique job on the slow unique
// insertion path.
func (i *UniqueInserter) buildGetParams(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) *riverdriver.JobGetByKindAndUniquePropertiesParams {
getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{
Kind: params.Kind,
}

if uniqueOpts.ByArgs {
getParams.Args = params.EncodedArgs
getParams.ByArgs = true
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod)

getParams.ByCreatedAt = true
getParams.CreatedAtBegin = lowerPeriodBound
getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod)
}

if uniqueOpts.ByQueue {
getParams.ByQueue = true
getParams.Queue = params.Queue
}

{
stateSet := defaultUniqueStatesStrings
if len(uniqueOpts.ByState) > 0 {
stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) })
}

getParams.ByState = true
getParams.State = stateSet
}

return &getParams
}

// Shared shortcut for inserting a row without uniqueness.
func insertNonUnique(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams) (*rivertype.JobInsertResult, error) {
jobRow, err := exec.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
return &rivertype.JobInsertResult{Job: jobRow}, nil
}

// Returns true if all elements of the slice `subset` are in the map `set`.
func sliceIsSubset[T comparable, V any](set map[T]V, subset []T) bool {
for _, element := range subset {
if _, ok := set[element]; !ok {
return false
}
}
return true
}
Loading

0 comments on commit 66bd785

Please sign in to comment.