Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/0002 more typings #5

Merged
merged 8 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { JobPriority, parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { DefinitionProcessor } from './types/JobDefinition';
import { JobPriority, parsePriority } from './utils/priority';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';

const log = debug('agenda:job');

Expand Down Expand Up @@ -56,7 +56,7 @@ export class Job<DATA = unknown | void> {
};
}

toJson(): Partial<IJobParameters> {
toJson(): IJobParameters {
const attrs = this.attrs || {};
const result = {};

Expand All @@ -75,7 +75,7 @@ export class Job<DATA = unknown | void> {
});

// console.log('toJson', this.attrs, result);
return result;
return result as IJobParameters;
}

repeatEvery(
Expand Down
48 changes: 28 additions & 20 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import {
MongoClientOptions,
UpdateQuery,
ObjectId,
SortOptionObject
SortOptionObject,
FindOneAndUpdateOption
} from 'mongodb';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/hasMongoProtocol';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { IJobParameters } from './types/JobParameters';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IJobParameters } from './types/JobParameters';
import { hasMongoProtocol } from './utils/hasMongoProtocol';

const log = debug('agenda:db');

Expand Down Expand Up @@ -81,7 +82,7 @@ export class JobDbRepository {

async lockJob(job: Job): Promise<IJobParameters | undefined> {
// Query to run against collection to see if we need to lock it
const criteria = {
const criteria: FilterQuery<Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }> = {
_id: job.attrs._id,
name: job.attrs.name,
lockedAt: null,
Expand All @@ -90,11 +91,16 @@ export class JobDbRepository {
};

// Update / options for the MongoDB query
const update = { $set: { lockedAt: new Date() } };
const options = { returnOriginal: false };
const update: UpdateQuery<IJobParameters> = { $set: { lockedAt: new Date() } };
const options: FindOneAndUpdateOption<IJobParameters> = { returnOriginal: false };

// Lock the job in MongoDB!
const resp = await this.collection.findOneAndUpdate(criteria, update, options);
const resp = await this.collection.findOneAndUpdate(
criteria as FilterQuery<IJobParameters>,
update,
options
);

return resp?.value;
}

Expand All @@ -104,11 +110,13 @@ export class JobDbRepository {
lockDeadline: Date,
now: Date = new Date()
): Promise<IJobParameters | undefined> {
// /**
// * Query used to find job to run
// * @type {{$and: [*]}}
// */
const JOB_PROCESS_WHERE_QUERY = {
/**
* Query used to find job to run
* @type {{$and: [*]}}
*/
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
> = {
$and: [
{
name: jobName,
Expand All @@ -132,13 +140,16 @@ export class JobDbRepository {
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
const JOB_PROCESS_SET_QUERY: UpdateQuery<IJobParameters> = { $set: { lockedAt: now } };

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this.connectOptions.sort };
const JOB_RETURN_QUERY: FindOneAndUpdateOption<IJobParameters> = {
returnOriginal: false,
sort: this.connectOptions.sort
};

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this.collection.findOneAndUpdate(
Expand Down Expand Up @@ -208,7 +219,7 @@ export class JobDbRepository {

private processDbResult<DATA = unknown | void>(
job: Job<DATA>,
res: IJobParameters<DATA>
res?: IJobParameters<DATA>
simllll marked this conversation as resolved.
Show resolved Hide resolved
): Job<DATA> {
log(
'processDbResult() called with success, checking whether to process job immediately or not'
Expand Down Expand Up @@ -243,7 +254,6 @@ export class JobDbRepository {

// Grab information needed to save job but that we don't want to persist in MongoDB
const id = job.attrs._id;
// const { unique, uniqueOpts } = job.attrs;

// Store job as JSON and remove props we don't want to store from object
// _id, unique, uniqueOpts
Expand Down Expand Up @@ -309,7 +319,7 @@ export class JobDbRepository {
update,
{
upsert: true,
returnOriginal: false // same as new: true -> returns the final document
returnOriginal: false
}
);
log(
Expand All @@ -330,8 +340,6 @@ export class JobDbRepository {
update = { $setOnInsert: props };
}

// console.log('update', query, update, uniqueOpts);

// Use the 'unique' query object to find an existing job or create a new one
log('calling findOneAndUpdate() with unique object as query: \n%O', query);
const result = await this.collection.findOneAndUpdate(query, update, {
Expand Down
2 changes: 1 addition & 1 deletion src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// eslint-disable-next-line prettier/prettier
import type { Job } from './Job';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { Agenda } from './index';
/**
* @class
Expand Down
8 changes: 4 additions & 4 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as debug from 'debug';
import { Job } from './Job';
import { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import type { IJobDefinition } from './types/JobDefinition';
import type { Agenda } from './index';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobProcessingQueue } from './JobProcessingQueue';

const log = debug('agenda:jobProcessor');

Expand Down
16 changes: 8 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ import { EventEmitter } from 'events';
import * as debug from 'debug';

import * as humanInterval from 'human-interval';
import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import { Job } from './Job';
import { JobProcessor } from './JobProcessor';
import type { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobDefinition } from './types/JobDefinition';
import { IAgendaConfig } from './types/AgendaConfig';
import type { IAgendaConfig } from './types/AgendaConfig';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IAgendaStatus } from './types/AgendaStatus';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobDbRepository } from './JobDbRepository';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { filterUndefined } from './utils/filterUndefined';
import { JobProcessor } from './JobProcessor';
import { JobPriority, parsePriority } from './utils/priority';
import { IAgendaStatus } from './types/AgendaStatus';
import { IJobParameters } from './types/JobParameters';
import { filterUndefined } from './utils/filterUndefined';

const log = debug('agenda');

Expand Down
2 changes: 1 addition & 1 deletion src/types/AgendaStatus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IAgendaJobStatus {
[name: string]: { running: number; locked: number };
Expand Down
4 changes: 2 additions & 2 deletions src/types/DbOptions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import { IJobParameters } from './JobParameters';
import type { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobParameters } from './JobParameters';

export interface IDatabaseOptions {
db: {
Expand Down
9 changes: 3 additions & 6 deletions src/types/JobDefinition.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IJobDefinition<DATA = unknown> {
/** max number of locked jobs of this kind */
Expand All @@ -7,13 +7,10 @@ export interface IJobDefinition<DATA = unknown> {
lockLifetime: number;
/** Higher priority jobs will run first. */
priority?: number;
/** how many jobs of this kind can run in parallel/simultanously */
/** how many jobs of this kind can run in parallel/simultanously per Agenda instance */
concurrency?: number;

// running: number;
// locked: number;

fn: DefinitionProcessor<DATA, void | ((err?) => void)>;
fn: DefinitionProcessor<DATA, void | ((err?: Error) => void)>;
}

export type DefinitionProcessor<DATA, CB> = (
Expand Down
5 changes: 2 additions & 3 deletions src/types/JobParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ export interface IJobParameters<DATA = unknown | void> {

name: string;
priority: number;
nextRunAt: Date | null;
nextRunAt?: Date | null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now | null can be reverted again right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never $unset nextRunAt and there are multiple places where nextRunAt is set explicitly to null
The reason to make nextRunAt optional was the delete of nextRunAt in line 297 in JobDbRepository.ts

Copy link
Contributor

@simllll simllll Oct 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right nextRunAt: Date | null;

there is .toJson() used (which is kinda weird anyway), but the reason behind this is that it returned a partial result before, you have changed the result of it to a non partial return value, what was the motivation behind that? Could we just let it return a partial result? Otherwise we could also just change line 252 to:
from ...job.toJson() to ...(job.toJson() as Partial<IJobParameters>),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted that ? making nextRunAt mandatory in typings. I changed in line 297 of JobDbRepository.ts the typing when deleting that field. This makes imho the most sense.

/**
* // once: the job is just queued in the database --> this does not really exists, it's just fallback
* normal: job is queued and will be processed (regular case when the user adds a new job)
* single: job with this name is only queued once, if there is an exisitn gentry in the database, the job is just updated, but not newly inserted (this is used for .every())
*/
type: /* 'once' | */ 'normal' | 'single';
type: 'normal' | 'single';

lockedAt?: Date;
lastFinishedAt?: Date;
Expand Down
5 changes: 2 additions & 3 deletions src/utils/nextRunAt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as moment from 'moment-timezone';
import * as humanInterval from 'human-interval';
import * as date from 'date.js';
import * as debug from 'debug';
import { IJobParameters } from '../types/JobParameters';
import type { IJobParameters } from '../types/JobParameters';
import { isValidDate } from './date';

const log = debug('agenda:nextRunAt');
Expand All @@ -15,7 +15,7 @@ const dateForTimezone = (timezoneDate: Date, timezone?: string): moment.Moment =
momentDate.tz(timezone);
}

return momentDate; // .utc(false).toDate();
return momentDate;
};

/**
Expand All @@ -41,7 +41,6 @@ export const computeFromInterval = (attrs: IJobParameters): Date => {
}

result = nextDate;
// Either `xo` linter or Node.js 8 stumble on this line if it isn't just ignored
} catch (error) {
// eslint-disable-line no-unused-vars
// Nope, humanInterval then!
Expand Down
15 changes: 7 additions & 8 deletions src/utils/priority.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/

export type JobPriority = number | keyof typeof priorityMap;

const priorityMap = {
Expand All @@ -16,6 +8,13 @@ const priorityMap = {
highest: 20
};

/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/
export function parsePriority(priority?: JobPriority): number {
if (typeof priority === 'number') {
return priority;
Expand Down
4 changes: 2 additions & 2 deletions test/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const clearJobs = async (): Promise<void> => {
};

const jobType = 'do work';
const jobProcessor = () => {};
const jobProcessor = () => { };

describe('Retry', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -79,5 +79,5 @@ describe('Retry', () => {

await agenda.start();
await successPromise;
});
}).timeout(100000);
});