Skip to content

Commit

Permalink
fix: rename err to error, fix typing of DefinitionProcessor, use debu…
Browse files Browse the repository at this point in the history
…g ins… (#9)

* rename err to error, fix typing of DefinitionProcessor, use debug instead of console, throw error when createIndex fails

* rename err to error

* more typings

* revert DefinitionProcessor

Co-authored-by: Aras Abbasi <a.abbasi@cognigy.com>
  • Loading branch information
Uzlopak and Aras Abbasi committed Oct 25, 2020
1 parent 5fd44b8 commit 39b598e
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 44 deletions.
41 changes: 19 additions & 22 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ export class Job<DATA = unknown | void> {
} else {
this.attrs.nextRunAt = null;
}
} catch (err) {
} catch (error) {
this.attrs.nextRunAt = null;
this.fail(err);
this.fail(error);
}

return this;
Expand Down Expand Up @@ -271,22 +271,19 @@ export class Job<DATA = unknown | void> {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await new Promise((resolve, reject) => {
try {
const result = (definition.fn as DefinitionProcessor<DATA, (err?) => void>)(
this,
err => {
if (err) {
reject(err);
return;
}
resolve();
const result = definition.fn(this, error => {
if (error) {
reject(error);
return;
}
);
resolve();
});

if (this.isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(result as any).catch(err => reject(err));
result.catch((error: Error) => reject(error));
}
} catch (err) {
reject(err);
} catch (error) {
reject(error);
}
});
} else {
Expand All @@ -299,14 +296,14 @@ export class Job<DATA = unknown | void> {
this.agenda.emit('success', this);
this.agenda.emit(`success:${this.attrs.name}`, this);
log('[%s:%s] has succeeded', this.attrs.name, this.attrs._id);
} catch (err) {
} catch (error) {
log('[%s:%s] unknown error occurred', this.attrs.name, this.attrs._id);

this.fail(err);
this.fail(error);

this.agenda.emit('fail', err, this);
this.agenda.emit(`fail:${this.attrs.name}`, err, this);
log('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, err.message);
this.agenda.emit('fail', error, this);
this.agenda.emit(`fail:${this.attrs.name}`, error, this);
log('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, error.message);
} finally {
this.attrs.lockedAt = undefined;
await this.save();
Expand All @@ -323,7 +320,7 @@ export class Job<DATA = unknown | void> {
}
}

private isPromise(value): value is Promise<void> {
return !!(value && typeof value.then === 'function');
private isPromise(value: unknown): value is Promise<void> {
return !!(value && typeof (value as Promise<void>).then === 'function');
}
}
13 changes: 7 additions & 6 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ export class JobDbRepository {
throw new Error('invalid db config, or db config not found');
}

private hasMongoConnection(connectOptions): connectOptions is IMongoOptions {
return !!connectOptions.mongo;
private hasMongoConnection(connectOptions: unknown): connectOptions is IMongoOptions {
return !!(connectOptions as IMongoOptions)?.mongo;
}

private hasDatabaseConfig(connectOptions): connectOptions is IDatabaseOptions {
return !!connectOptions.db?.address;
private hasDatabaseConfig(connectOptions: unknown): connectOptions is IDatabaseOptions {
return !!(connectOptions as IDatabaseOptions)?.db?.address;
}

async getJobs(
Expand Down Expand Up @@ -193,8 +193,9 @@ export class JobDbRepository {
{ name: 'findAndLockNextJobIndex' }
);
log('index succesfully created', result);
} catch (err) {
console.error('db index creation failed', err);
} catch (error) {
log('db index creation failed', error);
throw error;
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export class JobProcessor {
* @param {String} name fill a queue with specific job name
* @returns {undefined}
*/
private async jobQueueFilling(name) {
private async jobQueueFilling(name: string): Promise<void> {
// Don't lock because of a limit we have set (lockLimit, etc)
if (!this.shouldLock(name)) {
log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name);
Expand Down Expand Up @@ -529,16 +529,16 @@ export class JobProcessor {
`callback already called - job ${job.attrs.name} already marked complete`
);
}
} catch (err) {
} catch (error) {
// eslint-disable-next-line no-param-reassign
job.canceled = err;
job.canceled = error;
log.extend('runOrRetry')(
'[%s:%s] processing job failed',
job.attrs.name,
job.attrs._id,
err
error
);
this.agenda.emit('error', err);
this.agenda.emit('error', error);
} finally {
// Remove the job from the running queue
let runningJobIndex = this.runningJobs.indexOf(job);
Expand Down
22 changes: 12 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ export class Agenda extends EventEmitter {
// internally used
on(event: 'processJob', listener: (job: Job) => void): this;

on(event: 'fail', listener: (err: Error, job: Job) => void): this;
on(event: 'fail', listener: (error: Error, job: Job) => void): this;
on(event: 'success', listener: (job: Job) => void): this;
on(event: 'start', listener: (job: Job) => void): this;
on(event: 'complete', listener: (job: Job) => void): this;
on(event: string, listener: (job: Job) => void): this;
on(event: string, listener: (err: Error, job: Job) => void): this;
on(event: string, listener: (error: Error, job: Job) => void): this;
on(event: 'ready', listener: () => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'error', listener: (error: Error) => void): this;
on(event: string, listener: (...args) => void): this {
return super.on(event, listener);
}
Expand Down Expand Up @@ -84,7 +84,7 @@ export class Agenda extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/ban-types
} & (IDatabaseOptions | IMongoOptions | {}) &
IDbConfig = {},
cb?: (err?: Error) => void
cb?: (error?: Error) => void
) {
super();

Expand Down Expand Up @@ -133,8 +133,10 @@ export class Agenda extends EventEmitter {
return this;
}

private hasDatabaseConfig(config): config is (IDatabaseOptions | IMongoOptions) & IDbConfig {
return !!(config.db?.address || config.mongo);
private hasDatabaseConfig(
config: unknown
): config is (IDatabaseOptions | IMongoOptions) & IDbConfig {
return !!((config as IDatabaseOptions)?.db?.address || (config as IMongoOptions)?.mongo);
}

async cancel(query: FilterQuery<IJobParameters>): Promise<number> {
Expand Down Expand Up @@ -215,7 +217,7 @@ export class Agenda extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
define<DATA = any>(
name: string,
processor: (agendaJob: Job<DATA>, done: (err?: Error) => void) => void,
processor: (agendaJob: Job<DATA>, done: (error?: Error) => void) => void,
options?: Partial<Pick<IJobDefinition, 'lockLimit' | 'lockLifetime' | 'concurrency'>> & {
priority?: JobPriority;
}
Expand All @@ -230,13 +232,13 @@ export class Agenda extends EventEmitter {
): void;
define(
name: string,
processor: ((job) => Promise<void>) | ((job, done) => void),
processor: ((job: Job) => Promise<void>) | ((job: Job, done) => void),
options?: Partial<Pick<IJobDefinition, 'lockLimit' | 'lockLifetime' | 'concurrency'>> & {
priority?: JobPriority;
}
): void {
if (this.definitions[name]) {
console.warn('overwriting already defined agenda job', name);
log('overwriting already defined agenda job', name);
}
this.definitions[name] = {
fn: processor,
Expand Down Expand Up @@ -355,7 +357,7 @@ export class Agenda extends EventEmitter {
names: string | string[],
data?: unknown
): Promise<Job | Job[]> {
const createJob = async name => {
const createJob = async (name: string) => {
const job = this.create(name, data);

await job.schedule(when).save();
Expand Down
2 changes: 1 addition & 1 deletion src/types/JobDefinition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface IJobDefinition<DATA = unknown> {
/** how many jobs of this kind can run in parallel/simultanously per Agenda instance */
concurrency?: number;

fn: DefinitionProcessor<DATA, void | ((err?: Error) => void)>;
fn: DefinitionProcessor<DATA, void | ((error?: Error) => void)>;
}

export type DefinitionProcessor<DATA, CB> = (
Expand Down
49 changes: 49 additions & 0 deletions test/agenda.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,55 @@ describe('Agenda', () => {
});
});

describe('ensureIndex findAndLockNextJobIndex', () => {
it('ensureIndex-Option false does not create index findAndLockNextJobIndex', async () => {
const agenda = new Agenda({
mongo: mongoDb,
ensureIndex: false
});

agenda.define('someJob', jobProcessor);
await agenda.create('someJob', 1).save();

const listIndex = await mongoDb.command({ listIndexes: 'agendaJobs' });
expect(listIndex.cursor.firstBatch).to.have.lengthOf(1);
expect(listIndex.cursor.firstBatch[0].name).to.be.equal('_id_');
});

it('ensureIndex-Option true does create index findAndLockNextJobIndex', async () => {
const agenda = new Agenda({
mongo: mongoDb,
ensureIndex: true
});

agenda.define('someJob', jobProcessor);
await agenda.create('someJob', 1).save();

const listIndex = await mongoDb.command({ listIndexes: 'agendaJobs' });
expect(listIndex.cursor.firstBatch).to.have.lengthOf(2);
expect(listIndex.cursor.firstBatch[0].name).to.be.equal('_id_');
expect(listIndex.cursor.firstBatch[1].name).to.be.equal('findAndLockNextJobIndex');
});

it('creating two agenda-instances with ensureIndex-Option true does not throw an error', async () => {
const agenda = new Agenda({
mongo: mongoDb,
ensureIndex: true
});

agenda.define('someJob', jobProcessor);
await agenda.create('someJob', 1).save();

const secondAgenda = new Agenda({
mongo: mongoDb,
ensureIndex: true
});

secondAgenda.define('someJob', jobProcessor);
await secondAgenda.create('someJob', 1).save();
});
});

describe('process jobs', () => {
// eslint-disable-line prefer-arrow-callback
it('should not cause unhandledRejection', async () => {
Expand Down

0 comments on commit 39b598e

Please sign in to comment.