Skip to content

Commit

Permalink
Add missing doc for JobRow.UniqueStates + reveal `rivertype.UniqueO…
Browse files Browse the repository at this point in the history
…ptsByStateDefault()`

Two small ones related to unique job insertion:

* Adding a missing doc comment on `JobRow.UniqueStates`.

* As discussed in #704, reveal a way for end users to get the default
  set of unique job states via `rivertype.UniqueOptsByStateDefault()`.
  Similar to `rivertype.JobStates()`, this is revealed as a function so
  that it's not possible to accidentally mutate a global slice. (Go:
  readonly variables would sure be pretty nice.)
  • Loading branch information
brandur committed Dec 27, 2024
1 parent fc61933 commit 8120584
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- The River CLI will now respect the standard set of `PG*` environment variables like `PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`, and `PGSSLMODE` to configure a target database when the `--database-url` parameter is omitted. [PR #702](https://github.com/riverqueue/river/pull/702).
- Add missing doc for `JobRow.UniqueStates` + reveal `rivertype.UniqueOptsByStateDefault()` to provide access to the default set of unique job states. [PR #707](https://github.com/riverqueue/river/pull/707).

### Changed

Expand Down
4 changes: 4 additions & 0 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ type UniqueOpts struct {
//
// ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStatePending, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled}
//
// Or more succinctly:
//
// ByState: rivertype.UniqueOptsByStateDefault()
//
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, will still prevent a
// duplicate unique job from being inserted. For example, with the default
Expand Down
18 changes: 4 additions & 14 deletions internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,9 @@ import (
"github.com/riverqueue/river/rivertype"
)

// When a job has specified unique options, but has not set the ByState
// parameter explicitly, this is the set of default states that are used to
// determine uniqueness. So for example, a new unique job may be inserted even
// if another job already exists, as long as that other job is set `cancelled`
// or `discarded`.
var defaultUniqueStates = []rivertype.JobState{ //nolint:gochecknoglobals
rivertype.JobStateAvailable,
rivertype.JobStateCompleted,
rivertype.JobStatePending,
rivertype.JobStateRetryable,
rivertype.JobStateRunning,
rivertype.JobStateScheduled,
}
// Default job states for UniqueOpts.ByState. Stored here to a variable so we
// don't have to reallocate a slice over and over again.
var uniqueOptsByStateDefault = rivertype.UniqueOptsByStateDefault() //nolint:gochecknoglobals

var jobStateBitPositions = map[rivertype.JobState]uint{ //nolint:gochecknoglobals
rivertype.JobStateAvailable: 7,
Expand Down Expand Up @@ -56,7 +46,7 @@ func (o *UniqueOpts) IsEmpty() bool {
}

func (o *UniqueOpts) StateBitmask() byte {
states := defaultUniqueStates
states := uniqueOptsByStateDefault
if len(o.ByState) > 0 {
states = o.ByState
}
Expand Down
10 changes: 5 additions & 5 deletions internal/dbunique/db_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestUniqueKey(t *testing.T) {
encodedArgs, err := json.Marshal(args)
require.NoError(t, err)

states := defaultUniqueStates
states := uniqueOptsByStateDefault
if len(tt.uniqueOpts.ByState) > 0 {
states = tt.uniqueOpts.ByState
}
Expand Down Expand Up @@ -258,9 +258,9 @@ func TestUniqueKey(t *testing.T) {
func TestDefaultUniqueStatesSorted(t *testing.T) {
t.Parallel()

states := slices.Clone(defaultUniqueStates)
states := slices.Clone(uniqueOptsByStateDefault)
slices.Sort(states)
require.Equal(t, states, defaultUniqueStates, "Default unique states should be sorted")
require.Equal(t, states, uniqueOptsByStateDefault, "Default unique states should be sorted")
}

func TestUniqueOptsIsEmpty(t *testing.T) {
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestUniqueOptsStateBitmask(t *testing.T) {
t.Parallel()

emptyOpts := &UniqueOpts{}
require.Equal(t, UniqueStatesToBitmask(defaultUniqueStates), emptyOpts.StateBitmask(), "Empty unique options should have default bitmask")
require.Equal(t, UniqueStatesToBitmask(uniqueOptsByStateDefault), emptyOpts.StateBitmask(), "Empty unique options should have default bitmask")

otherStates := []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}
nonEmptyOpts := &UniqueOpts{
Expand All @@ -301,7 +301,7 @@ func TestUniqueOptsStateBitmask(t *testing.T) {
func TestUniqueStatesToBitmask(t *testing.T) {
t.Parallel()

bitmask := UniqueStatesToBitmask(defaultUniqueStates)
bitmask := UniqueStatesToBitmask(uniqueOptsByStateDefault)
require.Equal(t, byte(0b11110101), bitmask, "Default unique states should be all set except cancelled and discarded")

for state, position := range jobStateBitPositions {
Expand Down
19 changes: 19 additions & 0 deletions rivertype/river_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type JobRow struct {
// opts configuration.
UniqueKey []byte

// UniqueStates is the set of states where uniqueness is enforced for this
// job. Equivalent to the default set of unique states unless
// UniqueOpts.ByState was assigned a custom value.
UniqueStates []JobState
}

Expand Down Expand Up @@ -303,3 +306,19 @@ type Queue struct {
// deleted from the table by a maintenance process.
UpdatedAt time.Time
}

// UniqueOptsByStateDefault is the set of job states that are used to determine
// uniqueness unless unique job states have been overridden with
// UniqueOpts.ByState. So for example, with this default set a new unique job
// may be inserted even if another job already exists, as long as that other job
// is set `cancelled` or `discarded`.
func UniqueOptsByStateDefault() []JobState {
return []JobState{
JobStateAvailable,
JobStateCompleted,
JobStatePending,
JobStateRetryable,
JobStateRunning,
JobStateScheduled,
}
}

0 comments on commit 8120584

Please sign in to comment.