Skip to content

Commit

Permalink
Rename RunImmediately to RunOnStart
Browse files Browse the repository at this point in the history
This is another bikeshed issue, but I was thinking about the naming of
`RunImmediately` today and I think it might be too ambiguous. It sounds
a little too much like it could mean "run immediately when scheduled".

I propose we rename it to `RunOnStart` to make it clear that it's
expected to run on the scheduler's start (this is also said in the
property's docblock). `RunOnStart` is also a little more succinct which
is always nice.

The repository is technically public now and this is technically a
breaking change, but I think it's fair for us to make a few API tweaks
in the early days before we've done anything to send the project around
(and maybe even a little after).

This property in mentioned in our public docs already [1]. If this gets
merged I'll follow up by updating it there as well.

[1] https://riverqueue.com/docs/periodic-jobs
  • Loading branch information
brandur committed Nov 11, 2023
1 parent 250ce47 commit f62853c
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ConstructorFunc: func() (*dbadapter.JobInsertParams, error) {
return insertParamsFromArgsAndOptions(periodicJob.constructorFunc())
},
RunImmediately: opts.RunImmediately,
ScheduleFunc: periodicJob.scheduleFunc.Next,
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
})
}

Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ func Test_Client_Maintenance(t *testing.T) {
config.PeriodicJobs = []*PeriodicJob{
NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) {
return periodicJobArgs{}, nil
}, &PeriodicJobOpts{RunImmediately: true}),
}, &PeriodicJobOpts{RunOnStart: true}),
}

client := runNewTestClient(ctx, t, config)
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func Test_Client_Maintenance(t *testing.T) {
svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()

// No jobs yet because the RunImmediately option was not specified.
// No jobs yet because the RunOnStart option was not specified.
jobs, err := queries.JobGetByKind(ctx, client.driver.GetDBPool(), (periodicJobArgs{}).Kind())
require.NoError(t, err)
require.Len(t, jobs, 0)
Expand Down
2 changes: 1 addition & 1 deletion example_cron_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Example_cronJob() {
func() (river.JobArgs, *river.InsertOpts) {
return CronJobArgs{}, nil
},
&river.PeriodicJobOpts{RunImmediately: true},
&river.PeriodicJobOpts{RunOnStart: true},
),
},
Queues: map[string]river.QueueConfig{
Expand Down
2 changes: 1 addition & 1 deletion example_periodic_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Example_periodicJob() {
func() (river.JobArgs, *river.InsertOpts) {
return PeriodicJobArgs{}, nil
},
&river.PeriodicJobOpts{RunImmediately: true},
&river.PeriodicJobOpts{RunOnStart: true},
),
},
Queues: map[string]river.QueueConfig{
Expand Down
4 changes: 2 additions & 2 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (ts *PeriodicJobEnqueuerTestSignals) Init() {
// subpackage.
type PeriodicJob struct {
ConstructorFunc func() (*dbadapter.JobInsertParams, error)
RunImmediately bool
RunOnStart bool
ScheduleFunc func(time.Time) time.Time

nextRunAt time.Time // set on service start
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

periodicJob.nextRunAt = periodicJob.ScheduleFunc(now)

if periodicJob.RunImmediately {
if periodicJob.RunOnStart {
if insertParams, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc); ok {
insertParamsMany = append(insertParamsMany, insertParams)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
requireNJobs(t, bundle.dbPool, "periodic_job_1500ms", 1)
})

t.Run("RunImmediately", func(t *testing.T) {
t.Run("RunOnStart", func(t *testing.T) {
t.Parallel()

svc, bundle := setup(t)

svc.periodicJobs = []*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s"), RunImmediately: true},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s"), RunOnStart: true},
}

start := time.Now()
Expand All @@ -138,7 +138,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) {

svc.periodicJobs = []*PeriodicJob{
// skip this insert when it returns nil:
{ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*dbadapter.JobInsertParams, error) { return nil, ErrNoJobToInsert }, RunImmediately: true},
{ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*dbadapter.JobInsertParams, error) { return nil, ErrNoJobToInsert }, RunOnStart: true},
}

require.NoError(t, svc.Start(ctx))
Expand Down
8 changes: 4 additions & 4 deletions periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type PeriodicJob struct {

// PeriodicJobOpts are options for a periodic job.
type PeriodicJobOpts struct {
// RunImmediately can be used to indicate that a periodic job should insert
// an initial job as a new scheduler is started. This can be used as a hedge
// RunOnStart can be used to indicate that a periodic job should insert an
// initial job as a new scheduler is started. This can be used as a hedge
// for jobs with longer scheduled durations that may not get to expiry
// before a new scheduler is elected.
RunImmediately bool
RunOnStart bool
}

// NewPeriodicJob returns a new PeriodicJob given a schedule and a constructor
Expand All @@ -53,7 +53,7 @@ type PeriodicJobOpts struct {
// are scheduled each time a job's target run time is reached and a new job
// inserted. However, each scheduler only retains in-memory state, so anytime a
// process quits or a new leader is elected, the whole process starts over
// without regard for the state of the last scheduler. The RunImmediately option
// without regard for the state of the last scheduler. The RunOnStart option
// can be used as a hedge to make sure that jobs with long run durations are
// guaranteed to occasionally run.
func NewPeriodicJob(scheduleFunc PeriodicSchedule, constructorFunc PeriodicJobConstructor, opts *PeriodicJobOpts) *PeriodicJob {
Expand Down

0 comments on commit f62853c

Please sign in to comment.