Skip to content

Commit

Permalink
fix: bind correct context to process
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed May 23, 2022
1 parent 8e950b2 commit cf70739
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
22 changes: 19 additions & 3 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class JobProcessor {
// Make sure an interval has actually been set
// Prevents race condition with 'Agenda.stop' and already scheduled run
if (!this.isRunning) {
log.extend('process')('JobProcessor got stopped already, returning', this);
log.extend('process')('JobProcessor got stopped already, returning');
return;
}

Expand Down Expand Up @@ -482,8 +482,7 @@ export class JobProcessor {
// const a = new Error();
// console.log('STACK', a.stack);
log.extend('runOrRetry')(
'JobProcessor got stopped already while calling runOrRetry, returning!',
this
'JobProcessor got stopped already while calling runOrRetry, returning!'
);
return;
}
Expand All @@ -510,11 +509,22 @@ export class JobProcessor {
setTimeout(async () => {
// when job is not running anymore, just finish
if (!jobIsRunning) {
log.extend('runOrRetry')(
'[%s:%s] checkIfJobIsStillAlive detected job is not running anymore. stopping check.',
job.attrs.name,
job.attrs._id
);
resolve();
return;
}

if (await job.isExpired()) {
log.extend('runOrRetry')(
'[%s:%s] checkIfJobIsStillAlive detected an expired job, killing it.',
job.attrs.name,
job.attrs._id
);

reject(
new Error(
`execution of '${job.attrs.name}' canceled, execution took more than ${
Expand All @@ -526,6 +536,12 @@ export class JobProcessor {
}

if (!job.attrs.lockedAt) {
log.extend('runOrRetry')(
'[%s:%s] checkIfJobIsStillAlive detected a job without a lockedAt value, killing it.',
job.attrs.name,
job.attrs._id
);

reject(
new Error(
`execution of '${job.attrs.name}' canceled, no lockedAt date found. Ensure to call touch() for long running jobs to keep them alive.`
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ export class Agenda extends EventEmitter {
this.attrs.processEvery
);

this.on('processJob', this.jobProcessor.process);
this.on('processJob', this.jobProcessor.process.bind(this.jobProcessor));
}

/**
Expand All @@ -562,7 +562,7 @@ export class Agenda extends EventEmitter {
await this.db.unlockJobs(jobIds);
}

this.off('processJob', this.jobProcessor.process);
this.off('processJob', this.jobProcessor.process.bind(this.jobProcessor));

this.jobProcessor = undefined;
}
Expand Down
3 changes: 2 additions & 1 deletion test/agenda.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ describe('Agenda', () => {
}); */

let j1processes = 0;

globalAgenda.define('j1', (_job, done) => {
j1processes += 1;
done();
Expand All @@ -766,7 +767,7 @@ describe('Agenda', () => {
await globalAgenda.every('10 seconds', 'j2');
await globalAgenda.every('15 seconds', 'j3');

await delay(5001);
await delay(3001);

process.removeListener('unhandledRejection', rejectionsHandler);

Expand Down

0 comments on commit cf70739

Please sign in to comment.