Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/0002 more typings #5

Merged
merged 8 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { JobPriority, parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { DefinitionProcessor } from './types/JobDefinition';
import { JobPriority, parsePriority } from './utils/priority';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';

const log = debug('agenda:job');

Expand Down Expand Up @@ -56,7 +56,7 @@ export class Job<DATA = unknown | void> {
};
}

toJson(): Partial<IJobParameters> {
toJson(): IJobParameters {
const attrs = this.attrs || {};
const result = {};

Expand All @@ -74,8 +74,7 @@ export class Job<DATA = unknown | void> {
}
});

// console.log('toJson', this.attrs, result);
return result;
return result as IJobParameters;
}

repeatEvery(
Expand Down
50 changes: 29 additions & 21 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import {
MongoClientOptions,
UpdateQuery,
ObjectId,
SortOptionObject
SortOptionObject,
FindOneAndUpdateOption
} from 'mongodb';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/hasMongoProtocol';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { IJobParameters } from './types/JobParameters';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IJobParameters } from './types/JobParameters';
import { hasMongoProtocol } from './utils/hasMongoProtocol';

const log = debug('agenda:db');

Expand Down Expand Up @@ -81,7 +82,7 @@ export class JobDbRepository {

async lockJob(job: Job): Promise<IJobParameters | undefined> {
// Query to run against collection to see if we need to lock it
const criteria = {
const criteria: FilterQuery<Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }> = {
_id: job.attrs._id,
name: job.attrs.name,
lockedAt: null,
Expand All @@ -90,11 +91,16 @@ export class JobDbRepository {
};

// Update / options for the MongoDB query
const update = { $set: { lockedAt: new Date() } };
const options = { returnOriginal: false };
const update: UpdateQuery<IJobParameters> = { $set: { lockedAt: new Date() } };
const options: FindOneAndUpdateOption<IJobParameters> = { returnOriginal: false };

// Lock the job in MongoDB!
const resp = await this.collection.findOneAndUpdate(criteria, update, options);
const resp = await this.collection.findOneAndUpdate(
criteria as FilterQuery<IJobParameters>,
update,
options
);

return resp?.value;
}

Expand All @@ -104,11 +110,13 @@ export class JobDbRepository {
lockDeadline: Date,
now: Date = new Date()
): Promise<IJobParameters | undefined> {
// /**
// * Query used to find job to run
// * @type {{$and: [*]}}
// */
const JOB_PROCESS_WHERE_QUERY = {
/**
* Query used to find job to run
* @type {{$and: [*]}}
*/
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
> = {
$and: [
{
name: jobName,
Expand All @@ -132,13 +140,16 @@ export class JobDbRepository {
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
const JOB_PROCESS_SET_QUERY: UpdateQuery<IJobParameters> = { $set: { lockedAt: now } };

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this.connectOptions.sort };
const JOB_RETURN_QUERY: FindOneAndUpdateOption<IJobParameters> = {
returnOriginal: false,
sort: this.connectOptions.sort
};

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this.collection.findOneAndUpdate(
Expand Down Expand Up @@ -208,7 +219,7 @@ export class JobDbRepository {

private processDbResult<DATA = unknown | void>(
job: Job<DATA>,
res: IJobParameters<DATA>
res?: IJobParameters<DATA>
simllll marked this conversation as resolved.
Show resolved Hide resolved
): Job<DATA> {
log(
'processDbResult() called with success, checking whether to process job immediately or not'
Expand Down Expand Up @@ -243,7 +254,6 @@ export class JobDbRepository {

// Grab information needed to save job but that we don't want to persist in MongoDB
const id = job.attrs._id;
// const { unique, uniqueOpts } = job.attrs;

// Store job as JSON and remove props we don't want to store from object
// _id, unique, uniqueOpts
Expand Down Expand Up @@ -284,7 +294,7 @@ export class JobDbRepository {
if (props.nextRunAt && props.nextRunAt <= now) {
log('job has a scheduled nextRunAt time, protecting that field from upsert');
protect.nextRunAt = props.nextRunAt;
delete props.nextRunAt;
delete (props as Partial<IJobParameters>).nextRunAt;
}

// If we have things to protect, set them in MongoDB using $setOnInsert
Expand All @@ -309,7 +319,7 @@ export class JobDbRepository {
update,
{
upsert: true,
returnOriginal: false // same as new: true -> returns the final document
returnOriginal: false
}
);
log(
Expand All @@ -330,8 +340,6 @@ export class JobDbRepository {
update = { $setOnInsert: props };
}

// console.log('update', query, update, uniqueOpts);

// Use the 'unique' query object to find an existing job or create a new one
log('calling findOneAndUpdate() with unique object as query: \n%O', query);
const result = await this.collection.findOneAndUpdate(query, update, {
Expand Down
2 changes: 1 addition & 1 deletion src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// eslint-disable-next-line prettier/prettier
import type { Job } from './Job';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { Agenda } from './index';
/**
* @class
Expand Down
8 changes: 4 additions & 4 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as debug from 'debug';
import { Job } from './Job';
import { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import type { IJobDefinition } from './types/JobDefinition';
import type { Agenda } from './index';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobProcessingQueue } from './JobProcessingQueue';

const log = debug('agenda:jobProcessor');

Expand Down
16 changes: 8 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { EventEmitter } from 'events';
import * as debug from 'debug';

import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import { Job } from './Job';
import { JobProcessor } from './JobProcessor';
import type { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobDefinition } from './types/JobDefinition';
import { IAgendaConfig } from './types/AgendaConfig';
import type { IAgendaConfig } from './types/AgendaConfig';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IAgendaStatus } from './types/AgendaStatus';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobDbRepository } from './JobDbRepository';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { filterUndefined } from './utils/filterUndefined';
import { JobPriority, parsePriority } from './utils/priority';
import { IAgendaStatus } from './types/AgendaStatus';
import { IJobParameters } from './types/JobParameters';
import { JobProcessor } from './JobProcessor';
import { calculateProcessEvery } from './utils/processEvery';
import { filterUndefined } from './utils/filterUndefined';

const log = debug('agenda');

Expand Down
2 changes: 1 addition & 1 deletion src/types/AgendaStatus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IAgendaJobStatus {
[name: string]: { running: number; locked: number };
Expand Down
4 changes: 2 additions & 2 deletions src/types/DbOptions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import { IJobParameters } from './JobParameters';
import type { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobParameters } from './JobParameters';

export interface IDatabaseOptions {
db: {
Expand Down
9 changes: 3 additions & 6 deletions src/types/JobDefinition.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IJobDefinition<DATA = unknown> {
/** max number of locked jobs of this kind */
Expand All @@ -7,13 +7,10 @@ export interface IJobDefinition<DATA = unknown> {
lockLifetime: number;
/** Higher priority jobs will run first. */
priority?: number;
/** how many jobs of this kind can run in parallel/simultanously */
/** how many jobs of this kind can run in parallel/simultanously per Agenda instance */
concurrency?: number;

// running: number;
// locked: number;

fn: DefinitionProcessor<DATA, void | ((err?) => void)>;
fn: DefinitionProcessor<DATA, void | ((err?: Error) => void)>;
}

export type DefinitionProcessor<DATA, CB> = (
Expand Down
3 changes: 1 addition & 2 deletions src/types/JobParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ export interface IJobParameters<DATA = unknown | void> {
priority: number;
nextRunAt: Date | null;
/**
* // once: the job is just queued in the database --> this does not really exists, it's just fallback
* normal: job is queued and will be processed (regular case when the user adds a new job)
* single: job with this name is only queued once, if there is an exisitn gentry in the database, the job is just updated, but not newly inserted (this is used for .every())
*/
type: /* 'once' | */ 'normal' | 'single';
type: 'normal' | 'single';

lockedAt?: Date;
lastFinishedAt?: Date;
Expand Down
15 changes: 7 additions & 8 deletions src/utils/priority.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/

export type JobPriority = number | keyof typeof priorityMap;

const priorityMap = {
Expand All @@ -16,6 +8,13 @@ const priorityMap = {
highest: 20
};

/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/
export function parsePriority(priority?: JobPriority): number {
if (typeof priority === 'number') {
return priority;
Expand Down
2 changes: 1 addition & 1 deletion test/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ describe('Job', () => {
]);
expect(results).to.eql([10, 0, -10]);
} catch (err) {
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
// console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
}
});

Expand Down
4 changes: 2 additions & 2 deletions test/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const clearJobs = async (): Promise<void> => {
};

const jobType = 'do work';
const jobProcessor = () => {};
const jobProcessor = () => { };

describe('Retry', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -79,5 +79,5 @@ describe('Retry', () => {

await agenda.start();
await successPromise;
});
}).timeout(100000);
});