From cf707396707b36d293eb99a79fbc618b75a54900 Mon Sep 17 00:00:00 2001 From: simon Date: Mon, 23 May 2022 14:17:36 +0200 Subject: [PATCH] fix: bind correct context to process --- src/JobProcessor.ts | 22 +++++++++++++++++++--- src/index.ts | 4 ++-- test/agenda.test.ts | 3 ++- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index a495bfb..24f11d3 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -121,7 +121,7 @@ export class JobProcessor { // Make sure an interval has actually been set // Prevents race condition with 'Agenda.stop' and already scheduled run if (!this.isRunning) { - log.extend('process')('JobProcessor got stopped already, returning', this); + log.extend('process')('JobProcessor got stopped already, returning'); return; } @@ -482,8 +482,7 @@ export class JobProcessor { // const a = new Error(); // console.log('STACK', a.stack); log.extend('runOrRetry')( - 'JobProcessor got stopped already while calling runOrRetry, returning!', - this + 'JobProcessor got stopped already while calling runOrRetry, returning!' ); return; } @@ -510,11 +509,22 @@ export class JobProcessor { setTimeout(async () => { // when job is not running anymore, just finish if (!jobIsRunning) { + log.extend('runOrRetry')( + '[%s:%s] checkIfJobIsStillAlive detected job is not running anymore. stopping check.', + job.attrs.name, + job.attrs._id + ); resolve(); return; } if (await job.isExpired()) { + log.extend('runOrRetry')( + '[%s:%s] checkIfJobIsStillAlive detected an expired job, killing it.', + job.attrs.name, + job.attrs._id + ); + reject( new Error( `execution of '${job.attrs.name}' canceled, execution took more than ${ @@ -526,6 +536,12 @@ export class JobProcessor { } if (!job.attrs.lockedAt) { + log.extend('runOrRetry')( + '[%s:%s] checkIfJobIsStillAlive detected a job without a lockedAt value, killing it.', + job.attrs.name, + job.attrs._id + ); + reject( new Error( `execution of '${job.attrs.name}' canceled, no lockedAt date found. Ensure to call touch() for long running jobs to keep them alive.` diff --git a/src/index.ts b/src/index.ts index a99fc57..b3b210c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -538,7 +538,7 @@ export class Agenda extends EventEmitter { this.attrs.processEvery ); - this.on('processJob', this.jobProcessor.process); + this.on('processJob', this.jobProcessor.process.bind(this.jobProcessor)); } /** @@ -562,7 +562,7 @@ export class Agenda extends EventEmitter { await this.db.unlockJobs(jobIds); } - this.off('processJob', this.jobProcessor.process); + this.off('processJob', this.jobProcessor.process.bind(this.jobProcessor)); this.jobProcessor = undefined; } diff --git a/test/agenda.test.ts b/test/agenda.test.ts index 577cdb1..60f088c 100644 --- a/test/agenda.test.ts +++ b/test/agenda.test.ts @@ -744,6 +744,7 @@ describe('Agenda', () => { }); */ let j1processes = 0; + globalAgenda.define('j1', (_job, done) => { j1processes += 1; done(); @@ -766,7 +767,7 @@ describe('Agenda', () => { await globalAgenda.every('10 seconds', 'j2'); await globalAgenda.every('15 seconds', 'j3'); - await delay(5001); + await delay(3001); process.removeListener('unhandledRejection', rejectionsHandler);