Skip to content

Commit

Permalink
fix(jobprocessor): ensure set timeout is only called once for each jo…
Browse files Browse the repository at this point in the history
…b in the queue

as discussed in agenda/agenda#1146
  • Loading branch information
simllll committed Nov 19, 2020
1 parent 8aac567 commit 1590224
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
8 changes: 6 additions & 2 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export class Job<DATA = unknown | void> {
*/
canceled: Error | undefined;

/** internal variable to ensure a job does not set unlimited numbers of setTimeouts if the job is not processed
* immediately */
gotTimerToExecute: boolean;

/**
* creates a new job object
* @param agenda
Expand All @@ -33,7 +37,7 @@ export class Job<DATA = unknown | void> {
name: string;
type: 'normal' | 'single';
},
byJobProcessor?
byJobProcessor?: boolean
);
constructor(
agenda: Agenda,
Expand All @@ -42,7 +46,7 @@ export class Job<DATA = unknown | void> {
type: 'normal' | 'single';
data: DATA;
},
byJobProcessor?
byJobProcessor?: boolean
);
constructor(
readonly agenda: Agenda,
Expand Down
1 change: 1 addition & 0 deletions src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export class JobProcessingQueue {
(!status || !def.concurrency || status.running < def.concurrency)
) {
if (!this._queue[i].attrs.nextRunAt) {
// eslint-disable-next-line no-console
console.log('this._queue[i]', this._queue[i].attrs);
throw new Error('no nextRunAt date');
}
Expand Down
21 changes: 13 additions & 8 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ export class JobProcessor {
'[%s:%s] there is a job to process (priority = %d)',
job.attrs.name,
job.attrs._id,
job.attrs.priority
job.attrs.priority,
job.gotTimerToExecute
);

this.jobQueue.remove(job);
Expand Down Expand Up @@ -430,13 +431,17 @@ export class JobProcessor {
);
// re add to queue (puts it at the right position in the queue)
this.jobQueue.insert(job);
setTimeout(
() => {
this.jobProcessing();
},
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
// because setTimeout will run in an overflow otherwise and reprocesses immediately
// ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
if (!job.gotTimerToExecute) {
job.gotTimerToExecute = true;
setTimeout(
() => {
this.jobProcessing();
},
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
// because setTimeout will run in an overflow otherwise and reprocesses immediately
}
}
}

Expand Down

0 comments on commit 1590224

Please sign in to comment.