Skip to content

Commit

Permalink
issue-654: support time mocking for jobs with stopTime and change the…
Browse files Browse the repository at this point in the history
… timing of calling stopTimeReached.
  • Loading branch information
Higan committed Jul 19, 2024
1 parent 05c6315 commit 9fcc399
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
9 changes: 4 additions & 5 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ func (e *executor) start() {
// safety check as it'd be strange bug if this occurred
return
}

if j.stopTimeReached() {
return
}

if j.singletonMode {
// for singleton mode, get the existing runner for the job
// or spin up a new one
Expand Down Expand Up @@ -363,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
default:
}

if j.stopTimeReached(e.clock.Now()) {
return
}

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutForRescheduling(&jIn)
Expand Down
23 changes: 12 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ func (j *internalJob) stop() {
j.cancel()
}

func (j *internalJob) stopTimeReached() bool {
func (j *internalJob) stopTimeReached(now time.Time) bool {
if j.stopTime.IsZero() {
return false
}
return j.stopTime.Before(time.Now())
return j.stopTime.Before(now)
}

// task stores the function and parameters
Expand Down Expand Up @@ -610,22 +610,23 @@ func WithStartDateTime(start time.Time) StartAtOption {
}
}

// WithStopAt sets the option for stopping the job at
// a specific datetime.
// WithStopAt sets the option for stopping the job from running
// after the specified time.
func WithStopAt(option StopAtOption) JobOption {
return func(j *internalJob) error {
return option(j)
return func(j *internalJob, now time.Time) error {
return option(j, now)
}
}

// StopAtOption defines options for stopping the job
type StopAtOption func(*internalJob) error
type StopAtOption func(*internalJob, time.Time) error

// WithStopDateTime sets the final data & time after which the job should stop
// This datetime must be in the future and should be after the startTime (if specified)
// WithStopDateTime sets the final date & time after which the job should stop.
// This must be in the future and should be after the startTime (if specified).
// The job's final run may be at the stop time, but not after.
func WithStopDateTime(end time.Time) StopAtOption {
return func(j *internalJob) error {
if end.IsZero() || end.Before(time.Now()) {
return func(j *internalJob, now time.Time) error {
if end.IsZero() || end.Before(now) {
return ErrWithStopDateTimePast
}
if end.Before(j.startTime) {
Expand Down
8 changes: 4 additions & 4 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
return
}

if j.stopTimeReached(s.now()) {
return
}

scheduleFrom := j.lastRun
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
Expand Down Expand Up @@ -491,10 +495,6 @@ func (s *scheduler) selectStart() {
next = j.next(s.now())
}

if j.stopTimeReached() {
continue
}

jobID := id
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
select {
Expand Down

0 comments on commit 9fcc399

Please sign in to comment.