diff --git a/src/classes/job.ts b/src/classes/job.ts index c0119d944f..7562efed79 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -235,6 +235,7 @@ export class Job { /** * Moves a job to the failed queue. * @param err {Error} The jobs error message. + * @param token {string} Token to check job is locked by current worker * @param fetchNext {boolean} True when wanting to fetch the next job * @returns void */ @@ -252,7 +253,6 @@ export class Job { // Check if an automatic retry should be performed // let moveToFailed = false; - // FIXME why don't we moveToFailed with fetchNext = true? if (this.attemptsMade < this.opts.attempts && !this.discarded) { const opts = queue.opts as WorkerOptions; diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 305932b635..1e10b755af 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -91,12 +91,12 @@ export class Scripts { return (client).removeJob(keys.concat([queue.keys.events, jobId])); } - static async extendLock(queue: QueueBase, jobId: string, token: string) { - const client = await queue.client; - const opts: WorkerOptions = queue.opts; + static async extendLock(worker: Worker, jobId: string, token: string) { + const client = await worker.client; + const opts: WorkerOptions = worker.opts; const args = [ - queue.toKey(jobId) + ':lock', - queue.keys.stalled, + worker.toKey(jobId) + ':lock', + worker.keys.stalled, token, opts.lockDuration, jobId, @@ -362,11 +362,11 @@ export class Scripts { return (client).reprocessJob(keys.concat(args)); } - static async moveToActive(queue: Worker, token: string, jobId?: string) { - const client = await queue.client; - const opts: WorkerOptions = queue.opts; + static async moveToActive(worker: Worker, token: string, jobId?: string) { + const client = await worker.client; + const opts = worker.opts; - const queueKeys = queue.keys; + const queueKeys = worker.keys; const keys = [queueKeys.wait, queueKeys.active, queueKeys.priority]; keys[3] = queueKeys.events; diff --git a/src/classes/timer-manager.ts b/src/classes/timer-manager.ts new file mode 100644 index 0000000000..44bde84a7f --- /dev/null +++ b/src/classes/timer-manager.ts @@ -0,0 +1,51 @@ +import uuid from 'uuid'; + +/** + * Keeps track on timers created with setTimeout to help clearTimeout + * for all timers when no more delayed actions needed + */ +export class TimerManager { + private timers: any = {}; + + public setTimer(name: string, delay: number, fn: Function) { + const id = uuid.v4(); + const timer = setTimeout( + timeoutId => { + this.clearTimer(timeoutId); + try { + fn(); + } catch (err) { + console.error(err); + } + }, + delay, + id, + ); + + // XXX only the timer is used, but the + // other fields are useful for + // troubleshooting/debugging + this.timers[id] = { + name, + timer, + }; + + return id; + } + + public clearTimer(id: string) { + const timers = this.timers; + const timer = timers[id]; + if (!timer) { + return; + } + clearTimeout(timer.timer); + delete timers[id]; + } + + public clearAllTimers() { + Object.keys(this.timers).forEach(key => { + this.clearTimer(key); + }); + } +} diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 691d5b516d..edd05c99a6 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -10,6 +10,7 @@ import { RedisConnection } from './redis-connection'; import sandbox from './sandbox'; import { Scripts } from './scripts'; import uuid from 'uuid'; +import { TimerManager } from './timer-manager'; // note: sandboxed processors would also like to define concurrency per process // for better resource utilization. @@ -27,6 +28,7 @@ export class Worker extends QueueBase { private paused: Promise; private _repeat: Repeat; private childPool: ChildPool; + private timerManager: TimerManager; private blockingConnection: RedisConnection; @@ -73,9 +75,12 @@ export class Worker extends QueueBase { this.childPool = this.childPool || pool; this.processFn = sandbox(processor, this.childPool).bind(this); } + this.timerManager = new TimerManager(); /* tslint:disable: no-floating-promises */ - this.run(); // TODO catch, emit error + this.run().catch(error => { + console.error(error); + }); } get repeat() { @@ -122,12 +127,7 @@ export class Worker extends QueueBase { while (!this.closing) { if (processing.size < opts.concurrency) { const token = tokens.pop(); - processing.set( - this.getNextJob(token).catch(error => { - console.error('getNextJob failed', error); // TODO handle error properly - }), - token, - ); + processing.set(this.getNextJob(token), token); } /* @@ -144,12 +144,7 @@ export class Worker extends QueueBase { const job = await completed; if (job) { // reuse same token if next job is available to process - processing.set( - this.processJob(job, token).catch(error => { - console.error('processJob failed', error); // TODO handle error properly - }), - token, - ); + processing.set(this.processJob(job, token), token); } else { tokens.push(token); } @@ -251,7 +246,7 @@ export class Worker extends QueueBase { let lockRenewId: string; let timerStopped = false; const lockExtender = () => { - lockRenewId = this.setTimer( + lockRenewId = this.timerManager.setTimer( 'lockExtender', this.opts.lockRenewTime, async () => { @@ -260,7 +255,7 @@ export class Worker extends QueueBase { if (result && !timerStopped) { lockExtender(); } - // FIXME if result = 0, reject processFn promise to take next job? + // FIXME if result = 0 (missing lock), reject processFn promise to take next job? } catch (error) { console.error('Error extending lock ', error); // Somehow tell the worker this job should stop processing... @@ -268,9 +263,10 @@ export class Worker extends QueueBase { }, ); }; + const stopTimer = () => { timerStopped = true; - this.clearTimer(lockRenewId); + this.timerManager.clearTimer(lockRenewId); }; // end copy-paste from Bull3 @@ -282,7 +278,6 @@ export class Worker extends QueueBase { !(this.closing || this.paused), ); this.emit('completed', job, result, 'active'); - // FIXME should we call nextJobFromJobData here to emit drained event? return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null; }; @@ -297,7 +292,6 @@ export class Worker extends QueueBase { await job.moveToFailed(err, token); this.emit('failed', job, error, 'active'); - // FIXME can we also fetch next job right away as in handleCompleted? }; // TODO: how to cancel the processing? (null -> job.cancel() => throw CancelError()void) @@ -306,7 +300,7 @@ export class Worker extends QueueBase { lockExtender(); try { const result = await this.processFn(job); - return handleCompleted(result); + return await handleCompleted(result); } catch (err) { return handleFailed(err); } finally { @@ -366,7 +360,7 @@ export class Worker extends QueueBase { this.waiting && (await this.blockingConnection.disconnect()); if (this.processing) { - await Promise.all(this.processing); + await Promise.all(this.processing.keys()); } this.waiting && reconnect && (await this.blockingConnection.reconnect()); @@ -390,7 +384,7 @@ export class Worker extends QueueBase { } catch (err) { reject(err); } finally { - this.clearAllTimers(); + this.timerManager.clearAllTimers(); this.childPool && this.childPool.clean(); } this.emit('closed'); @@ -398,49 +392,4 @@ export class Worker extends QueueBase { }); } } - - // code from former TimerManager - private timers: any = {}; - - private setTimer(name: string, delay: number, fn: Function) { - const id = uuid.v4(); - const timer = setTimeout( - timeoutId => { - this.clearTimer(timeoutId); - try { - fn(); - } catch (err) { - console.error(err); - } - }, - delay, - id, - ); - - // XXX only the timer is used, but the - // other fields are useful for - // troubleshooting/debugging - this.timers[id] = { - name, - timer, - }; - - return id; - } - - private clearTimer(id: string) { - const timers = this.timers; - const timer = timers[id]; - if (!timer) { - return; - } - clearTimeout(timer.timer); - delete timers[id]; - } - - private clearAllTimers() { - Object.keys(this.timers).forEach(key => { - this.clearTimer(key); - }); - } } diff --git a/src/commands/moveToFinished-7.lua b/src/commands/moveToFinished-7.lua index 063d5aeeb3..ea52e7352e 100644 --- a/src/commands/moveToFinished-7.lua +++ b/src/commands/moveToFinished-7.lua @@ -90,7 +90,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists local lockKey = jobKey .. ':lock' -- get a lock - rcall("SET", lockKey, ARGV[10], "PX", ARGV[11]) + if ARGV[10] ~= "0" then + rcall("SET", lockKey, ARGV[10], "PX", ARGV[11]) + end rcall("ZREM", KEYS[5], jobId) -- remove from priority rcall("XADD", KEYS[6], "*", "event", "active", "jobId", jobId, diff --git a/src/test/test_delay.ts b/src/test/test_delay.ts index 825bf21768..59f48aaa75 100644 --- a/src/test/test_delay.ts +++ b/src/test/test_delay.ts @@ -32,6 +32,7 @@ describe('Delayed jobs', function() { it('should process a delayed job only after delayed time', async function() { const delay = 1000; const queueScheduler = new QueueScheduler(queueName); + await queueScheduler.waitUntilReady(); const queueEvents = new QueueEvents(queueName); await queueEvents.waitUntilReady(); @@ -41,7 +42,8 @@ describe('Delayed jobs', function() { let publishHappened = false; queueEvents.on('delayed', () => { - console.log('delayed!'), (publishHappened = true); + console.log('delayed!'); + publishHappened = true; }); const completed = new Promise((resolve, reject) => { @@ -75,28 +77,27 @@ describe('Delayed jobs', function() { it('should process delayed jobs in correct order', async function() { let order = 0; - let processor; const queueScheduler = new QueueScheduler(queueName); await queueScheduler.waitUntilReady(); const processing = new Promise((resolve, reject) => { - processor = async (job: Job) => { + const processor = async (job: Job) => { order++; try { expect(order).to.be.equal(job.data.order); if (order === 10) { - resolve(); + resolve(worker.close()); } } catch (err) { reject(err); } }; - }); - const worker = new Worker(queueName, processor); + const worker = new Worker(queueName, processor); - worker.on('failed', function(job, err) { - err.job = job; + worker.on('failed', function(job, err) { + err.job = job; + }); }); await Promise.all([ @@ -115,7 +116,6 @@ describe('Delayed jobs', function() { await processing; await queueScheduler.close(); - await worker.close(); }); /* @@ -171,15 +171,13 @@ describe('Delayed jobs', function() { const queueScheduler = new QueueScheduler(queueName); await queueScheduler.waitUntilReady(); - let processor; - const processing = new Promise((resolve, reject) => { - processor = async (job: Job) => { + const processor = async (job: Job) => { try { expect(order).to.be.equal(job.data.order); if (order === 12) { - resolve(); + resolve(worker.close()); } } catch (err) { reject(err); @@ -187,6 +185,12 @@ describe('Delayed jobs', function() { order++; }; + + const worker = new Worker(queueName, processor); + + worker.on('failed', function(job, err) { + err.job = job; + }); }); const now = Date.now(); @@ -205,17 +209,7 @@ describe('Delayed jobs', function() { ); } await Promise.all(promises); - - const worker = new Worker(queueName, processor); - - worker.on('failed', function(job, err) { - err.job = job; - }); - await processing; - await queueScheduler.close(); - - await worker.close(); }); }); diff --git a/src/test/test_job.ts b/src/test/test_job.ts index 47065d01ad..0016801bc2 100644 --- a/src/test/test_job.ts +++ b/src/test/test_job.ts @@ -126,11 +126,7 @@ describe('Job', function() { const job2 = await Job.create(queue, 'test', { baz: 'qux' }); const isCompleted = await job2.isCompleted(); expect(isCompleted).to.be.equal(false); - const job1Id = await job2.moveToCompleted( - 'succeeded', - 'test-token', - true, - ); + const job1Id = await job2.moveToCompleted('succeeded', '0', true); const isJob2Completed = await job2.isCompleted(); expect(isJob2Completed).to.be.equal(true); expect(job2.returnvalue).to.be.equal('succeeded'); @@ -143,7 +139,7 @@ describe('Job', function() { const job = await Job.create(queue, 'test', { foo: 'bar' }); const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); - await job.moveToFailed(new Error('test error'), 'test-token', true); + await job.moveToFailed(new Error('test error'), '0', true); const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(true); expect(job.stacktrace).not.be.equal(null); @@ -167,7 +163,7 @@ describe('Job', function() { queueEvents.on('waiting', resolve); }); - await job.moveToFailed(new Error('test error'), 'test-token', true); + await job.moveToFailed(new Error('test error'), '0', true); await waiting; @@ -190,7 +186,7 @@ describe('Job', function() { ); const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); - await job.moveToFailed(new Error('test error'), 'test-token', true); + await job.moveToFailed(new Error('test error'), '0', true); const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(true); expect(job.stacktrace).not.be.equal(null); @@ -206,7 +202,7 @@ describe('Job', function() { ); const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); - await job.moveToFailed(new Error('test error'), 'test-token', true); + await job.moveToFailed(new Error('test error'), '0', true); const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(false); expect(job.stacktrace).not.be.equal(null); @@ -225,7 +221,7 @@ describe('Job', function() { ); const isFailed = await job.isFailed(); expect(isFailed).to.be.equal(false); - await job.moveToFailed(new Error('test error'), 'test-token', true); + await job.moveToFailed(new Error('test error'), '0', true); const isFailed2 = await job.isFailed(); expect(isFailed2).to.be.equal(true); expect(job.stacktrace).not.be.equal(null); diff --git a/src/test/test_stalled_jobs.ts b/src/test/test_stalled_jobs.ts index 3831c20c7c..0e3e710b60 100644 --- a/src/test/test_stalled_jobs.ts +++ b/src/test/test_stalled_jobs.ts @@ -40,9 +40,7 @@ describe('stalled jobs', function() { return delay(10000); }, { - settings: { - stalledInterval: 1000, - }, + lockDuration: 1000, concurrency, }, ); @@ -66,6 +64,7 @@ describe('stalled jobs', function() { stalledInterval: 100, }); await queueScheduler.waitUntilReady(); + await worker.close(true); const allStalled = new Promise(resolve => { queueScheduler.on('stalled', after(concurrency, resolve)); @@ -78,8 +77,6 @@ describe('stalled jobs', function() { await allStalled; await allStalledGlobalEvent; - await worker.close(true); - const worker2 = new Worker(queueName, async job => {}, { concurrency }); const allCompleted = new Promise(resolve => { @@ -91,6 +88,7 @@ describe('stalled jobs', function() { await queueEvents.close(); await queueScheduler.close(); + await worker2.close(); }); @@ -105,9 +103,7 @@ describe('stalled jobs', function() { return delay(10000); }, { - settings: { - stalledInterval: 100, - }, + lockDuration: 1000, concurrency, }, ); @@ -133,14 +129,14 @@ describe('stalled jobs', function() { }); await queueScheduler.waitUntilReady(); + await worker.close(true); + const allFailed = new Promise(resolve => { queueScheduler.on('failed', after(concurrency, resolve)); }); await allFailed; - await worker.close(true); - await queueScheduler.close(); }); });