From 7b2cef3677e2b3af0370e0023aec4b971ad313fe Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 1 Sep 2019 10:37:16 +0200 Subject: [PATCH] fix: check closing after resuming from pause --- src/classes/worker.ts | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 128e1e777b..a57b8118fd 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -114,14 +114,14 @@ export class Worker extends QueueBase { Returns a promise that resolves to the next job in queue. */ async getNextJob() { - if (this.closing) { - return; - } - if (this.paused) { await this.paused; } + if (this.closing) { + return; + } + if (this.drained) { try { const jobId = await this.waitForJob(); @@ -259,22 +259,31 @@ export class Worker extends QueueBase { * * @returns {Promise} */ - private async whenCurrentJobsFinished() { + private async whenCurrentJobsFinished(reconnect = true) { // // Force reconnection of blocking connection to abort blocking redis call immediately. // this.waiting && (await redisClientDisconnect(this.client)); + // If we are disconnected, how are we going to update the completed/failed sets? if (this.processing) { await Promise.all(this.processing); } - this.waiting && (await this.client.connect()); + + this.waiting && reconnect && (await this.client.connect()); } - async close() { + async close(force = false) { this.emit('closing', 'closing queue'); + await super.close(); + try { - await super.close(); + await this.resume(); + if (!force) { + await this.whenCurrentJobsFinished(false); + } else { + await redisClientDisconnect(this.client); + } } finally { this.childPool && this.childPool.clean(); }