Skip to content

Commit

Permalink
fix(jobprocessor): check for object.fromEntries for node 10 support (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 20, 2020
1 parent b13d054 commit b8cc61f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
43 changes: 22 additions & 21 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ export class JobProcessor {
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
const { version } = require('../package.json');

const jobStatus =
(typeof Object.fromEntries === 'function' &&
Object.fromEntries(
Object.keys(this.jobStatus).map(job => [
job,
{
...this.jobStatus[job]!,
config: this.agenda.definitions[job]
}
])
)) ||
undefined;

if (typeof Object.fromEntries !== 'function') {
console.warn('job status not available due to too old node version');
}

return {
version,
queueName: this.agenda.attrs.name,
Expand All @@ -34,15 +51,7 @@ export class JobProcessor {
maxConcurrency: this.maxConcurrency,
processEvery: this.processEvery
},
jobStatus: Object.fromEntries(
Object.keys(this.jobStatus).map(job => [
job,
{
...this.jobStatus[job]!,
config: this.agenda.definitions[job]
}
])
),
jobStatus,
queuedJobs: this.jobQueue.length,
runningJobs: !fullDetails ? this.runningJobs.length : this.runningJobs,
lockedJobs: !fullDetails ? this.lockedJobs.length : this.lockedJobs,
Expand Down Expand Up @@ -384,17 +393,10 @@ export class JobProcessor {
this.jobProcessing();
}, runIn);
}
// console.log('this.localQueueProcessing', this.localQueueProcessing);
if (this.localQueueProcessing < this.maxConcurrency && jobEnqueued) {
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
setImmediate(() => this.jobProcessing());
} /* else {
console.log(
'NOT CALLING JOB PROCESSING AGAIN DUE TO',
this.localQueueProcessing,
this.maxConcurrency
);
} */
}
} finally {
this.localQueueProcessing -= 1;
}
Expand Down Expand Up @@ -424,9 +426,9 @@ export class JobProcessor {
(!jobDefinition.concurrency || !status || status.running < jobDefinition.concurrency) &&
this.runningJobs.length < this.maxConcurrency
) {
// NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
// -> not needed, as the timeout has been reached, and it will be picked up anyways again
if (job.isDead()) {
// not needed to update lockedAt in databsase, as the timeout has been reached,
// and it will be picked up anyways again
log.extend('runOrRetry')(
'[%s:%s] job lock has expired, freeing it up',
job.attrs.name,
Expand All @@ -445,8 +447,7 @@ export class JobProcessor {

this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
this.jobProcessing();
return false;
return true;
}

const runJob = async () => {
Expand Down
2 changes: 1 addition & 1 deletion src/types/AgendaStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export interface IAgendaStatus {
maxConcurrency: number;
processEvery: string | number;
};
jobStatus: { [name: string]: { running: number; locked: number } | undefined };
jobStatus?: { [name: string]: { running: number; locked: number } };
queuedJobs: number;
runningJobs: number | Job[];
lockedJobs: number | Job[];
Expand Down
9 changes: 7 additions & 2 deletions test/jobprocessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ describe('JobProcessor', () => {

await agenda.start();

let runningJobs = 0;
const allJobsStarted = new Promise(async resolve => {
let runningJobs = 0;
do {
runningJobs = (await agenda.getRunningStats()).runningJobs as number;
await new Promise(wait => setTimeout(wait, 50));
Expand All @@ -170,7 +170,12 @@ describe('JobProcessor', () => {
});

expect(
await Promise.race([allJobsStarted, new Promise(resolve => setTimeout(resolve, 1500))])
await Promise.race([
allJobsStarted,
new Promise(resolve =>
setTimeout(() => resolve(`not all jobs started, currently running: ${runningJobs}`), 1500)
)
])
).to.equal('all started');
});
});

0 comments on commit b8cc61f

Please sign in to comment.