From 3fbacd088bc3bfbd61ed8ff173e4401193ce48ec Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 29 Sep 2019 12:38:26 +0200 Subject: [PATCH] feat: move async initialization to constructors --- src/classes/queue-base.ts | 8 +++++++- src/classes/queue-events.ts | 5 +++-- src/classes/queue-scheduler.ts | 24 +++++++++++------------- src/classes/queue.ts | 1 + src/classes/redis-connection.ts | 2 +- src/classes/worker.ts | 1 + 6 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 30739eebbc..302ac17455 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -48,6 +48,12 @@ export class QueueBase extends EventEmitter { keys[key] = this.toKey(key); }); this.keys = keys; + + this.initializing = this.connection.init(); + + this.waitUntilReady() + .then(client => client.on('error', this.emit.bind(this))) + .catch(err => this.emit('error')); } toKey(type: string) { @@ -59,7 +65,7 @@ export class QueueBase extends EventEmitter { this.initializing = this.connection.init(); } - this.client = await this.initializing; + return (this.client = await this.initializing); } protected base64Name() { diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 45427634d8..e589ff8064 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -16,7 +16,8 @@ export class QueueEvents extends QueueBase { this.opts, ); - this.consumeEvents(); + // tslint:disable: no-floating-promises + this.consumeEvents().catch(err => this.emit('error')); } private async consumeEvents() { @@ -63,9 +64,9 @@ export class QueueEvents extends QueueBase { } } catch (err) { if (err.message !== 'Connection is closed.') { - await delay(5000); throw err; } + await delay(5000); } } } diff --git a/src/classes/queue-scheduler.ts b/src/classes/queue-scheduler.ts index 3231476276..cb11ad6728 100644 --- a/src/classes/queue-scheduler.ts +++ b/src/classes/queue-scheduler.ts @@ -26,27 +26,25 @@ export class QueueScheduler extends QueueBase { constructor(protected name: string, opts: QueueSchedulerOptions = {}) { super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts }); + + // tslint:disable: no-floating-promises + this.run(); } - async waitUntilReady() { - await super.waitUntilReady(); + private async run() { + await this.waitUntilReady(); - const [nextTimestamp, streamLastId] = await Scripts.updateDelaySet( - this, - Date.now(), - ); + const key = this.keys.delay; + const opts = this.opts as QueueSchedulerOptions; + const delaySet = await Scripts.updateDelaySet(this, Date.now()); + + const [nextTimestamp] = delaySet; + let streamLastId = delaySet[1] || '0-0'; if (nextTimestamp) { this.nextTimestamp = nextTimestamp; } - this.run(streamLastId); - } - - private async run(streamLastId = '0-0') { - const key = this.keys.delay; - const opts = this.opts as QueueSchedulerOptions; - while (!this.closing) { // Check if at least the min stalled check time has passed. await this.moveStalledJobsToWait(); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 678c81d248..b912aee1af 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -25,6 +25,7 @@ export class Queue extends QueueGetters { this.jobsOpts = get(opts, 'defaultJobOptions'); + // tslint:disable: no-floating-promises this.waitUntilReady().then(() => { this.client.hset( this.keys.meta, diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 2bbe4562ec..eebd1dbfc7 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -7,7 +7,7 @@ export class RedisConnection { static minimumVersion = '5.0.0'; client: IORedis.Redis; - constructor(private opts?: ConnectionOptions) { + constructor(private opts?: ConnectionOptions | IORedis.Redis) { if (!(opts instanceof IORedis)) { this.opts = Object.assign( { diff --git a/src/classes/worker.ts b/src/classes/worker.ts index a1356c4887..f2ab345587 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -59,6 +59,7 @@ export class Worker extends QueueBase { this.repeat = new Repeat(name, opts); + /* tslint:disable: no-floating-promises */ this.run(); }