diff --git a/example_unique_job_test.go b/example_unique_job_test.go index 56aaa829..1570d394 100644 --- a/example_unique_job_test.go +++ b/example_unique_job_test.go @@ -14,44 +14,47 @@ import ( "github.com/riverqueue/river/riverdriver/riverpgxv5" ) -// Account represents a minimal account containing a unique identifier, recent -// expenditures, and a remaining total. +// Account represents a minimal account recent expenditures and remaining total. type Account struct { - ID int RecentExpenditures int AccountTotal int } -var allAccounts = []Account{ //nolint:gochecknoglobals - {ID: 1, RecentExpenditures: 100, AccountTotal: 1_000}, - {ID: 2, RecentExpenditures: 999, AccountTotal: 1_000}, +// Map of account ID -> account. +var allAccounts = map[int]Account{ //nolint:gochecknoglobals + 1: {RecentExpenditures: 100, AccountTotal: 1_000}, + 2: {RecentExpenditures: 999, AccountTotal: 1_000}, } -type ReconcileAllAccountsArgs struct{} +type ReconcileAccountArgs struct { + AccountID int `json:"account_id"` +} -func (ReconcileAllAccountsArgs) Kind() string { return "reconcile_all_accounts" } +func (ReconcileAccountArgs) Kind() string { return "reconcile_account" } // InsertOpts returns custom insert options that every job of this type will // inherit by default, including unique options. -func (ReconcileAllAccountsArgs) InsertOpts() river.InsertOpts { +func (ReconcileAccountArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{ UniqueOpts: river.UniqueOpts{ + ByArgs: true, ByPeriod: 24 * time.Hour, }, } } -type ReconcileAllAccountsWorker struct { - river.WorkerDefaults[ReconcileAllAccountsArgs] +type ReconcileAccountWorker struct { + river.WorkerDefaults[ReconcileAccountArgs] } -func (w *ReconcileAllAccountsWorker) Work(ctx context.Context, job *river.Job[ReconcileAllAccountsArgs]) error { - for _, account := range allAccounts { - account.AccountTotal -= account.RecentExpenditures - account.RecentExpenditures = 0 +func (w *ReconcileAccountWorker) Work(ctx context.Context, job *river.Job[ReconcileAccountArgs]) error { + account := allAccounts[job.Args.AccountID] + + account.AccountTotal -= account.RecentExpenditures + account.RecentExpenditures = 0 + + fmt.Printf("Reconciled account %d; new total: %d\n", job.Args.AccountID, account.AccountTotal) - fmt.Printf("Reconciled account %d; new total: %d\n", account.ID, account.AccountTotal) - } return nil } @@ -72,7 +75,7 @@ func Example_uniqueJob() { } workers := river.NewWorkers() - river.AddWorker(workers, &ReconcileAllAccountsWorker{}) + river.AddWorker(workers, &ReconcileAccountWorker{}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), @@ -93,14 +96,24 @@ func Example_uniqueJob() { panic(err) } - _, err = riverClient.Insert(ctx, ReconcileAllAccountsArgs{}, nil) + _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil) if err != nil { panic(err) } // Job is inserted a second time, but it doesn't matter because its unique - // args ensure that it'll only run once in a 24 hour period. - _, err = riverClient.Insert(ctx, ReconcileAllAccountsArgs{}, nil) + // args ensure that it'll only run once per account per 24 hour period. + _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil) + if err != nil { + panic(err) + } + + // Cheat a little by waiting for the first job to come back so we can + // guarantee that this example's output comes out in order. + waitForNJobs(subscribeChan, 1) + + // Because the job is unique ByArgs, another job for account 2 is allowed. + _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil) if err != nil { panic(err) }