diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d8e128..f11ddc04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,20 @@ go install github.com/riverqueue/river/cmd/river@latest river migrate-up --database-url "$DATABASE_URL" ``` +The migration **includes a new index**. Users with a very large job table may want to add the new column and create the index separately, using `CONCURRENTLY` for the index (`CREATE INDEX CONCURRENTLY` must be run outside of a transaction), then run `river migrate-up` to finalize the process (it will tolerate an index that already exists): + +```sql +ALTER TABLE river_job + ADD COLUMN unique_key bytea; + +CREATE UNIQUE INDEX CONCURRENTLY river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; +``` + +```shell +go install github.com/riverqueue/river/cmd/river@latest +river migrate-up --database-url "$DATABASE_URL" +``` + ### Added - Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351). diff --git a/client.go b/client.go index c3d0cb5e..babc796d 100644 --- a/client.go +++ b/client.go @@ -65,6 +65,10 @@ type Config struct { // option then it's recommended to leave it unset because the prefix leaves // only 32 bits of number space for advisory lock hashes, so it makes // internally conflicting River-generated keys more likely. + // + // Advisory locks are currently only used for the fallback/slow path of + // unique job insertion where finalized states are included in a ByState + // configuration. AdvisoryLockPrefix int32 // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
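A quick illustration of where the new comment applies — a minimal, hypothetical configuration sketch, not part of this diff (`dbPool` and `workers` are assumed to exist elsewhere). Per the comment, the prefix only comes into play when unique inserts hit the slow path:

```go
package example

import (
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// newClient is a hypothetical helper showing where AdvisoryLockPrefix lives.
// The prefix only affects advisory locks taken on the slow path of unique job
// insertion (a ByState configuration that includes finalized states).
func newClient(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], error) {
	return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		AdvisoryLockPrefix: 0x1234, // any stable, app-chosen 32-bit value
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
}
```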
diff --git a/cmd/river/go.mod b/cmd/river/go.mod index d20620a6..bc6069d9 100644 --- a/cmd/river/go.mod +++ b/cmd/river/go.mod @@ -10,6 +10,8 @@ replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverd replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5 +replace github.com/riverqueue/river/rivertype => ../../rivertype + require ( github.com/jackc/pgx/v5 v5.6.0 github.com/lmittmann/tint v1.0.4 diff --git a/driver_test.go b/driver_test.go index 54abbfa6..7280fc81 100644 --- a/driver_test.go +++ b/driver_test.go @@ -192,3 +192,62 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { }) }) } + +func BenchmarkDriverRiverPgxV5Insert(b *testing.B) { + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + tx pgx.Tx + } + + setup := func(b *testing.B) (*riverpgxv5.Driver, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(nil) + tx = riverinternaltest.TestTx(ctx, b) + ) + + bundle := &testBundle{ + exec: driver.UnwrapExecutor(tx), + tx: tx, + } + + return driver, bundle + } + + b.Run("InsertFast", func(b *testing.B) { + _, bundle := setup(b) + + for n := 0; n < b.N; n++ { + _, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }) + require.NoError(b, err) + } + }) + + b.Run("InsertUnique", func(b *testing.B) { + _, bundle := setup(b) + + for n := 0; n < b.N; n++ { + _, err := bundle.exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + }) + require.NoError(b, err) + } + }) +}
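The new benchmarks can be run on their own. Assuming the usual River test database is provisioned, an invocation along these lines should work from the repository root (flags illustrative):

```shell
go test . -run '^$' -bench BenchmarkDriverRiverPgxV5Insert -benchtime 5s
```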
diff --git a/insert_opts.go b/insert_opts.go index fcaa8f15..6e671331 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -136,6 +136,12 @@ type UniqueOpts struct { // With this setting, any jobs of the same kind that have been completed or // discarded, but not yet cleaned out by the system, won't count towards the // uniqueness of a new insert. + // + // Warning: Any use of non-default states in ByState (any not in the list + // above) will force the unique inserter to fall back to a slower insertion + // path that takes an advisory lock and performs a lookup before insertion. + // For best performance, it's recommended that the default set of states is + // used, or a subset of them. ByState []rivertype.JobState }
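To make the warning concrete, a sketch of inserts that land on each path — `MyArgs` and the client wiring are illustrative stand-ins, not part of this change:

```go
package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// MyArgs is a stand-in job args type for illustration only.
type MyArgs struct {
	CustomerID int `json:"customer_id"`
}

func (MyArgs) Kind() string { return "my_args" }

func insertBothPaths(ctx context.Context, client *river.Client[pgx.Tx]) error {
	// Fast path: only default states are in play, so uniqueness is enforced by
	// an upsert against the new (kind, unique_key) index.
	if _, err := client.Insert(ctx, MyArgs{CustomerID: 123}, &river.InsertOpts{
		UniqueOpts: river.UniqueOpts{ByArgs: true, ByPeriod: 24 * time.Hour},
	}); err != nil {
		return err
	}

	// Slow path: cancelled is a finalized, non-default state, so this insert
	// falls back to the advisory lock + lookup strategy the warning describes.
	_, err := client.Insert(ctx, MyArgs{CustomerID: 123}, &river.InsertOpts{
		UniqueOpts: river.UniqueOpts{
			ByArgs:  true,
			ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled},
		},
	})
	return err
}
```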
diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index 34bcb41e..edaf1b12 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "golang.org/x/crypto/sha3" + "github.com/riverqueue/river/internal/util/hashutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" @@ -20,14 +22,19 @@ import ( // determine uniqueness. So for example, a new unique job may be inserted even // if another job already exists, as long as that other job is set `cancelled` // or `discarded`. -var defaultUniqueStates = []string{ //nolint:gochecknoglobals - string(rivertype.JobStateAvailable), - string(rivertype.JobStateCompleted), - string(rivertype.JobStateRunning), - string(rivertype.JobStateRetryable), - string(rivertype.JobStateScheduled), +var defaultUniqueStates = []rivertype.JobState{ //nolint:gochecknoglobals + rivertype.JobStateAvailable, + rivertype.JobStateCompleted, + rivertype.JobStateRunning, + rivertype.JobStateRetryable, + rivertype.JobStateScheduled, } +var ( + defaultUniqueStatesMap = sliceutil.KeyBy(defaultUniqueStates, func(s rivertype.JobState) (rivertype.JobState, struct{}) { return s, struct{}{} }) //nolint:gochecknoglobals + defaultUniqueStatesStrings = sliceutil.Map(defaultUniqueStates, func(s rivertype.JobState) string { return string(s) }) //nolint:gochecknoglobals +) + type UniqueOpts struct { ByArgs bool ByPeriod time.Duration @@ -48,115 +55,181 @@ type UniqueInserter struct { } func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (*rivertype.JobInsertResult, error) { - var tx riverdriver.ExecutorTx - - if uniqueOpts != nil && !uniqueOpts.IsEmpty() { - // For uniqueness checks returns an advisory lock hash to use for lock, - // parameters to check for an existing unique job with the same - // properties, and a boolean indicating whether a uniqueness check - // should be performed at all (in some cases the check can be skipped if - // we can determine ahead of time that this insert will not violate - // uniqueness conditions). - buildUniqueParams := func() (*hashutil.AdvisoryLockHash, *riverdriver.JobGetByKindAndUniquePropertiesParams, bool) { - advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix) - advisoryLockHash.Write([]byte("unique_key")) - advisoryLockHash.Write([]byte("kind=" + params.Kind)) - - getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{ - Kind: params.Kind, - } - - if uniqueOpts.ByArgs { - advisoryLockHash.Write([]byte("&args=")) - advisoryLockHash.Write(params.EncodedArgs) - - getParams.Args = params.EncodedArgs - getParams.ByArgs = true - } - - if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) - - advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339))) - - getParams.ByCreatedAt = true - getParams.CreatedAtBegin = lowerPeriodBound - getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod) - } - - if uniqueOpts.ByQueue { - advisoryLockHash.Write([]byte("&queue=" + params.Queue)) - - getParams.ByQueue = true - getParams.Queue = params.Queue - } - - { - stateSet := defaultUniqueStates - if len(uniqueOpts.ByState) > 0 { - stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) - } - - advisoryLockHash.Write([]byte("&state=" + strings.Join(stateSet, ","))) - - if !slices.Contains(stateSet, string(params.State)) { - return nil, nil, false - } - - getParams.ByState = true - getParams.State = stateSet - } - - return advisoryLockHash, &getParams, true + // With no unique options set, do a normal non-unique insert. + if uniqueOpts == nil || uniqueOpts.IsEmpty() { + return insertNonUnique(ctx, exec, params) + } + + // Build a unique key for use in either the `(kind, unique_key)` index or in + // an advisory lock prefix if we end up taking the slow path. + uniqueKey, doUniqueInsert := i.buildUniqueKey(params, uniqueOpts) + if !doUniqueInsert { + return insertNonUnique(ctx, exec, params) + } + + // Fast path: as long as uniqueness is entirely contingent on non-finalized + // states (i.e. either the default set of insert states, or any subset + // thereof), we can take the fast path wherein uniqueness is determined + // based on an upsert to a unique index on `(kind, unique_key)`. This works + // because when finalizing any jobs, the client and/or completer will zero + // the job's `unique_key` field, taking it out of consideration for future + // inserts given the same unique opts. + if uniqueOpts.ByState == nil || sliceIsSubset(defaultUniqueStatesMap, uniqueOpts.ByState) { + uniqueKeyHash := sha3.Sum256([]byte(uniqueKey)) + + insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: params, + UniqueKey: uniqueKeyHash[:], + }) + if err != nil { + return nil, err } - if advisoryLockHash, getParams, doUniquenessCheck := buildUniqueParams(); doUniquenessCheck { - // Begin a subtransaction - exec, err := exec.Begin(ctx) - if err != nil { - return nil, err - } - defer exec.Rollback(ctx) - - // Make the subtransaction available at function scope so it can be - // committed in cases where we insert a job. - tx = exec - - // The wrapping transaction should maintain snapshot consistency - // even if we were to only have a SELECT + INSERT, but given that a - // conflict is possible, obtain an advisory lock based on the - // parameters of the unique job first, and have contending inserts - // wait for it. This is a synchronous lock so we rely on context - // timeout in case something goes wrong and it's blocking for too - // long. - if _, err := exec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil { - return nil, fmt.Errorf("error acquiring unique lock: %w", err) - } - - existing, err := exec.JobGetByKindAndUniqueProperties(ctx, getParams) - if err != nil { - if !errors.Is(err, rivertype.ErrNotFound) { - return nil, fmt.Errorf("error getting unique job: %w", err) - } - } - - if existing != nil { - // Insert skipped; returns an existing row. - return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil - } + return (*rivertype.JobInsertResult)(insertRes), nil + } + + // Slow path: open a subtransaction, take an advisory lock, check to see if + // a job with the given criteria exists, then either return an existing row + // or insert a new one. + + advisoryLockHash := hashutil.NewAdvisoryLockHash(i.AdvisoryLockPrefix) + advisoryLockHash.Write([]byte("unique_key")) + advisoryLockHash.Write([]byte("kind=" + params.Kind)) + advisoryLockHash.Write([]byte(uniqueKey)) + + getParams := i.buildGetParams(params, uniqueOpts) + + // Begin a subtransaction + subExec, err := exec.Begin(ctx) + if err != nil { + return nil, err + } + defer subExec.Rollback(ctx) + + // The wrapping transaction should maintain snapshot consistency even if we + // were to only have a SELECT + INSERT, but given that a conflict is + // possible, obtain an advisory lock based on the parameters of the unique + // job first, and have contending inserts wait for it. This is a synchronous + // lock so we rely on context timeout in case something goes wrong and it's + // blocking for too long. + if _, err := subExec.PGAdvisoryXactLock(ctx, advisoryLockHash.Key()); err != nil { + return nil, fmt.Errorf("error acquiring unique lock: %w", err) + } + + existing, err := subExec.JobGetByKindAndUniqueProperties(ctx, getParams) + if err != nil { + if !errors.Is(err, rivertype.ErrNotFound) { + return nil, fmt.Errorf("error getting unique job: %w", err) } } - jobRow, err := exec.JobInsertFast(ctx, params) + if existing != nil { + // Insert skipped; returns an existing row. + return &rivertype.JobInsertResult{Job: existing, UniqueSkippedAsDuplicate: true}, nil + } + + jobRow, err := subExec.JobInsertFast(ctx, params) if err != nil { return nil, err } - if tx != nil { - if err := tx.Commit(ctx); err != nil { - return nil, err + if err := subExec.Commit(ctx); err != nil { + return nil, err + } + + return &rivertype.JobInsertResult{Job: jobRow}, nil +} + +// Builds a unique key from the unique options in effect. The key is hashed +// to become a value for `unique_key` in the fast insertion path, or hashed and +// used for an advisory lock on the slow insertion path.
+func (i *UniqueInserter) buildUniqueKey(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) (string, bool) { + var sb strings.Builder + + if uniqueOpts.ByArgs { + sb.WriteString("&args=") + sb.Write(params.EncodedArgs) + } + + if uniqueOpts.ByPeriod != time.Duration(0) { + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) + sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339)) + } + + if uniqueOpts.ByQueue { + sb.WriteString("&queue=" + params.Queue) + } + + { + stateSet := defaultUniqueStatesStrings + if len(uniqueOpts.ByState) > 0 { + stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) + slices.Sort(stateSet) + } + + sb.WriteString("&state=" + strings.Join(stateSet, ",")) + + if !slices.Contains(stateSet, string(params.State)) { + return "", false + } + } + + return sb.String(), true +} + +// Builds get parameters suitable for looking up a unique job on the slow unique +// insertion path. +func (i *UniqueInserter) buildGetParams(params *riverdriver.JobInsertFastParams, uniqueOpts *UniqueOpts) *riverdriver.JobGetByKindAndUniquePropertiesParams { + getParams := riverdriver.JobGetByKindAndUniquePropertiesParams{ + Kind: params.Kind, + } + + if uniqueOpts.ByArgs { + getParams.Args = params.EncodedArgs + getParams.ByArgs = true + } + + if uniqueOpts.ByPeriod != time.Duration(0) { + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) + + getParams.ByCreatedAt = true + getParams.CreatedAtBegin = lowerPeriodBound + getParams.CreatedAtEnd = lowerPeriodBound.Add(uniqueOpts.ByPeriod) + } + + if uniqueOpts.ByQueue { + getParams.ByQueue = true + getParams.Queue = params.Queue + } + + { + stateSet := defaultUniqueStatesStrings + if len(uniqueOpts.ByState) > 0 { + stateSet = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) string { return string(s) }) } + + getParams.ByState = true + getParams.State = stateSet } + return &getParams +} + +// Shared shortcut for inserting a row without uniqueness. +func insertNonUnique(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastParams) (*rivertype.JobInsertResult, error) { + jobRow, err := exec.JobInsertFast(ctx, params) + if err != nil { + return nil, err + } return &rivertype.JobInsertResult{Job: jobRow}, nil } + +// Returns true if all elements of the slice `subset` are in the map `set`. 
+func sliceIsSubset[T comparable, V any](set map[T]V, subset []T) bool { + for _, element := range subset { + if _, ok := set[element]; !ok { + return false + } + } + return true +}
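For reference, a standalone sketch of what the fast path stores: the key built by `buildUniqueKey` is hashed with SHA3-256 into the fixed-width value kept in `river_job.unique_key` (key contents below are illustrative, not produced by the real code):

```go
package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

func main() {
	// A key shaped like buildUniqueKey's output for ByArgs + ByPeriod with the
	// default state list. Kind isn't part of the key; it's a separate column
	// in the (kind, unique_key) index.
	key := `&args={"customer_id":123}&period=2024-06-01T00:00:00Z&state=available,completed,running,retryable,scheduled`

	// The 32-byte digest is what lands in river_job.unique_key and what
	// river_job_kind_unique_key_idx enforces uniqueness over, per kind.
	hash := sha3.Sum256([]byte(key))
	fmt.Printf("%x\n", hash[:])
}
```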
diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index 065669ad..5f618609 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -2,6 +2,7 @@ package dbunique import ( "context" + "fmt" "runtime" "sync" "testing" @@ -21,6 +22,22 @@ import ( "github.com/riverqueue/river/rivertype" ) +const queueAlternate = "alternate_queue" + +func makeInsertParams(createdAt *time.Time) *riverdriver.JobInsertFastParams { + return &riverdriver.JobInsertFastParams{ + CreatedAt: createdAt, + EncodedArgs: []byte(`{}`), + Kind: "fake_job", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, + State: rivertype.JobStateAvailable, + } +} + func TestUniqueInserter_JobInsert(t *testing.T) { t.Parallel() @@ -60,26 +77,12 @@ func TestUniqueInserter_JobInsert(t *testing.T) { return inserter, bundle } - makeInsertParams := func(bundle *testBundle) *riverdriver.JobInsertFastParams { - return &riverdriver.JobInsertFastParams{ - CreatedAt: &bundle.baselineTime, - EncodedArgs: []byte(`{}`), - Kind: "fake_job", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: nil, - State: rivertype.JobStateAvailable, - } - } - t.Run("Success", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) res, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) require.NoError(t, err) @@ -111,7 +114,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { const maxJobsToFetch = 8 - res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) + res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(&bundle.baselineTime), nil) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -128,7 +131,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) + _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(&bundle.baselineTime), nil) require.NoError(t, err) } @@ -155,18 +158,53 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected to fetch 1 remaining job but fetched %d jobs:\n%+v", len(jobs), jobs) }) - t.Run("UniqueJobByArgs", func(t *testing.T) { + t.Run("UniqueJobByArgsFastPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByArgs: true, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + insertParams.EncodedArgs = []byte(`{"key":"different"}`) + + // Same operation again, except that because we've modified the unique + // dimension, another job is allowed to be queued, so the new ID is + // not the same. + res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + }) + + t.Run("UniqueJobByArgsSlowPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -187,17 +225,52 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) - t.Run("UniqueJobByPeriod", func(t *testing.T) { + t.Run("UniqueJobByPeriodFastPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByPeriod: 15 * time.Minute, + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) + + // Same operation again, except that because we've advanced time past + // the period within unique bounds, another job is allowed to be queued, + // so the new ID is not the same.
+ res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + }) + + t.Run("UniqueJobByPeriodSlowPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) @@ -219,18 +292,53 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) - t.Run("UniqueJobByQueue", func(t *testing.T) { + t.Run("UniqueJobByQueueFastPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByQueue: true, + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + insertParams.Queue = queueAlternate + + // Same operation again, except that because we've modified the unique + // dimension, another job is allowed to be queued, so the new ID is + // not the same.
+ res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + }) + + t.Run("UniqueJobByQueueSlowPath", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByQueue: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -240,7 +348,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) - insertParams.Queue = "alternate_queue" + insertParams.Queue = queueAlternate // Same operation again, except that because we've modified the unique // dimension, another job is allowed to be queued, so the new ID is @@ -256,13 +364,14 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRunning}, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) // fast path because states are a subset of defaults require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -272,6 +381,14 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) + // States are the same, but ordered differently. Still counts as the same job. + res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, &UniqueOpts{ + ByState: []rivertype.JobState{rivertype.JobStateRunning, rivertype.JobStateAvailable}, + }) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res2.Job.ID) + require.True(t, res2.UniqueSkippedAsDuplicate) + // A new job is allowed if we're inserting the job with a state that's // not included in the unique state set. 
{ @@ -293,6 +410,8 @@ func TestUniqueInserter_JobInsert(t *testing.T) { FinalizedAt: ptrutil.Ptr(bundle.baselineTime), StateDoUpdate: true, State: rivertype.JobStateCompleted, + UniqueKeyDoUpdate: true, // `unique_key` is normally NULLed by the client or completer + UniqueKey: nil, }) require.NoError(t, err) @@ -310,13 +429,14 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByQueue: true, } res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) require.NoError(t, err) + require.NotNil(t, res0.Job.UniqueKey) // fast path because states are a subset of defaults require.False(t, res0.UniqueSkippedAsDuplicate) // Insert a second job with the same args, but expect that the same job @@ -363,6 +483,8 @@ func TestUniqueInserter_JobInsert(t *testing.T) { FinalizedAt: ptrutil.Ptr(bundle.baselineTime), StateDoUpdate: true, State: rivertype.JobStateDiscarded, + UniqueKeyDoUpdate: true, // `unique_key` is normally NULLed by the client or completer + UniqueKey: nil, }) require.NoError(t, err) @@ -375,12 +497,65 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) }) + t.Run("UniqueJobByStateSlowPath", func(t *testing.T) { + t.Parallel() + + inserter, bundle := setup(t) + + insertParams := makeInsertParams(&bundle.baselineTime) + uniqueOpts := &UniqueOpts{ + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, + } + + res0, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Nil(t, res0.Job.UniqueKey) // slow path because states are *not* a subset of defaults + require.False(t, res0.UniqueSkippedAsDuplicate) + + // Insert a second job with the same args, but expect the same job + // ID to come back because we're still within its unique parameters. + res1, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.Equal(t, res0.Job.ID, res1.Job.ID) + require.True(t, res1.UniqueSkippedAsDuplicate) + + // A new job is allowed if we're inserting the job with a state that's + // not included in the unique state set. + { + insertParams := *insertParams // dup + insertParams.State = rivertype.JobStateRunning + + res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + } + + // A new job is also allowed if the state of the originally inserted job + // changes to one that's not included in the unique state set.
+ { + _, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + ID: res0.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: ptrutil.Ptr(bundle.baselineTime), + StateDoUpdate: true, + State: rivertype.JobStateCompleted, + }) + require.NoError(t, err) + + res2, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(t, err) + require.NotEqual(t, res0.Job.ID, res2.Job.ID) + require.False(t, res2.UniqueSkippedAsDuplicate) + } + }) + t.Run("UniqueJobAllOptions", func(t *testing.T) { t.Parallel() inserter, bundle := setup(t) - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByArgs: true, ByPeriod: 15 * time.Minute, @@ -429,7 +604,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { // With queue modified { insertParams := *insertParams // dup - insertParams.Queue = "alternate_queue" + insertParams.Queue = queueAlternate // New job because a unique dimension has changed. res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) @@ -459,7 +634,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { bundle.driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) bundle.exec = bundle.driver.GetExecutor() - insertParams := makeInsertParams(bundle) + insertParams := makeInsertParams(&bundle.baselineTime) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } @@ -500,3 +675,144 @@ func TestUniqueInserter_JobInsert(t *testing.T) { } }) } + +func TestSliceIsSubset(t *testing.T) { + t.Parallel() + + evenNumbersMap := map[int]struct{}{ + 0: {}, + 2: {}, + 4: {}, + 6: {}, + 8: {}, + 10: {}, + } + + require.True(t, sliceIsSubset(evenNumbersMap, []int{0})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{0, 2})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{0, 2, 4})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{2, 4, 6})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{2, 6, 8})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{0, 2, 4, 6, 8, 10})) + require.True(t, sliceIsSubset(evenNumbersMap, []int{})) + + require.False(t, sliceIsSubset(evenNumbersMap, []int{1})) + require.False(t, sliceIsSubset(evenNumbersMap, []int{1, 3})) + require.False(t, sliceIsSubset(evenNumbersMap, []int{1, 3, 5})) + require.False(t, sliceIsSubset(evenNumbersMap, []int{1, 2, 4})) + require.False(t, sliceIsSubset(evenNumbersMap, []int{2, 4, 5})) + require.False(t, sliceIsSubset(evenNumbersMap, []int{0, 2, 4, 6, 8, 10, 11})) +} + +func BenchmarkUniqueInserter(b *testing.B) { + ctx := context.Background() + + type testBundle struct { + driver riverdriver.Driver[pgx.Tx] + exec riverdriver.Executor + tx pgx.Tx + } + + setup := func(b *testing.B) (*UniqueInserter, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(nil) + tx = riverinternaltest.TestTx(ctx, b) + ) + + bundle := &testBundle{ + driver: driver, + exec: driver.UnwrapExecutor(tx), + tx: tx, + } + + inserter := baseservice.Init(riversharedtest.BaseServiceArchetype(b), &UniqueInserter{}) + + return inserter, bundle + } + + // Simulates the case where many existing jobs are in the database already. + // Useful as a benchmark because the advisory lock strategy's look up get + // slow with many existing jobs. 
+ generateManyExistingJobs := func(b *testing.B, inserter *UniqueInserter, bundle *testBundle) { + b.Helper() + + insertParams := makeInsertParams(nil) + + for i := 0; i < 10_000; i++ { + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) + require.NoError(b, err) + } + } + + b.Run("FastPathEmptyDatabase", func(b *testing.B) { + inserter, bundle := setup(b) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ByArgs: true} + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("FastPathManyExistingJobs", func(b *testing.B) { + inserter, bundle := setup(b) + + generateManyExistingJobs(b, inserter, bundle) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ByArgs: true} + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("SlowPathEmptyDatabase", func(b *testing.B) { + inserter, bundle := setup(b) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) + + b.Run("SlowPathManyExistingJobs", func(b *testing.B) { + inserter, bundle := setup(b) + + generateManyExistingJobs(b, inserter, bundle) + + insertParams := makeInsertParams(nil) + uniqueOpts := &UniqueOpts{ + ByArgs: true, + ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCancelled}, // use of non-standard states triggers slow path + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + insertParams.EncodedArgs = []byte(fmt.Sprintf(`{"job_num":%d}`, n%1000)) + _, err := inserter.JobInsert(ctx, bundle.exec, insertParams, uniqueOpts) + require.NoError(b, err) + } + }) +} diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index e7885192..aece3262 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -995,6 +995,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, ScheduledAt: &now, State: rivertype.JobStateCompleted, Tags: []string{"tag"}, + UniqueKey: []byte("unique-key"), }) require.NoError(t, err) require.Equal(t, 3, job.Attempt) @@ -1011,6 +1012,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, requireEqualTime(t, now, job.ScheduledAt) require.Equal(t, rivertype.JobStateCompleted, job.State) require.Equal(t, []string{"tag"}, job.Tags) + require.Equal(t, []byte("unique-key"), job.UniqueKey) }) t.Run("JobFinalizedAtConstraint", func(t *testing.T) { @@ -1094,6 +1096,111 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("JobInsertUnique", func(t *testing.T) { + t.Parallel() + + t.Run("MinimalArgsWithDefaults", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + insertRes, err := exec.JobInsertUnique(ctx, 
&riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + UniqueKey: []byte("unique-key"), + }) + require.NoError(t, err) + require.Equal(t, 0, insertRes.Job.Attempt) + require.Nil(t, insertRes.Job.AttemptedAt) + require.WithinDuration(t, now, insertRes.Job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), insertRes.Job.EncodedArgs) + require.Empty(t, insertRes.Job.Errors) + require.Nil(t, insertRes.Job.FinalizedAt) + require.Equal(t, "test_kind", insertRes.Job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts) + require.Equal(t, []byte(`{}`), insertRes.Job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, insertRes.Job.Priority) + require.Equal(t, rivercommon.QueueDefault, insertRes.Job.Queue) + require.WithinDuration(t, now, insertRes.Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, insertRes.Job.State) + require.Equal(t, []string{}, insertRes.Job.Tags) + require.Equal(t, []byte("unique-key"), insertRes.Job.UniqueKey) + }) + + t.Run("AllArgs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + targetTime := time.Now().UTC().Add(-15 * time.Minute) + + insertRes, err := exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + CreatedAt: &targetTime, + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: 6, + Metadata: []byte(`{"meta": "data"}`), + Priority: 2, + Queue: "queue_name", + ScheduledAt: &targetTime, + State: rivertype.JobStateRunning, + Tags: []string{"tag"}, + }, + UniqueKey: []byte("unique-key"), + }) + require.NoError(t, err) + require.Equal(t, 0, insertRes.Job.Attempt) + require.Nil(t, insertRes.Job.AttemptedAt) + requireEqualTime(t, targetTime, insertRes.Job.CreatedAt) + require.Equal(t, []byte(`{"encoded": "args"}`), insertRes.Job.EncodedArgs) + require.Empty(t, insertRes.Job.Errors) + require.Nil(t, insertRes.Job.FinalizedAt) + require.Equal(t, "test_kind", insertRes.Job.Kind) + require.Equal(t, 6, insertRes.Job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), insertRes.Job.Metadata) + require.Equal(t, 2, insertRes.Job.Priority) + require.Equal(t, "queue_name", insertRes.Job.Queue) + requireEqualTime(t, targetTime, insertRes.Job.ScheduledAt) + require.Equal(t, rivertype.JobStateRunning, insertRes.Job.State) + require.Equal(t, []string{"tag"}, insertRes.Job.Tags) + require.Equal(t, []byte("unique-key"), insertRes.Job.UniqueKey) + }) + + t.Run("ReturnsExistingOnConflict", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + params := &riverdriver.JobInsertUniqueParams{ + JobInsertFastParams: &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + }, + UniqueKey: []byte("unique-key"), + } + + insertRes1, err := exec.JobInsertUnique(ctx, params) + require.NoError(t, err) + + insertRes2, err := exec.JobInsertUnique(ctx, params) + require.NoError(t, err) + require.Equal(t, insertRes1.Job.ID, insertRes2.Job.ID) + }) + }) + t.Run("JobList", func(t *testing.T) { t.Parallel() @@ 
-1392,7 +1499,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) - job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), UniqueKey: []byte("unique-key")}) // Running, but won't be completed. otherJob := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) @@ -1420,6 +1527,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Equal(t, rivertype.JobStateCompleted, job3Updated.State) require.WithinDuration(t, finalizedAt3, *job3Updated.FinalizedAt, time.Microsecond) + require.Equal(t, "unique-key", string(job3Updated.UniqueKey)) otherJobUpdated, err := exec.JobGetByID(ctx, otherJob.ID) require.NoError(t, err) @@ -1512,7 +1620,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, now)) @@ -1523,6 +1632,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateCompleted, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) }) t.Run("DoesNotCompleteARetryableJob", func(t *testing.T) { @@ -1533,7 +1643,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRetryable), + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, now)) @@ -1544,21 +1655,22 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) }) }) - t.Run("JobSetStateIfRunning_JobSetStateErrored", func(t *testing.T) { - t.Parallel() + makeErrPayload := func(t *testing.T, now time.Time) []byte { + t.Helper() - makeErrPayload := func(t *testing.T, now time.Time) []byte { - t.Helper() + errPayload, err := json.Marshal(rivertype.AttemptError{ + Attempt: 1, At: now, Error: "fake error", Trace: "foo.go:123\nbar.go:456", + }) + require.NoError(t, err) + return errPayload + } - errPayload, err := json.Marshal(rivertype.AttemptError{ - Attempt: 1, At: now, Error: "fake error", Trace: "foo.go:123\nbar.go:456", - }) - require.NoError(t, err) - return errPayload - } + t.Run("JobSetStateIfRunning_JobSetStateErrored", func(t *testing.T) { + t.Parallel() t.Run("SetsARunningJobToRetryable", func(t *testing.T) { t.Parallel() @@ -1568,7 +1680,8 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ - State: ptrutil.Ptr(rivertype.JobStateRunning), + State: ptrutil.Ptr(rivertype.JobStateRunning), + 
UniqueKey: []byte("unique-key"), }) jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now))) @@ -1579,6 +1692,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, jobUpdated, err := exec.JobGetByID(ctx, job.ID) require.NoError(t, err) require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) // validate error payload: require.Len(t, jobAfter.Errors, 1) @@ -1639,6 +1753,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.WithinDuration(t, time.Now().UTC(), *jobAfter.FinalizedAt, 2*time.Second) // ScheduledAt should not be touched: require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + // Errors should still be appended to: require.Len(t, jobAfter.Errors, 1) require.Contains(t, jobAfter.Errors[0].Error, "fake error") @@ -1650,6 +1765,60 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("JobSetStateIfRunning_JobSetStateCancelled", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("CancelsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateCancelled(job.ID, now, makeErrPayload(t, now))) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) + require.Nil(t, jobUpdated.UniqueKey) + }) + }) + + t.Run("JobSetStateIfRunning_JobSetStateDiscarded", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("DiscardsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobAfter, err := exec.JobSetStateIfRunning(ctx, riverdriver.JobSetStateDiscarded(job.ID, now, makeErrPayload(t, now))) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State) + require.Nil(t, jobUpdated.UniqueKey) + }) + }) + t.Run("JobUpdate", func(t *testing.T) { t.Parallel() diff --git a/internal/riverinternaltest/testfactory/test_factory.go b/internal/riverinternaltest/testfactory/test_factory.go index faba3a5c..be73ece1 100644 --- a/internal/riverinternaltest/testfactory/test_factory.go +++ b/internal/riverinternaltest/testfactory/test_factory.go @@ -33,6 +33,7 @@ type JobOpts struct { ScheduledAt *time.Time State *rivertype.JobState Tags []string + UniqueKey []byte } func Job(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { @@ -76,6 +77,7 @@ func Job_Build(tb testing.TB, opts *JobOpts) *riverdriver.JobInsertFullParams { ScheduledAt: opts.ScheduledAt, State: ptrutil.ValOrDefault(opts.State, rivertype.JobStateAvailable), Tags: tags, + UniqueKey: opts.UniqueKey, } } diff --git
a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 925b48d9..51e00efd 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -118,6 +118,7 @@ type Executor interface { JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error) JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) + JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) JobListFields() string JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error) @@ -259,6 +260,16 @@ type JobInsertFastParams struct { Tags []string } +type JobInsertUniqueParams struct { + *JobInsertFastParams + UniqueKey []byte +} + +type JobInsertUniqueResult struct { + Job *rivertype.JobRow + UniqueSkippedAsDuplicate bool +} + type JobInsertFullParams struct { Attempt int AttemptedAt *time.Time @@ -274,6 +285,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State rivertype.JobState Tags []string + UniqueKey []byte } type JobRescueManyParams struct { @@ -353,6 +365,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State rivertype.JobState + UniqueKeyDoUpdate bool + UniqueKey []byte } // Leader represents a River leader. diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/models.go b/riverdriver/riverdatabasesql/internal/dbsqlc/models.go index 7e7b0976..792115b4 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/models.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/models.go @@ -75,6 +75,7 @@ type RiverJob struct { State RiverJobState ScheduledAt time.Time Tags []string + UniqueKey []byte } type RiverLeader struct { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index a492c81f..c9a36162 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -45,14 +45,14 @@ updated_job AS ( metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) FROM notification WHERE river_job.id = notification.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, 
attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -82,6 +82,7 @@ func (q *Queries) JobCancel(ctx context.Context, db DBTX, arg *JobCancelParams) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -113,14 +114,14 @@ deleted_job AS ( WHERE river_job.id = job_to_delete.id -- Do not touch running jobs: AND river_job.state != 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM deleted_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM deleted_job ` @@ -144,6 +145,7 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -161,7 +163,7 @@ WITH deleted_jobs AS ( ORDER BY id LIMIT $4::bigint ) - RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ) SELECT count(*) FROM deleted_jobs @@ -189,7 +191,7 @@ func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBe const jobGetAvailable = `-- name: JobGetAvailable :many WITH locked_jobs AS ( SELECT - id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE @@ -216,7 +218,7 @@ FROM WHERE river_job.id = locked_jobs.id RETURNING - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, 
river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ` type JobGetAvailableParams struct { @@ -251,6 +253,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -266,7 +269,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail } const jobGetByID = `-- name: JobGetByID :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1 LIMIT 1 @@ -292,12 +295,13 @@ func (q *Queries) JobGetByID(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } const jobGetByIDMany = `-- name: JobGetByIDMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = any($1::bigint[]) ORDER BY id @@ -329,6 +333,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -344,7 +349,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R } const jobGetByKindAndUniqueProperties = `-- name: JobGetByKindAndUniqueProperties :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = $1 AND CASE WHEN $2::boolean THEN args = $3 ELSE true END @@ -397,12 +402,13 @@ func (q *Queries) JobGetByKindAndUniqueProperties(ctx context.Context, db DBTX, &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } const jobGetByKindMany = `-- name: JobGetByKindMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = any($1::text[]) ORDER BY id @@ -434,6 +440,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -449,7 +456,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) } 
const jobGetStuck = `-- name: JobGetStuck :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE state = 'running' AND attempted_at < $1::timestamptz @@ -488,6 +495,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -527,7 +535,7 @@ INSERT INTO river_job( coalesce($9::timestamptz, now()), $10, coalesce($11::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFastParams struct { @@ -576,6 +584,7 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -653,7 +662,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( $1::jsonb, coalesce($2::smallint, 0), @@ -668,8 +678,9 @@ INSERT INTO river_job( $11, coalesce($12::timestamptz, now()), $13, - coalesce($14::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + coalesce($14::varchar(255)[], '{}'), + $15 +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFullParams struct { @@ -687,6 +698,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State RiverJobState Tags []string + UniqueKey []byte } func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFullParams) (*RiverJob, error) { @@ -705,6 +717,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.ScheduledAt, arg.State, pq.Array(arg.Tags), + arg.UniqueKey, ) var i RiverJob err := row.Scan( @@ -724,6 +737,116 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, + ) + return &i, err +} + +const jobInsertUnique = `-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + $1, + coalesce($2::timestamptz, now()), + $3, + $4, + $5, + coalesce($6::jsonb, '{}'), + $7, + $8, + coalesce($9::timestamptz, now()), + $10, + coalesce($11::varchar(255)[], '{}'), + $12 +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. 
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +` + +type JobInsertUniqueParams struct { + Args string + CreatedAt *time.Time + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata string + Priority int16 + Queue string + ScheduledAt *time.Time + State RiverJobState + Tags []string + UniqueKey []byte +} + +type JobInsertUniqueRow struct { + ID int64 + Args string + Attempt int16 + AttemptedAt *time.Time + AttemptedBy []string + CreatedAt time.Time + Errors []string + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata string + Priority int16 + Queue string + State RiverJobState + ScheduledAt time.Time + Tags []string + UniqueKey []byte + UniqueSkippedAsDuplicate bool +} + +func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUniqueParams) (*JobInsertUniqueRow, error) { + row := db.QueryRowContext(ctx, jobInsertUnique, + arg.Args, + arg.CreatedAt, + arg.FinalizedAt, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + pq.Array(arg.Tags), + arg.UniqueKey, + ) + var i JobInsertUniqueRow + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + &i.UniqueSkippedAsDuplicate, ) return &i, err } @@ -786,14 +909,14 @@ updated_job AS ( AND river_job.state != 'running' -- If the job is already available with a prior scheduled_at, leave it alone. 
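A note on the `jobInsertUnique` statement above: `DO UPDATE SET kind = EXCLUDED.kind` is deliberately a no-op write, because PostgreSQL only produces a `RETURNING` row on a conflict when the conflict arm updates something, and `xmax != 0` then distinguishes a row touched by that update from a freshly inserted one (new rows have `xmax = 0`). A standalone sketch of the technique on a scratch table; all names here are illustrative rather than River's:

```sql
CREATE TABLE upsert_demo(
    kind text NOT NULL,
    unique_key bytea,
    payload text
);

CREATE UNIQUE INDEX upsert_demo_key_idx
    ON upsert_demo (kind, unique_key)
    WHERE unique_key IS NOT NULL;

-- First run: a fresh insert, so xmax = 0 and skipped_as_duplicate = false.
INSERT INTO upsert_demo (kind, unique_key, payload)
VALUES ('email', '\x01', 'first')
ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL
    DO UPDATE SET kind = EXCLUDED.kind
RETURNING payload, (xmax != 0) AS skipped_as_duplicate;

-- Second run with the same key: the no-op update fires on the existing row,
-- so RETURNING yields payload = 'first' and skipped_as_duplicate = true.
INSERT INTO upsert_demo (kind, unique_key, payload)
VALUES ('email', '\x01', 'second')
ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL
    DO UPDATE SET kind = EXCLUDED.kind
RETURNING payload, (xmax != 0) AS skipped_as_duplicate;
```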
AND NOT (river_job.state = 'available' AND river_job.scheduled_at < now()) - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -817,6 +940,7 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -844,7 +968,7 @@ river_job_scheduled AS ( WHERE river_job.id = jobs_to_schedule.id RETURNING river_job.id ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM river_job_scheduled) ` @@ -880,6 +1004,7 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -914,13 +1039,13 @@ updated_job AS ( state = 'completed' FROM job_to_update WHERE river_job.id = job_to_update.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM 
river_job WHERE id IN (SELECT id FROM job_to_finalized_at EXCEPT SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -955,6 +1080,7 @@ func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ); err != nil { return nil, err } @@ -991,18 +1117,20 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN $1 IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $2::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -1050,6 +1178,7 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } @@ -1061,9 +1190,10 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END -WHERE id = $11 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + state = CASE WHEN $9::boolean THEN $10 ELSE state END, + unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END +WHERE id = $13 +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobUpdateParams struct { @@ -1077,6 
+1207,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState + UniqueKeyDoUpdate bool + UniqueKey []byte ID int64 } @@ -1094,6 +1226,8 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, + arg.UniqueKeyDoUpdate, + arg.UniqueKey, arg.ID, ) var i RiverJob @@ -1114,6 +1248,7 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) &i.State, &i.ScheduledAt, pq.Array(&i.Tags), + &i.UniqueKey, ) return &i, err } diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql deleted file mode 100644 index f01e5f92..00000000 --- a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.up.sql +++ /dev/null @@ -1,18 +0,0 @@ -ALTER TABLE river_migration - RENAME TO river_migration_old; - -CREATE TABLE river_migration( - line TEXT NOT NULL, - version bigint NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), - CONSTRAINT version_gte_1 CHECK (version >= 1), - PRIMARY KEY (line, version) -); - -INSERT INTO river_migration - (created_at, line, version) -SELECT created_at, 'main', version -FROM river_migration_old; - -DROP TABLE river_migration_old; \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql similarity index 74% rename from riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql rename to riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql index ffdf1240..f2136ede 100644 --- a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line.down.sql +++ b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql @@ -1,5 +1,7 @@ -- --- If any non-main migration are present, 005 is considered irreversible. +-- Revert to migration table based only on `(version)`. +-- +-- If any non-main migrations are present, 005 is considered irreversible. -- DO @@ -33,4 +35,11 @@ INSERT INTO river_migration SELECT created_at, version FROM river_migration_old; -DROP TABLE river_migration_old; \ No newline at end of file +DROP TABLE river_migration_old; + +-- +-- Drop `river_job.unique_key`. +-- + +ALTER TABLE river_job + DROP COLUMN unique_key; diff --git a/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql new file mode 100644 index 00000000..155ce30d --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql @@ -0,0 +1,34 @@ +-- +-- Rebuild the migration table so it's based on `(line, version)`. 
+-- + +ALTER TABLE river_migration + RENAME TO river_migration_old; + +CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +INSERT INTO river_migration + (created_at, line, version) +SELECT created_at, 'main', version +FROM river_migration_old; + +DROP TABLE river_migration_old; + +-- +-- Add `river_job.unique_key` and bring up an index on it. +-- + +-- These statements use `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow up by completing the migration. +ALTER TABLE river_job + ADD COLUMN IF NOT EXISTS unique_key bytea; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go index c8b790b3..eb4c6ac5 100644 --- a/riverdriver/riverdatabasesql/river_database_sql.go +++ b/riverdriver/riverdatabasesql/river_database_sql.go @@ -278,6 +278,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns ScheduledAt: params.ScheduledAt, State: dbsqlc.RiverJobState(params.State), Tags: params.Tags, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -285,6 +286,53 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { + insertRes, err := e.queries.JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ + Args: string(params.EncodedArgs), + CreatedAt: params.CreatedAt, + Kind: params.Kind, + MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), + Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"), + Priority: int16(min(params.Priority, math.MaxInt16)), + Queue: params.Queue, + ScheduledAt: params.ScheduledAt, + State: dbsqlc.RiverJobState(params.State), + Tags: params.Tags, + UniqueKey: params.UniqueKey, + }) + if err != nil { + return nil, interpretError(err) + } + + jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ + ID: insertRes.ID, + Args: insertRes.Args, + Attempt: insertRes.Attempt, + AttemptedAt: insertRes.AttemptedAt, + AttemptedBy: insertRes.AttemptedBy, + CreatedAt: insertRes.CreatedAt, + Errors: insertRes.Errors, + FinalizedAt: insertRes.FinalizedAt, + Kind: insertRes.Kind, + MaxAttempts: insertRes.MaxAttempts, + Metadata: insertRes.Metadata, + Priority: insertRes.Priority, + Queue: insertRes.Queue, + ScheduledAt: insertRes.ScheduledAt, + State: insertRes.State, + Tags: insertRes.Tags, + UniqueKey: insertRes.UniqueKey, + }) + if err != nil { + return nil, err + } + + return &riverdriver.JobInsertUniqueResult{ + Job: jobRow, + UniqueSkippedAsDuplicate: insertRes.UniqueSkippedAsDuplicate, + }, nil +} + func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { // `database/sql` has an `sql.Named` system that should theoretically work // for named parameters, but neither Pgx nor lib/pq implement it, so just use @@ -421,6 +469,8 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP
FinalizedAt: params.FinalizedAt, StateDoUpdate: params.StateDoUpdate, State: dbsqlc.RiverJobState(params.State), + UniqueKeyDoUpdate: params.UniqueKeyDoUpdate, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -802,6 +852,7 @@ func jobRowFromInternal(internal *dbsqlc.RiverJob) (*rivertype.JobRow, error) { ScheduledAt: internal.ScheduledAt.UTC(), State: rivertype.JobState(internal.State), Tags: internal.Tags, + UniqueKey: internal.UniqueKey, }, nil } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/models.go b/riverdriver/riverpgxv5/internal/dbsqlc/models.go index 0e852e34..5cfbe281 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/models.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/models.go @@ -75,6 +75,7 @@ type RiverJob struct { State RiverJobState ScheduledAt time.Time Tags []string + UniqueKey []byte } type RiverLeader struct { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index ba4d8012..464aadc0 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -26,6 +26,7 @@ CREATE TABLE river_job( state river_job_state NOT NULL DEFAULT 'available', scheduled_at timestamptz NOT NULL DEFAULT NOW(), tags varchar(255)[] NOT NULL DEFAULT '{}', + unique_key bytea, CONSTRAINT finalized_or_finalized_at_null CHECK ( (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) @@ -261,7 +262,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( @args::jsonb, coalesce(@attempt::smallint, 0), @@ -276,9 +278,43 @@ INSERT INTO river_job( @queue, coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), @state, - coalesce(@tags::varchar(255)[], '{}') + coalesce(@tags::varchar(255)[], '{}'), + @unique_key ) RETURNING *; +-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + @args, + coalesce(sqlc.narg('created_at')::timestamptz, now()), + @finalized_at, + @kind, + @max_attempts, + coalesce(@metadata::jsonb, '{}'), + @priority, + @queue, + coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), + @state, + coalesce(@tags::varchar(255)[], '{}'), + @unique_key +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. + DO UPDATE SET kind = EXCLUDED.kind +RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate; + -- Run by the rescuer to queue for retry or discard depending on job state. 
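Two recurring `CASE` patterns show up in the state-transition queries that follow (`JobSetStateIfRunning` and `JobUpdate`). First, finalizing a job as `cancelled` or `discarded` resets `unique_key` to `NULL`; a `NULL` key drops out of the partial unique index, which frees the `(kind, unique_key)` slot so an equivalent unique job can be inserted again. Second, paired `@*_do_update::boolean` parameters gate each column write, letting one statement update only the columns the caller asked for. A sketch of both, using a hypothetical `demo_state_job` table and prepared statements in place of River's sqlc queries:

```sql
CREATE TABLE demo_state_job(
    id bigserial PRIMARY KEY,
    kind text NOT NULL,
    state text NOT NULL,
    unique_key bytea
);

CREATE UNIQUE INDEX demo_state_job_key_idx
    ON demo_state_job (kind, unique_key)
    WHERE unique_key IS NOT NULL;

-- Moving into a state outside the unique set clears the key; the NULL row
-- is excluded from the partial index, so the slot is immediately reusable.
PREPARE demo_set_state(text, bigint) AS
UPDATE demo_state_job
SET state = $1,
    unique_key = CASE WHEN $1 IN ('cancelled', 'discarded') THEN NULL
                      ELSE unique_key END
WHERE id = $2;

-- Paired boolean/value parameters: each column is written only when its
-- *_do_update flag is true, and otherwise keeps its current value.
PREPARE demo_update(boolean, text, boolean, bytea, bigint) AS
UPDATE demo_state_job
SET state      = CASE WHEN $1 THEN $2 ELSE state END,
    unique_key = CASE WHEN $3 THEN $4 ELSE unique_key END
WHERE id = $5;
```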
-- name: JobRescueMany :exec UPDATE river_job @@ -405,7 +441,9 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN @state IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' @@ -428,6 +466,7 @@ SET attempted_at = CASE WHEN @attempted_at_do_update::boolean THEN @attempted_at ELSE attempted_at END, errors = CASE WHEN @errors_do_update::boolean THEN @errors::jsonb[] ELSE errors END, finalized_at = CASE WHEN @finalized_at_do_update::boolean THEN @finalized_at ELSE finalized_at END, - state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END + state = CASE WHEN @state_do_update::boolean THEN @state ELSE state END, + unique_key = CASE WHEN @unique_key_do_update::boolean THEN @unique_key ELSE unique_key END WHERE id = @id RETURNING *; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 3e2b14ad..dbbf4843 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -43,14 +43,14 @@ updated_job AS ( metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], $3::jsonb, true) FROM notification WHERE river_job.id = notification.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -80,6 +80,7 @@ func (q *Queries) JobCancel(ctx context.Context, db DBTX, arg *JobCancelParams) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -111,14 +112,14 @@ deleted_job AS ( WHERE river_job.id = job_to_delete.id -- Do not touch running jobs: AND river_job.state != 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, 
river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM deleted_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM deleted_job ` @@ -142,6 +143,7 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -159,7 +161,7 @@ WITH deleted_jobs AS ( ORDER BY id LIMIT $4::bigint ) - RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ) SELECT count(*) FROM deleted_jobs @@ -187,7 +189,7 @@ func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBe const jobGetAvailable = `-- name: JobGetAvailable :many WITH locked_jobs AS ( SELECT - id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE @@ -214,7 +216,7 @@ FROM WHERE river_job.id = locked_jobs.id RETURNING - river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ` type JobGetAvailableParams struct { @@ -249,6 +251,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -261,7 +264,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail } const jobGetByID = `-- name: JobGetByID :one -SELECT id, args, attempt, 
attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1 LIMIT 1 @@ -287,12 +290,13 @@ func (q *Queries) JobGetByID(ctx context.Context, db DBTX, id int64) (*RiverJob, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } const jobGetByIDMany = `-- name: JobGetByIDMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = any($1::bigint[]) ORDER BY id @@ -324,6 +328,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -336,7 +341,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R } const jobGetByKindAndUniqueProperties = `-- name: JobGetByKindAndUniqueProperties :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = $1 AND CASE WHEN $2::boolean THEN args = $3 ELSE true END @@ -389,12 +394,13 @@ func (q *Queries) JobGetByKindAndUniqueProperties(ctx context.Context, db DBTX, &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } const jobGetByKindMany = `-- name: JobGetByKindMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE kind = any($1::text[]) ORDER BY id @@ -426,6 +432,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -438,7 +445,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) } const jobGetStuck = `-- name: JobGetStuck :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE state = 'running' AND attempted_at < $1::timestamptz @@ -477,6 +484,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -513,7 +521,7 @@ INSERT INTO river_job( coalesce($9::timestamptz, now()), $10, coalesce($11::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, 
metadata, priority, queue, state, scheduled_at, tags +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFastParams struct { @@ -562,6 +570,7 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -639,7 +648,8 @@ INSERT INTO river_job( queue, scheduled_at, state, - tags + tags, + unique_key ) VALUES ( $1::jsonb, coalesce($2::smallint, 0), @@ -654,8 +664,9 @@ INSERT INTO river_job( $11, coalesce($12::timestamptz, now()), $13, - coalesce($14::varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + coalesce($14::varchar(255)[], '{}'), + $15 +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobInsertFullParams struct { @@ -673,6 +684,7 @@ type JobInsertFullParams struct { ScheduledAt *time.Time State RiverJobState Tags []string + UniqueKey []byte } func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFullParams) (*RiverJob, error) { @@ -691,6 +703,7 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull arg.ScheduledAt, arg.State, arg.Tags, + arg.UniqueKey, ) var i RiverJob err := row.Scan( @@ -710,6 +723,116 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, + ) + return &i, err +} + +const jobInsertUnique = `-- name: JobInsertUnique :one +INSERT INTO river_job( + args, + created_at, + finalized_at, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags, + unique_key +) VALUES ( + $1, + coalesce($2::timestamptz, now()), + $3, + $4, + $5, + coalesce($6::jsonb, '{}'), + $7, + $8, + coalesce($9::timestamptz, now()), + $10, + coalesce($11::varchar(255)[], '{}'), + $12 +) +ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL + -- Something needs to be updated for a row to be returned on a conflict. 
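One subtlety in the conflict target above: `ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL` repeats the predicate of the partial unique index so that PostgreSQL can infer that index as the arbiter; without the `WHERE` clause the statement fails with "there is no unique or exclusion constraint matching the ON CONFLICT specification". Rows inserted with a `NULL` `unique_key` (jobs without unique options) never enter the partial index and so can never conflict. A sketch on an illustrative table:

```sql
CREATE TABLE demo_unique_job(
    id bigserial PRIMARY KEY,
    kind text NOT NULL,
    unique_key bytea
);

CREATE UNIQUE INDEX demo_unique_job_key_idx
    ON demo_unique_job (kind, unique_key)
    WHERE unique_key IS NOT NULL;

-- NULL keys are excluded from the partial index, so any number of
-- non-unique jobs of the same kind insert cleanly.
INSERT INTO demo_unique_job (kind, unique_key)
VALUES ('batch', NULL), ('batch', NULL);

-- With a real key, a duplicate is caught. The conflict target must repeat
-- the index predicate for the partial index to be inferred.
INSERT INTO demo_unique_job (kind, unique_key) VALUES ('batch', '\x0a');
INSERT INTO demo_unique_job (kind, unique_key) VALUES ('batch', '\x0a')
ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL
    DO NOTHING;
```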
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate +` + +type JobInsertUniqueParams struct { + Args []byte + CreatedAt *time.Time + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata []byte + Priority int16 + Queue string + ScheduledAt *time.Time + State RiverJobState + Tags []string + UniqueKey []byte +} + +type JobInsertUniqueRow struct { + ID int64 + Args []byte + Attempt int16 + AttemptedAt *time.Time + AttemptedBy []string + CreatedAt time.Time + Errors [][]byte + FinalizedAt *time.Time + Kind string + MaxAttempts int16 + Metadata []byte + Priority int16 + Queue string + State RiverJobState + ScheduledAt time.Time + Tags []string + UniqueKey []byte + UniqueSkippedAsDuplicate bool +} + +func (q *Queries) JobInsertUnique(ctx context.Context, db DBTX, arg *JobInsertUniqueParams) (*JobInsertUniqueRow, error) { + row := db.QueryRow(ctx, jobInsertUnique, + arg.Args, + arg.CreatedAt, + arg.FinalizedAt, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + arg.Tags, + arg.UniqueKey, + ) + var i JobInsertUniqueRow + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueSkippedAsDuplicate, ) return &i, err } @@ -772,14 +895,14 @@ updated_job AS ( AND river_job.state != 'running' -- If the job is already available with a prior scheduled_at, leave it alone. AND NOT (river_job.state = 'available' AND river_job.scheduled_at < now()) - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $1::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -803,6 +926,7 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -830,7 +954,7 @@ river_job_scheduled AS ( WHERE river_job.id = jobs_to_schedule.id 
RETURNING river_job.id ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM river_job_scheduled) ` @@ -866,6 +990,7 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -897,13 +1022,13 @@ updated_job AS ( state = 'completed' FROM job_to_update WHERE river_job.id = job_to_update.id - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id IN (SELECT id FROM job_to_finalized_at EXCEPT SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -938,6 +1063,7 @@ func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ); err != nil { return nil, err } @@ -971,18 +1097,20 @@ updated_job AS ( max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 ELSE max_attempts END, scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + ELSE scheduled_at END, + unique_key = CASE WHEN $1 IN ('cancelled', 'discarded') THEN NULL + ELSE unique_key END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running' - RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key ) -SELECT id, args, attempt, attempted_at, 
attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM river_job WHERE id = $2::bigint AND id NOT IN (SELECT id FROM updated_job) UNION -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key FROM updated_job ` @@ -1030,6 +1158,7 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } @@ -1041,9 +1170,10 @@ SET attempted_at = CASE WHEN $3::boolean THEN $4 ELSE attempted_at END, errors = CASE WHEN $5::boolean THEN $6::jsonb[] ELSE errors END, finalized_at = CASE WHEN $7::boolean THEN $8 ELSE finalized_at END, - state = CASE WHEN $9::boolean THEN $10 ELSE state END -WHERE id = $11 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags + state = CASE WHEN $9::boolean THEN $10 ELSE state END, + unique_key = CASE WHEN $11::boolean THEN $12 ELSE unique_key END +WHERE id = $13 +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key ` type JobUpdateParams struct { @@ -1057,6 +1187,8 @@ type JobUpdateParams struct { FinalizedAt *time.Time StateDoUpdate bool State RiverJobState + UniqueKeyDoUpdate bool + UniqueKey []byte ID int64 } @@ -1074,6 +1206,8 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) arg.FinalizedAt, arg.StateDoUpdate, arg.State, + arg.UniqueKeyDoUpdate, + arg.UniqueKey, arg.ID, ) var i RiverJob @@ -1094,6 +1228,7 @@ func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) &i.State, &i.ScheduledAt, &i.Tags, + &i.UniqueKey, ) return &i, err } diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql deleted file mode 100644 index f01e5f92..00000000 --- a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.up.sql +++ /dev/null @@ -1,18 +0,0 @@ -ALTER TABLE river_migration - RENAME TO river_migration_old; - -CREATE TABLE river_migration( - line TEXT NOT NULL, - version bigint NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), - CONSTRAINT version_gte_1 CHECK (version >= 1), - PRIMARY KEY (line, version) -); - -INSERT INTO river_migration - (created_at, line, version) -SELECT created_at, 'main', version -FROM river_migration_old; - -DROP TABLE river_migration_old; \ No newline at end of file diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql similarity index 74% rename from riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql rename to 
riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql index ffdf1240..f2136ede 100644 --- a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line.down.sql +++ b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.down.sql @@ -1,5 +1,7 @@ -- --- If any non-main migration are present, 005 is considered irreversible. +-- Revert to migration table based only on `(version)`. +-- +-- If any non-main migrations are present, 005 is considered irreversible. -- DO @@ -33,4 +35,11 @@ INSERT INTO river_migration SELECT created_at, version FROM river_migration_old; -DROP TABLE river_migration_old; \ No newline at end of file +DROP TABLE river_migration_old; + +-- +-- Drop `river_job.unique_key`. +-- + +ALTER TABLE river_job + DROP COLUMN unique_key; diff --git a/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql new file mode 100644 index 00000000..155ce30d --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/005_river_migration_add_line_river_job_add_unique_key.up.sql @@ -0,0 +1,34 @@ +-- +-- Rebuild the migration table so it's based on `(line, version)`. +-- + +ALTER TABLE river_migration + RENAME TO river_migration_old; + +CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) +); + +INSERT INTO river_migration + (created_at, line, version) +SELECT created_at, 'main', version +FROM river_migration_old; + +DROP TABLE river_migration_old; + +-- +-- Add `river_job.unique_key` and bring up an index on it. +-- + +-- These statements use `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow up by completing the migration.
+ALTER TABLE river_job + ADD COLUMN IF NOT EXISTS unique_key bytea; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 1ec73106..d29af1c9 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -272,6 +272,7 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns ScheduledAt: params.ScheduledAt, State: dbsqlc.RiverJobState(params.State), Tags: params.Tags, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -279,6 +280,53 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { + insertRes, err := e.queries.JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ + Args: params.EncodedArgs, + CreatedAt: params.CreatedAt, + Kind: params.Kind, + MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), + Metadata: params.Metadata, + Priority: int16(min(params.Priority, math.MaxInt16)), + Queue: params.Queue, + ScheduledAt: params.ScheduledAt, + State: dbsqlc.RiverJobState(params.State), + Tags: params.Tags, + UniqueKey: params.UniqueKey, + }) + if err != nil { + return nil, interpretError(err) + } + + jobRow, err := jobRowFromInternal(&dbsqlc.RiverJob{ + ID: insertRes.ID, + Args: insertRes.Args, + Attempt: insertRes.Attempt, + AttemptedAt: insertRes.AttemptedAt, + AttemptedBy: insertRes.AttemptedBy, + CreatedAt: insertRes.CreatedAt, + Errors: insertRes.Errors, + FinalizedAt: insertRes.FinalizedAt, + Kind: insertRes.Kind, + MaxAttempts: insertRes.MaxAttempts, + Metadata: insertRes.Metadata, + Priority: insertRes.Priority, + Queue: insertRes.Queue, + ScheduledAt: insertRes.ScheduledAt, + State: insertRes.State, + Tags: insertRes.Tags, + UniqueKey: insertRes.UniqueKey, + }) + if err != nil { + return nil, err + } + + return &riverdriver.JobInsertUniqueResult{ + Job: jobRow, + UniqueSkippedAsDuplicate: insertRes.UniqueSkippedAsDuplicate, + }, nil +} + func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { rows, err := e.dbtx.Query(ctx, query, pgx.NamedArgs(namedArgs)) if err != nil { @@ -397,6 +445,8 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP FinalizedAt: params.FinalizedAt, StateDoUpdate: params.StateDoUpdate, State: dbsqlc.RiverJobState(params.State), + UniqueKeyDoUpdate: params.UniqueKeyDoUpdate, + UniqueKey: params.UniqueKey, }) if err != nil { return nil, interpretError(err) @@ -780,6 +830,7 @@ func jobRowFromInternal(internal *dbsqlc.RiverJob) (*rivertype.JobRow, error) { ScheduledAt: internal.ScheduledAt.UTC(), State: rivertype.JobState(internal.State), Tags: internal.Tags, + UniqueKey: internal.UniqueKey, }, nil } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index df4f6038..0f8f309c 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -113,6 +113,11 @@ type JobRow struct { // functional behavior and are meant entirely as a user-specified construct // to help group and categorize jobs. Tags []string + + // UniqueKey is a unique key for the job within its kind that's used for + // unique job insertions. 
It's generated by hashing an inserted job's unique + // opts configuration. + UniqueKey []byte } // JobState is the state of a job. Jobs start their lifecycle as either