Skip to content

Commit

Permalink
feat: move async initialization to constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 29, 2019
1 parent 590a4a9 commit 3fbacd0
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 17 deletions.
8 changes: 7 additions & 1 deletion src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ export class QueueBase extends EventEmitter {
keys[key] = this.toKey(key);
});
this.keys = keys;

this.initializing = this.connection.init();

this.waitUntilReady()
.then(client => client.on('error', this.emit.bind(this)))
.catch(err => this.emit('error'));
}

toKey(type: string) {
Expand All @@ -59,7 +65,7 @@ export class QueueBase extends EventEmitter {
this.initializing = this.connection.init();
}

this.client = await this.initializing;
return (this.client = await this.initializing);
}

protected base64Name() {
Expand Down
5 changes: 3 additions & 2 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ export class QueueEvents extends QueueBase {
this.opts,
);

this.consumeEvents();
// tslint:disable: no-floating-promises
this.consumeEvents().catch(err => this.emit('error'));
}

private async consumeEvents() {
Expand Down Expand Up @@ -63,9 +64,9 @@ export class QueueEvents extends QueueBase {
}
} catch (err) {
if (err.message !== 'Connection is closed.') {
await delay(5000);
throw err;
}
await delay(5000);
}
}
}
Expand Down
24 changes: 11 additions & 13 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,25 @@ export class QueueScheduler extends QueueBase {

constructor(protected name: string, opts: QueueSchedulerOptions = {}) {
super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts });

// tslint:disable: no-floating-promises
this.run();
}

async waitUntilReady() {
await super.waitUntilReady();
private async run() {
await this.waitUntilReady();

const [nextTimestamp, streamLastId] = await Scripts.updateDelaySet(
this,
Date.now(),
);
const key = this.keys.delay;
const opts = this.opts as QueueSchedulerOptions;
const delaySet = await Scripts.updateDelaySet(this, Date.now());

const [nextTimestamp] = delaySet;
let streamLastId = delaySet[1] || '0-0';

if (nextTimestamp) {
this.nextTimestamp = nextTimestamp;
}

this.run(streamLastId);
}

private async run(streamLastId = '0-0') {
const key = this.keys.delay;
const opts = this.opts as QueueSchedulerOptions;

while (!this.closing) {
// Check if at least the min stalled check time has passed.
await this.moveStalledJobsToWait();
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class Queue extends QueueGetters {

this.jobsOpts = get(opts, 'defaultJobOptions');

// tslint:disable: no-floating-promises
this.waitUntilReady().then(() => {
this.client.hset(
this.keys.meta,
Expand Down
2 changes: 1 addition & 1 deletion src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class RedisConnection {
static minimumVersion = '5.0.0';
client: IORedis.Redis;

constructor(private opts?: ConnectionOptions) {
constructor(private opts?: ConnectionOptions | IORedis.Redis) {
if (!(opts instanceof IORedis)) {
this.opts = Object.assign(
{
Expand Down
1 change: 1 addition & 0 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class Worker extends QueueBase {

this.repeat = new Repeat(name, opts);

/* tslint:disable: no-floating-promises */
this.run();
}

Expand Down

0 comments on commit 3fbacd0

Please sign in to comment.