Skip to content

Commit

Permalink
fix: allow data type defintions for jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 12, 2020
1 parent 3bd90dc commit ef85fc5
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 105 deletions.
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
{
"name": "@hokify/agenda",
"version": "4.0.2",
"version": "4.0.4",
"description": "Light weight job scheduler for Node.js",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"publishConfig": {
"access": "public"
},
"files": [
"dist"
],
Expand Down
59 changes: 41 additions & 18 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
import type { DefinitionProcessor } from './types/JobDefinition';

const log = debug('agenda:job');

Expand All @@ -13,12 +14,31 @@ const log = debug('agenda:job');
* @property {Object} agenda - The Agenda instance
* @property {Object} attrs
*/
export class Job {
readonly attrs: IJobParameters;
export class Job<DATA = any | void> {
readonly attrs: IJobParameters<DATA>;

constructor(
agenda: Agenda,
args: Partial<IJobParameters<void>> & {
name: string;
type: 'normal' | 'single';
}
);
constructor(
agenda: Agenda,
args: Partial<IJobParameters<DATA>> & {
name: string;
type: 'normal' | 'single';
data: DATA;
}
);
constructor(
readonly agenda: Agenda,
args: Partial<IJobParameters> & { name: string; type: 'normal' | 'single' }
args: Partial<IJobParameters<DATA>> & {
name: string;
type: 'normal' | 'single';
data: any;
}
) {
// Remove special args

Expand All @@ -35,10 +55,11 @@ export class Job {
};
}

toJson() {
toJson(): Partial<IJobParameters> {
const attrs = this.attrs || {};
const result = {};

// eslint-disable-next-line no-restricted-syntax
for (const prop in attrs) {
if ({}.hasOwnProperty.call(attrs, prop)) {
result[prop] = attrs[prop];
Expand Down Expand Up @@ -155,18 +176,17 @@ export class Job {
return this.agenda.db.saveJob(this);
}

remove() {
remove(): Promise<number> {
return this.agenda.cancel({ _id: this.attrs._id });
}

async touch(progress?: number) {
// eslint-disable-next-line prefer-rest-params
async touch(progress?: number): Promise<void> {
this.attrs.lockedAt = new Date();
this.attrs.progress = progress;
return this.save();
await this.save();
}

computeNextRunAt() {
private computeNextRunAt() {
try {
if (this.attrs.repeatInterval) {
this.attrs.nextRunAt = computeFromInterval(this.attrs);
Expand Down Expand Up @@ -222,23 +242,26 @@ export class Job {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await new Promise((resolve, reject) => {
try {
const result = definition.fn(this, err => {
if (err) {
reject(err);
return;
const result = (definition.fn as DefinitionProcessor<DATA, (err?) => void>)(
this,
err => {
if (err) {
reject(err);
return;
}
resolve();
}
resolve();
});
);
if (this.isPromise(result)) {
result.catch(err => reject(err));
(result as any).catch(err => reject(err));
}
} catch (err) {
reject(err);
}
});
} else {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await definition.fn(this);
await (definition.fn as DefinitionProcessor<DATA, void>)(this);
}

this.attrs.lastFinishedAt = new Date();
Expand Down Expand Up @@ -271,6 +294,6 @@ export class Job {
}

private isPromise(value): value is Promise<void> {
return Boolean(value && typeof value.then === 'function');
return !!(value && typeof value.then === 'function');
}
}
23 changes: 12 additions & 11 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
UpdateQuery,
ObjectId
} from 'mongodb';
import { Job } from './Job';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/mongodb';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
Expand Down Expand Up @@ -207,22 +207,23 @@ export class JobDbRepository {
* @param {Job} job job to save into MongoDB
* @returns {Promise} resolves when job is saved or errors
*/
async saveJob(job: Job) {
async saveJob<T = any>(job: Job<T>): Promise<Job<T>> {
try {
log('attempting to save a job into Agenda instance');

// Grab information needed to save job but that we don't want to persist in MongoDB
const id = job.attrs._id;
const { unique, uniqueOpts } = job.attrs;
// const { unique, uniqueOpts } = job.attrs;

// Store job as JSON and remove props we don't want to store from object
const props: Partial<IJobParameters> = job.toJson();
delete props._id;
delete props.unique;
delete props.uniqueOpts;
// _id, unique, uniqueOpts
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { _id, unique, uniqueOpts, ...props } = {
...job.toJson(),
// Store name of agenda queue as last modifier in job data
lastModifiedBy: this.agenda.attrs.name
};

// Store name of agenda queue as last modifier in job data
props.lastModifiedBy = this.agenda.attrs.name;
log('[job %s] set job props: \n%O', id, props);

// Grab current time and set default query options for MongoDB
Expand Down Expand Up @@ -291,11 +292,11 @@ export class JobDbRepository {
return this.processDbResult(job, result.value);
}

if (unique) {
if (job.attrs.unique) {
// If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
const query: FilterQuery<any> = job.attrs.unique;
query.name = props.name;
if (uniqueOpts?.insertOnly) {
if (job.attrs.uniqueOpts?.insertOnly) {
update = { $setOnInsert: props };
}

Expand Down
2 changes: 1 addition & 1 deletion src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class JobProcessor {
| undefined;
} = {};

getStatus() {
async getStatus() {
return {
jobStatus: this.jobStatus,
runningJobs: this.runningJobs.length,
Expand Down
Loading

0 comments on commit ef85fc5

Please sign in to comment.