Skip to content

Commit

Permalink
chore: merge upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 1, 2019
1 parent ea01775 commit 69bd8d1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 50 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "0.0.0-development",
"version": "4.0.0-beta.0",
"description": "Queue for messages and jobs based on Redis",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
32 changes: 23 additions & 9 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ const MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed

/**
* This class is just used for some automatic bookkeeping of the queue,
* such as updating the delay set as well as moving stuck jobs back
* such as updating the delay set as well as moving stalled jobs back
* to the waiting list.
*
* Jobs are checked for stuckness once every "visibility window" seconds.
* Jobs are then marked as candidates for being stuck, in the next check,
* the candidates are marked as stuck and moved to wait.
* Jobs are checked for stallness once every "visibility window" seconds.
* Jobs are then marked as candidates for being stalled, in the next check,
* the candidates are marked as stalled and moved to wait.
* Workers need to clean the candidate list with the jobs that they are working
* on, failing to update the list results in the job ending being stuck.
* on, failing to update the list results in the job ending being stalled.
*
* This class requires a dedicated redis connection, and at least one is needed
* to be running at a given time, otherwise delays, stuck jobs, retries, repeatable
* to be running at a given time, otherwise delays, stalled jobs, retries, repeatable
* jobs, etc, will not work correctly or at all.
*
*/
Expand Down Expand Up @@ -51,6 +51,9 @@ export class QueueScheduler extends QueueBase {
let streamLastId = '0-0'; // TODO: updateDelaySet should also return the last event id

while (!this.closing) {
// Check if at least the min stalled check time has passed.
await this.moveStalledJobsToWait();

// Listen to the delay event stream from lastDelayStreamTimestamp
// Can we use XGROUPS to reduce redundancy?
const blockTime = Math.round(
Expand Down Expand Up @@ -94,9 +97,6 @@ export class QueueScheduler extends QueueBase {
this.nextTimestamp = Number.MAX_VALUE;
}
}

// Check if at least the min stalled check time has passed.
// await this.moveStalledJobsToWait();
}
}

Expand All @@ -106,5 +106,19 @@ export class QueueScheduler extends QueueBase {
}

const [failed, stalled] = await Scripts.moveStalledJobsToWait(this);

failed.forEach((jobId: string) => {
this.emit(
'failed',
jobId,
new Error('job stalled more than allowable limit'),
'active',
);
});
stalled.forEach((jobId: string) => {
this.emit('stalled', jobId);
});

console.log({ failed, stalled });
}
}
4 changes: 0 additions & 4 deletions src/interfaces/advanced-opts.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
export interface AdvancedOpts {
// Key expiration time for job locks.
lockDuration?: number;

// How often check for stalled jobs (use 0 for never checking).
stalledInterval?: number;

Expand All @@ -22,7 +19,6 @@ export interface AdvancedOpts {
}

export const AdvancedOptsDefaults: AdvancedOpts = {
lockDuration: 30000,
stalledInterval: 30000,
maxStalledCount: 1,
guardInterval: 5000,
Expand Down
106 changes: 70 additions & 36 deletions src/test/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import { v4 } from 'node-uuid';
import { delay } from 'bluebird';
import { after, times, once } from 'lodash';
import { RetryErrors } from '@src/enums';
import * as sinon from 'sinon'

describe('workers', function() {
const sandbox = sinon.createSandbox();

let queue: Queue;
let queueEvents: QueueEvents;
let queueName: string;
Expand All @@ -26,6 +29,7 @@ describe('workers', function() {
});

afterEach(async function() {
sandbox.restore();
await queue.close();
await queueEvents.close();
return client.quit();
Expand Down Expand Up @@ -649,39 +653,61 @@ describe('workers', function() {

await worker.close();
});
/*
it.only('process stalled jobs when starting a queue', function(done) {
this.timeout(6000);
/*
it('process stalled jobs when starting a queue', function(done) {
const onceRunning = once(afterJobsRunning);
const worker = new Worker(queueName, async job => {
onceRunning();
return delay(150);
}, {settings: {
stalledInterval: 100,
},
});
queue2.on('completed', doneAfterFour);
queue2.on('stalled', stalledCallback);
await worker.waitUntilReady();
const jobs = await Promise.all([
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
]);
});
it.only('process stalled jobs when starting a queue', function(done) {
this.timeout(6000);
utils
.newQueue('test queue stalled', {
settings: {
lockDuration: 15,
lockRenewTime: 5,
stalledInterval: 100,
},
})
.then(queueStalled => {
const jobs = [
queueStalled.add({ bar: 'baz' }),
queueStalled.add({ bar1: 'baz1' }),
queueStalled.add({ bar2: 'baz2' }),
queueStalled.add({ bar3: 'baz3' }),
];
const worker = new Worker(queueName, async job => {
}, {settings: {
stalledInterval: 100,
},
});
await worker.waitUntilReady();
const jobs = [
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
];
Promise.all(jobs).then(() => {
const afterJobsRunning = function() {
const stalledCallback = sandbox.spy();
return queueStalled
.close(true)
.then(() => {
await worker.close();
return new Promise((resolve, reject) => {
utils
.newQueue('test queue stalled', {
settings: {
stalledInterval: 100,
},
})
.then(queue2 => {
const worker2 = new Worker(queueName, async job => {
}, {settings: {
stalledInterval: 100,
},
});
const doneAfterFour = _.after(4, () => {
try {
expect(stalledCallback.calledOnce).to.be.eql(true);
Expand Down Expand Up @@ -712,7 +738,7 @@ describe('workers', function() {
});
});
});
*/
it('processes jobs that were added before the worker started', async () => {
const jobs = [
queue.add('test', { bar: 'baz' }),
Expand All @@ -733,7 +759,7 @@ describe('workers', function() {
await worker.close();
});

*/
/*
it('processes several stalled jobs when starting several queues', function(done) {
const queueScheduler = new QueueScheduler(queueName, {
Expand Down Expand Up @@ -1028,12 +1054,19 @@ describe('workers', function() {
});
await worker.waitUntilReady();

worker.once('failed', async (job, err) => {
expect(job).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
expect(err).to.be.eql(notEvenErr);
failedOnce = true;
await job.retry();
const failing = new Promise((resolve, reject) => {
worker.once('failed', async (job, err) => {
try {
expect(job).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
expect(err).to.be.eql(notEvenErr);
failedOnce = true;
await job.retry();
} catch (err) {
reject(err);
}
resolve();
});
});

const completing = new Promise(resolve => {
Expand All @@ -1048,6 +1081,7 @@ describe('workers', function() {
expect(job.data.foo).to.be.eql('bar');
});

await failing;
await completing;

await worker.close();
Expand Down Expand Up @@ -1666,7 +1700,7 @@ describe('workers', function() {
await queueScheduler.close();
});

it('should not retry a job that is not failed', async () => {
it('should not retry a job that is active', async () => {
const worker = new Worker(queueName, async job => {
await delay(500);
});
Expand Down

0 comments on commit 69bd8d1

Please sign in to comment.