Skip to content

Commit

Permalink
fix(jobprocessor): ensure returnNextConcurrencyFreeJob is not returni…
Browse files Browse the repository at this point in the history
…ng same job each time
  • Loading branch information
simllll committed Oct 20, 2020
1 parent a3d4203 commit 11d6606
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 110 deletions.
18 changes: 11 additions & 7 deletions src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@ export class JobProcessingQueue {
* @param {Object} jobStatus current status of jobs
* @returns {Job} Next Job to be processed
*/
returnNextConcurrencyFreeJob(jobStatus: {
[jobName: string]:
| {
running: number;
}
| undefined;
}): (Job & { attrs: IJobParameters & { nextRunAt: Date } }) | undefined {
returnNextConcurrencyFreeJob(
jobStatus: {
[jobName: string]:
| {
running: number;
}
| undefined;
},
handledJobs: IJobParameters['_id'][]
): (Job & { attrs: IJobParameters & { nextRunAt: Date } }) | undefined {
const next = ((Object.keys(this._queue) as unknown) as number[]).reverse().find(i => {
const def = this.agenda.definitions[this._queue[i].attrs.name];
const status = jobStatus[this._queue[i].attrs.name];
Expand All @@ -109,6 +112,7 @@ export class JobProcessingQueue {
if (
def &&
this._queue[i].attrs.nextRunAt &&
!handledJobs.includes(this._queue[i].attrs._id) &&
(!status || !def.concurrency || status.running < def.concurrency)
) {
return true;
Expand Down
203 changes: 100 additions & 103 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { Agenda } from './index';
import type { IJobParameters } from './types/JobParameters';

const log = debug('agenda:jobProcessor');

Expand Down Expand Up @@ -333,7 +334,6 @@ export class JobProcessor {

this.enqueueJob(job);
await this.jobQueueFilling(name);
// this.jobProcessing();
} else {
log.extend('jobQueueFilling')('Cannot lock job [%s]', name);
}
Expand All @@ -344,22 +344,22 @@ export class JobProcessor {

/**
* Internal method that processes any jobs in the local queue (array)
* handledJobs keeps list of already processed jobs
* @returns {undefined}
*/
private async jobProcessing() {
private async jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
// Ensure we have jobs
if (this.jobQueue.length === 0) {
return;
}

this.localQueueProcessing += 1;

let jobEnqueued = false;
try {
const now = new Date();

// Check if there is any job that is not blocked by concurrency
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus);
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);

if (!job) {
log.extend('jobProcessing')('[%s:%s] there is no job to process');
Expand All @@ -380,7 +380,7 @@ export class JobProcessor {
job.attrs.name,
job.attrs._id
);
jobEnqueued = await this.runOrRetry(job);
this.runOrRetry(job);
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
log.extend('jobProcessing')(
Expand All @@ -392,11 +392,12 @@ export class JobProcessor {
setTimeout(() => {
this.jobProcessing();
}, runIn);
jobEnqueued = true;
}
if (this.localQueueProcessing < this.maxConcurrency && jobEnqueued) {

if (this.localQueueProcessing < this.maxConcurrency) {
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
setImmediate(() => this.jobProcessing());
handledJobs.push(job.attrs._id);
setImmediate(() => this.jobProcessing(handledJobs));
}
} finally {
this.localQueueProcessing -= 1;
Expand All @@ -407,15 +408,15 @@ export class JobProcessor {
* Internal method that tries to run a job and if it fails, retries again!
* @returns {boolean} processed a job or not
*/
private async runOrRetry(job: Job): Promise<boolean> {
private async runOrRetry(job: Job): Promise<void> {
if (!this.isRunning) {
// const a = new Error();
// console.log('STACK', a.stack);
log.extend('runOrRetry')(
'JobProcessor got stopped already while calling runOrRetry, returning!',
this
);
return false;
return;
}

this.jobQueue.remove(job);
Expand Down Expand Up @@ -448,109 +449,106 @@ export class JobProcessor {

this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
return true;
return;
}

const runJob = async () => {
// Add to local "running" queue
this.runningJobs.push(job);
this.updateStatus(job.attrs.name, 'running', 1);

try {
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);

// check if the job is still alive
const checkIfJobIsStillAlive = () => {
// check every "this.agenda.definitions[job.attrs.name].lockLifetime / 2"" (or at mininum every processEvery)
return new Promise((resolve, reject) =>
setTimeout(() => {
// when job is not running anymore, just finish
if (!job.isRunning()) {
resolve();
return;
}

if (job.isDead()) {
reject(
new Error(
`execution of '${job.attrs.name}' canceled, execution took more than ${
this.agenda.definitions[job.attrs.name].lockLifetime
}ms. Call touch() for long running jobs to keep them alive.`
)
);
return;
}

resolve(checkIfJobIsStillAlive());
}, Math.max(this.processEvery, this.agenda.definitions[job.attrs.name].lockLifetime / 2))
);
};
// Add to local "running" queue
this.runningJobs.push(job);
this.updateStatus(job.attrs.name, 'running', 1);

try {
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);

// check if the job is still alive
const checkIfJobIsStillAlive = () => {
// check every "this.agenda.definitions[job.attrs.name].lockLifetime / 2"" (or at mininum every processEvery)
return new Promise((resolve, reject) =>
setTimeout(() => {
// when job is not running anymore, just finish
if (!job.isRunning()) {
resolve();
return;
}

if (job.isDead()) {
reject(
new Error(
`execution of '${job.attrs.name}' canceled, execution took more than ${
this.agenda.definitions[job.attrs.name].lockLifetime
}ms. Call touch() for long running jobs to keep them alive.`
)
);
return;
}

resolve(checkIfJobIsStillAlive());
}, Math.max(this.processEvery, this.agenda.definitions[job.attrs.name].lockLifetime / 2))
);
};

// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
await Promise.race([job.run(), checkIfJobIsStillAlive()]);

// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
await Promise.race([job.run(), checkIfJobIsStillAlive()]);
log.extend('runOrRetry')(
'[%s:%s] processing job successfull',
job.attrs.name,
job.attrs._id
);

// Job isn't in running jobs so throw an error
if (!this.runningJobs.includes(job)) {
log.extend('runOrRetry')(
'[%s:%s] processing job successfull',
job.attrs.name,
'[%s] callback was called, job must have been marked as complete already',
job.attrs._id
);
throw new Error(
`callback already called - job ${job.attrs.name} already marked complete`
);
}
} catch (err) {
job.canceled = err;
log.extend('runOrRetry')(
'[%s:%s] processing job failed',
job.attrs.name,
job.attrs._id,
err
);
job.agenda.emit('error', err);
} finally {
// Remove the job from the running queue
let runningJobIndex = this.runningJobs.indexOf(job);
if (runningJobIndex === -1) {
// lookup by id
runningJobIndex = this.runningJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (runningJobIndex === -1) {
// eslint-disable-next-line no-unsafe-finally
throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
}
this.runningJobs.splice(runningJobIndex, 1);
this.updateStatus(job.attrs.name, 'running', -1);

// Job isn't in running jobs so throw an error
if (!this.runningJobs.includes(job)) {
log.extend('runOrRetry')(
'[%s] callback was called, job must have been marked as complete already',
job.attrs._id
);
throw new Error(
`callback already called - job ${job.attrs.name} already marked complete`
);
}
} catch (err) {
job.canceled = err;
log.extend('runOrRetry')(
'[%s:%s] processing job failed',
job.attrs.name,
job.attrs._id,
err
// Remove the job from the locked queue
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
job.agenda.emit('error', err);
} finally {
// Remove the job from the running queue
let runningJobIndex = this.runningJobs.indexOf(job);
if (runningJobIndex === -1) {
// lookup by id
runningJobIndex = this.runningJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (runningJobIndex === -1) {
// eslint-disable-next-line no-unsafe-finally
throw new Error(`cannot find job ${job.attrs._id} in running jobs queue?`);
}
this.runningJobs.splice(runningJobIndex, 1);
this.updateStatus(job.attrs.name, 'running', -1);

// Remove the job from the locked queue
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (lockedJobIndex === -1) {
// eslint-disable-next-line no-unsafe-finally
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}
this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
}
if (lockedJobIndex === -1) {
// eslint-disable-next-line no-unsafe-finally
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}
this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
}

// Re-process jobs now that one has finished
setImmediate(() => this.jobProcessing());
};
runJob();
return true;
// Re-process jobs now that one has finished
setImmediate(() => this.jobProcessing());
return;
}

// Run the job later
Expand All @@ -560,7 +558,6 @@ export class JobProcessor {
job.attrs._id
);
this.enqueueJob(job);
return false;
}

private updateStatus(name: string, key: 'locked' | 'running', number: -1 | 1) {
Expand Down

0 comments on commit 11d6606

Please sign in to comment.