Skip to content

Commit

Permalink
fix: check closing after resuming from pause
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 1, 2019
1 parent 13b2df2 commit 7b2cef3
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ export class Worker extends QueueBase {
Returns a promise that resolves to the next job in queue.
*/
async getNextJob() {
if (this.closing) {
return;
}

if (this.paused) {
await this.paused;
}

if (this.closing) {
return;
}

if (this.drained) {
try {
const jobId = await this.waitForJob();
Expand Down Expand Up @@ -259,22 +259,31 @@ export class Worker extends QueueBase {
*
* @returns {Promise}
*/
private async whenCurrentJobsFinished() {
private async whenCurrentJobsFinished(reconnect = true) {
//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
this.waiting && (await redisClientDisconnect(this.client));

// If we are disconnected, how are we going to update the completed/failed sets?
if (this.processing) {
await Promise.all(this.processing);
}
this.waiting && (await this.client.connect());

this.waiting && reconnect && (await this.client.connect());
}

async close() {
async close(force = false) {
this.emit('closing', 'closing queue');
await super.close();

try {
await super.close();
await this.resume();
if (!force) {
await this.whenCurrentJobsFinished(false);
} else {
await redisClientDisconnect(this.client);
}
} finally {
this.childPool && this.childPool.clean();
}
Expand Down

0 comments on commit 7b2cef3

Please sign in to comment.