From 69bd8d1f1baf45bd64cb23eec4913088407da110 Mon Sep 17 00:00:00 2001
From: Manuel Astudillo
Date: Sun, 1 Sep 2019 10:35:41 +0200
Subject: [PATCH] chore: merge upstream

---
 package.json                    |   2 +-
 src/classes/queue-scheduler.ts  |  32 +++++++---
 src/interfaces/advanced-opts.ts |   4 --
 src/test/test_worker.ts         | 106 +++++++++++++++++++++-----------
 4 files changed, 94 insertions(+), 50 deletions(-)

diff --git a/package.json b/package.json
index b3e0833065..577100ea6b 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "bullmq",
-  "version": "0.0.0-development",
+  "version": "4.0.0-beta.0",
   "description": "Queue for messages and jobs based on Redis",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
diff --git a/src/classes/queue-scheduler.ts b/src/classes/queue-scheduler.ts
index 29066f52d7..1ebd71963b 100644
--- a/src/classes/queue-scheduler.ts
+++ b/src/classes/queue-scheduler.ts
@@ -7,17 +7,17 @@ const MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
 
 /**
  * This class is just used for some automatic bookkeeping of the queue,
- * such as updating the delay set as well as moving stuck jobs back
+ * such as updating the delay set as well as moving stalled jobs back
  * to the waiting list.
  *
- * Jobs are checked for stuckness once every "visibility window" seconds.
- * Jobs are then marked as candidates for being stuck, in the next check,
- * the candidates are marked as stuck and moved to wait.
+ * Jobs are checked for stallness once every "visibility window" seconds.
+ * Jobs are then marked as candidates for being stalled, in the next check,
+ * the candidates are marked as stalled and moved to wait.
  * Workers need to clean the candidate list with the jobs that they are working
- * on, failing to update the list results in the job ending being stuck.
+ * on, failing to update the list results in the job ending being stalled.
  *
  * This class requires a dedicated redis connection, and at least one is needed
- * to be running at a given time, otherwise delays, stuck jobs, retries, repeatable
+ * to be running at a given time, otherwise delays, stalled jobs, retries, repeatable
  * jobs, etc, will not work correctly or at all.
  *
 */
@@ -51,6 +51,9 @@ export class QueueScheduler extends QueueBase {
     let streamLastId = '0-0'; // TODO: updateDelaySet should also return the last event id
 
     while (!this.closing) {
+      // Check if at least the min stalled check time has passed.
+      await this.moveStalledJobsToWait();
+
       // Listen to the delay event stream from lastDelayStreamTimestamp
       // Can we use XGROUPS to reduce redundancy?
       const blockTime = Math.round(
@@ -94,9 +97,6 @@ export class QueueScheduler extends QueueBase {
           this.nextTimestamp = Number.MAX_VALUE;
         }
       }
-
-      // Check if at least the min stalled check time has passed.
-      // await this.moveStalledJobsToWait();
     }
   }
 
@@ -106,5 +106,19 @@ export class QueueScheduler extends QueueBase {
     }
 
     const [failed, stalled] = await Scripts.moveStalledJobsToWait(this);
+
+    failed.forEach((jobId: string) => {
+      this.emit(
+        'failed',
+        jobId,
+        new Error('job stalled more than allowable limit'),
+        'active',
+      );
+    });
+    stalled.forEach((jobId: string) => {
+      this.emit('stalled', jobId);
+    });
+
+    console.log({ failed, stalled });
   }
 }
diff --git a/src/interfaces/advanced-opts.ts b/src/interfaces/advanced-opts.ts
index a60f4b0f10..06316f4db0 100644
--- a/src/interfaces/advanced-opts.ts
+++ b/src/interfaces/advanced-opts.ts
@@ -1,7 +1,4 @@
 export interface AdvancedOpts {
-  // Key expiration time for job locks.
-  lockDuration?: number;
-
   // How often check for stalled jobs (use 0 for never checking).
   stalledInterval?: number;
 
@@ -22,7 +19,6 @@ export interface AdvancedOpts {
 }
 
 export const AdvancedOptsDefaults: AdvancedOpts = {
-  lockDuration: 30000,
   stalledInterval: 30000,
   maxStalledCount: 1,
   guardInterval: 5000,
diff --git a/src/test/test_worker.ts b/src/test/test_worker.ts
index 83f3696904..76402ce04f 100644
--- a/src/test/test_worker.ts
+++ b/src/test/test_worker.ts
@@ -6,8 +6,11 @@ import { v4 } from 'node-uuid';
 import { delay } from 'bluebird';
 import { after, times, once } from 'lodash';
 import { RetryErrors } from '@src/enums';
+import * as sinon from 'sinon'
 
 describe('workers', function() {
+  const sandbox = sinon.createSandbox();
+
   let queue: Queue;
   let queueEvents: QueueEvents;
   let queueName: string;
@@ -26,6 +29,7 @@ describe('workers', function() {
   });
 
   afterEach(async function() {
+    sandbox.restore();
     await queue.close();
     await queueEvents.close();
     return client.quit();
   });
@@ -649,39 +653,61 @@ describe('workers', function() {
     await worker.close();
   });
 
+/*
+  it.only('process stalled jobs when starting a queue', function(done) {
+    this.timeout(6000);
 
-  /*
-  it('process stalled jobs when starting a queue', function(done) {
+    const onceRunning = once(afterJobsRunning);
+
+    const worker = new Worker(queueName, async job => {
+      onceRunning();
+      return delay(150);
+    }, {settings: {
+      stalledInterval: 100,
+    },
+    });
+
+    queue2.on('completed', doneAfterFour);
+    queue2.on('stalled', stalledCallback);
+
+    await worker.waitUntilReady();
+
+    const jobs = await Promise.all([
+      queue.add('test', { bar: 'baz' }),
+      queue.add('test', { bar1: 'baz1' }),
+      queue.add('test', { bar2: 'baz2' }),
+      queue.add('test', { bar3: 'baz3' }),
+    ]);
+  });
+
+  it.only('process stalled jobs when starting a queue', function(done) {
     this.timeout(6000);
-    utils
-      .newQueue('test queue stalled', {
-        settings: {
-          lockDuration: 15,
-          lockRenewTime: 5,
-          stalledInterval: 100,
-        },
-      })
-      .then(queueStalled => {
-        const jobs = [
-          queueStalled.add({ bar: 'baz' }),
-          queueStalled.add({ bar1: 'baz1' }),
-          queueStalled.add({ bar2: 'baz2' }),
-          queueStalled.add({ bar3: 'baz3' }),
-        ];
+
+    const worker = new Worker(queueName, async job => {
+    }, {settings: {
+      stalledInterval: 100,
+    },
+    });
+
+    await worker.waitUntilReady();
+    const jobs = [
+      queue.add('test', { bar: 'baz' }),
+      queue.add('test', { bar1: 'baz1' }),
+      queue.add('test', { bar2: 'baz2' }),
+      queue.add('test', { bar3: 'baz3' }),
+    ];
         Promise.all(jobs).then(() => {
           const afterJobsRunning = function() {
             const stalledCallback = sandbox.spy();
-            return queueStalled
-              .close(true)
-              .then(() => {
+            await worker.close();
+            return new Promise((resolve, reject) => {
-                utils
-                  .newQueue('test queue stalled', {
-                    settings: {
-                      stalledInterval: 100,
-                    },
-                  })
-                  .then(queue2 => {
+              const worker2 = new Worker(queueName, async job => {
+              }, {settings: {
+                stalledInterval: 100,
+              },
+              });
+
                     const doneAfterFour = _.after(4, () => {
                       try {
                         expect(stalledCallback.calledOnce).to.be.eql(true);
@@ -712,7 +738,7 @@ describe('workers', function() {
       });
     });
   });
-*/
+
   it('processes jobs that were added before the worker started', async () => {
     const jobs = [
       queue.add('test', { bar: 'baz' }),
       queue.add('test', { bar1: 'baz1' }),
       queue.add('test', { bar2: 'baz2' }),
       queue.add('test', { bar3: 'baz3' }),
     ];
 
     const worker = new Worker(queueName, async job => {});
     await worker.waitUntilReady();
 
     await Promise.all(jobs);
 
     await worker.close();
   });
-
+*/
 /*
   it('processes several stalled jobs when starting several queues', function(done) {
     const queueScheduler = new QueueScheduler(queueName, {
@@ -1028,12 +1054,19 @@ describe('workers', function() {
     });
 
     await worker.waitUntilReady();
 
-    worker.once('failed', async (job, err) => {
-      expect(job).to.be.ok;
-      expect(job.data.foo).to.be.eql('bar');
-      expect(err).to.be.eql(notEvenErr);
-      failedOnce = true;
-      await job.retry();
+    const failing = new Promise((resolve, reject) => {
+      worker.once('failed', async (job, err) => {
+        try {
+          expect(job).to.be.ok;
+          expect(job.data.foo).to.be.eql('bar');
+          expect(err).to.be.eql(notEvenErr);
+          failedOnce = true;
+          await job.retry();
+        } catch (err) {
+          reject(err);
+        }
+        resolve();
+      });
     });
 
     const completing = new Promise(resolve => {
@@ -1048,6 +1081,7 @@ describe('workers', function() {
       expect(job.data.foo).to.be.eql('bar');
     });
 
+    await failing;
     await completing;
 
     await worker.close();
@@ -1666,7 +1700,7 @@ describe('workers', function() {
     await queueScheduler.close();
   });
 
-  it('should not retry a job that is not failed', async () => {
+  it('should not retry a job that is active', async () => {
     const worker = new Worker(queueName, async job => {
       await delay(500);
     });
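
Usage sketch (illustrative only, not part of the patch above). The QueueScheduler comment in this commit describes how stalled jobs are detected once per "visibility window" and moved back to wait, and the added code makes the scheduler emit 'stalled' and 'failed' events for those jobs. Assuming the v4 beta API shown in this diff (Queue, Worker and QueueScheduler constructed from a queue name, and exported from the package entry point), wiring it up could look roughly like this; the queue name, job data and handlers are placeholders:

import { Queue, QueueScheduler, Worker } from 'bullmq';

async function main() {
  const queueName = 'my-queue'; // placeholder name

  // A dedicated scheduler connection; at least one must be running for
  // delays, retries and stalled-job handling to work (see class comment).
  const scheduler = new QueueScheduler(queueName);

  // Events added by this commit: emitted when the scheduler moves a stalled
  // job back to wait, or fails it past the allowable stall limit.
  scheduler.on('stalled', (jobId: string) => {
    console.log(`job ${jobId} stalled and was moved back to wait`);
  });
  scheduler.on('failed', (jobId: string, err: Error) => {
    console.log(`job ${jobId} failed after stalling: ${err.message}`);
  });

  const queue = new Queue(queueName);
  const worker = new Worker(queueName, async job => {
    // process job.data here
    return job.data;
  });

  await queue.add('test', { foo: 'bar' });

  // Close everything when done.
  await worker.close();
  await scheduler.close();
  await queue.close();
}

main();

As the reworked tests in the diff do, a Worker can also be given settings such as { stalledInterval: 100 } to shorten the stall-detection window during testing.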