Skip to content

Commit

Permalink
fix: fix failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stansv committed Nov 1, 2019
1 parent 1d4fa38 commit 824eb6b
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 120 deletions.
2 changes: 1 addition & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ export class Job {
/**
* Moves a job to the failed queue.
* @param err {Error} The jobs error message.
* @param token {string} Token to check job is locked by current worker
* @param fetchNext {boolean} True when wanting to fetch the next job
* @returns void
*/
Expand All @@ -252,7 +253,6 @@ export class Job {
// Check if an automatic retry should be performed
//
let moveToFailed = false;
// FIXME why don't we moveToFailed with fetchNext = true?
if (this.attemptsMade < this.opts.attempts && !this.discarded) {
const opts = queue.opts as WorkerOptions;

Expand Down
18 changes: 9 additions & 9 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ export class Scripts {
return (<any>client).removeJob(keys.concat([queue.keys.events, jobId]));
}

static async extendLock(queue: QueueBase, jobId: string, token: string) {
const client = await queue.client;
const opts: WorkerOptions = <WorkerOptions>queue.opts;
static async extendLock(worker: Worker, jobId: string, token: string) {
const client = await worker.client;
const opts: WorkerOptions = worker.opts;
const args = [
queue.toKey(jobId) + ':lock',
queue.keys.stalled,
worker.toKey(jobId) + ':lock',
worker.keys.stalled,
token,
opts.lockDuration,
jobId,
Expand Down Expand Up @@ -362,11 +362,11 @@ export class Scripts {
return (<any>client).reprocessJob(keys.concat(args));
}

static async moveToActive(queue: Worker, token: string, jobId?: string) {
const client = await queue.client;
const opts: WorkerOptions = <WorkerOptions>queue.opts;
static async moveToActive(worker: Worker, token: string, jobId?: string) {
const client = await worker.client;
const opts = worker.opts;

const queueKeys = queue.keys;
const queueKeys = worker.keys;
const keys = [queueKeys.wait, queueKeys.active, queueKeys.priority];

keys[3] = queueKeys.events;
Expand Down
51 changes: 51 additions & 0 deletions src/classes/timer-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import uuid from 'uuid';

/**
* Keeps track on timers created with setTimeout to help clearTimeout
* for all timers when no more delayed actions needed
*/
export class TimerManager {
private timers: any = {};

public setTimer(name: string, delay: number, fn: Function) {
const id = uuid.v4();
const timer = setTimeout(
timeoutId => {
this.clearTimer(timeoutId);
try {
fn();
} catch (err) {
console.error(err);
}
},
delay,
id,
);

// XXX only the timer is used, but the
// other fields are useful for
// troubleshooting/debugging
this.timers[id] = {
name,
timer,
};

return id;
}

public clearTimer(id: string) {
const timers = this.timers;
const timer = timers[id];
if (!timer) {
return;
}
clearTimeout(timer.timer);
delete timers[id];
}

public clearAllTimers() {
Object.keys(this.timers).forEach(key => {
this.clearTimer(key);
});
}
}
81 changes: 15 additions & 66 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { RedisConnection } from './redis-connection';
import sandbox from './sandbox';
import { Scripts } from './scripts';
import uuid from 'uuid';
import { TimerManager } from './timer-manager';

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.
Expand All @@ -27,6 +28,7 @@ export class Worker extends QueueBase {
private paused: Promise<void>;
private _repeat: Repeat;
private childPool: ChildPool;
private timerManager: TimerManager;

private blockingConnection: RedisConnection;

Expand Down Expand Up @@ -73,9 +75,12 @@ export class Worker extends QueueBase {
this.childPool = this.childPool || pool;
this.processFn = sandbox(processor, this.childPool).bind(this);
}
this.timerManager = new TimerManager();

/* tslint:disable: no-floating-promises */
this.run(); // TODO catch, emit error
this.run().catch(error => {
console.error(error);
});
}

get repeat() {
Expand Down Expand Up @@ -122,12 +127,7 @@ export class Worker extends QueueBase {
while (!this.closing) {
if (processing.size < opts.concurrency) {
const token = tokens.pop();
processing.set(
this.getNextJob(token).catch(error => {
console.error('getNextJob failed', error); // TODO handle error properly
}),
token,
);
processing.set(this.getNextJob(token), token);
}

/*
Expand All @@ -144,12 +144,7 @@ export class Worker extends QueueBase {
const job = await completed;
if (job) {
// reuse same token if next job is available to process
processing.set(
this.processJob(job, token).catch(error => {
console.error('processJob failed', error); // TODO handle error properly
}),
token,
);
processing.set(this.processJob(job, token), token);
} else {
tokens.push(token);
}
Expand Down Expand Up @@ -251,7 +246,7 @@ export class Worker extends QueueBase {
let lockRenewId: string;
let timerStopped = false;
const lockExtender = () => {
lockRenewId = this.setTimer(
lockRenewId = this.timerManager.setTimer(
'lockExtender',
this.opts.lockRenewTime,
async () => {
Expand All @@ -260,17 +255,18 @@ export class Worker extends QueueBase {
if (result && !timerStopped) {
lockExtender();
}
// FIXME if result = 0, reject processFn promise to take next job?
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
} catch (error) {
console.error('Error extending lock ', error);
// Somehow tell the worker this job should stop processing...
}
},
);
};

const stopTimer = () => {
timerStopped = true;
this.clearTimer(lockRenewId);
this.timerManager.clearTimer(lockRenewId);
};

// end copy-paste from Bull3
Expand All @@ -282,7 +278,6 @@ export class Worker extends QueueBase {
!(this.closing || this.paused),
);
this.emit('completed', job, result, 'active');
// FIXME should we call nextJobFromJobData here to emit drained event?
return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
};

Expand All @@ -297,7 +292,6 @@ export class Worker extends QueueBase {

await job.moveToFailed(err, token);
this.emit('failed', job, error, 'active');
// FIXME can we also fetch next job right away as in handleCompleted?
};

// TODO: how to cancel the processing? (null -> job.cancel() => throw CancelError()void)
Expand All @@ -306,7 +300,7 @@ export class Worker extends QueueBase {
lockExtender();
try {
const result = await this.processFn(job);
return handleCompleted(result);
return await handleCompleted(result);
} catch (err) {
return handleFailed(err);
} finally {
Expand Down Expand Up @@ -366,7 +360,7 @@ export class Worker extends QueueBase {
this.waiting && (await this.blockingConnection.disconnect());

if (this.processing) {
await Promise.all(this.processing);
await Promise.all(this.processing.keys());
}

this.waiting && reconnect && (await this.blockingConnection.reconnect());
Expand All @@ -390,57 +384,12 @@ export class Worker extends QueueBase {
} catch (err) {
reject(err);
} finally {
this.clearAllTimers();
this.timerManager.clearAllTimers();
this.childPool && this.childPool.clean();
}
this.emit('closed');
resolve();
});
}
}

// code from former TimerManager
private timers: any = {};

private setTimer(name: string, delay: number, fn: Function) {
const id = uuid.v4();
const timer = setTimeout(
timeoutId => {
this.clearTimer(timeoutId);
try {
fn();
} catch (err) {
console.error(err);
}
},
delay,
id,
);

// XXX only the timer is used, but the
// other fields are useful for
// troubleshooting/debugging
this.timers[id] = {
name,
timer,
};

return id;
}

private clearTimer(id: string) {
const timers = this.timers;
const timer = timers[id];
if (!timer) {
return;
}
clearTimeout(timer.timer);
delete timers[id];
}

private clearAllTimers() {
Object.keys(this.timers).forEach(key => {
this.clearTimer(key);
});
}
}
4 changes: 3 additions & 1 deletion src/commands/moveToFinished-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
local lockKey = jobKey .. ':lock'

-- get a lock
rcall("SET", lockKey, ARGV[10], "PX", ARGV[11])
if ARGV[10] ~= "0" then
rcall("SET", lockKey, ARGV[10], "PX", ARGV[11])
end

rcall("ZREM", KEYS[5], jobId) -- remove from priority
rcall("XADD", KEYS[6], "*", "event", "active", "jobId", jobId,
Expand Down
Loading

0 comments on commit 824eb6b

Please sign in to comment.