Skip to content

Commit

Permalink
feat: allow to fork jobs in isolated sub process
Browse files Browse the repository at this point in the history
experimental new feature: this allows to run a job in a seperate
forked child.
  • Loading branch information
simllll committed May 9, 2022
1 parent b086096 commit 2a68c95
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 34 deletions.
35 changes: 35 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
"debug": "~4.3.4",
"human-interval": "~2.0.1",
"luxon": "^2.3.1",
"mongodb": "^4.3.1"
"mongodb": "^4.3.1",
"get-function-location": "^2.0.0"
},
"devDependencies": {
"eslint": "^8.11.0",
Expand Down
125 changes: 99 additions & 26 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { ObjectId } from 'mongodb';
import { fork } from 'child_process';
import * as getFunctionLocation from 'get-function-location';
import type { Agenda } from './index';
import type { DefinitionProcessor } from './types/JobDefinition';
import { IJobParameters, datefields, TJobDatefield } from './types/JobParameters';
import { JobPriority, parsePriority } from './utils/priority';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';

const controller = new AbortController();
const { signal } = controller;

const log = debug('agenda:job');

/**
Expand All @@ -15,6 +20,8 @@ const log = debug('agenda:job');
export class Job<DATA = unknown | void> {
readonly attrs: IJobParameters<DATA>;

static functionLocationCache: { [key: string]: string } = {};

/** this flag is set to true, if a job got canceled (e.g. due to a timeout or other exception),
* you can use it for long running tasks to periodically check if canceled is true,
* also touch will check if and throws that the job got canceled
Expand Down Expand Up @@ -209,7 +216,7 @@ export class Job<DATA = unknown | void> {
* @returns Whether or not job is running at the moment (true for running)
*/
async isRunning(): Promise<boolean> {
if (!this.byJobProcessor) {
if (!this.byJobProcessor || this.attrs.fork) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
Expand Down Expand Up @@ -237,6 +244,10 @@ export class Job<DATA = unknown | void> {
* Saves a job to database
*/
async save(): Promise<Job> {
if (this.agenda.forkedWorker) {
console.warn('calling save() on a Job during a forkedWorker has no effect!');
return this as Job;
}
// ensure db connection is ready
await this.agenda.ready;
return this.agenda.db.saveJob(this as Job);
Expand All @@ -250,7 +261,7 @@ export class Job<DATA = unknown | void> {
}

async isDead(): Promise<boolean> {
if (!this.byJobProcessor) {
if (!this.byJobProcessor || this.attrs.fork) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
Expand All @@ -259,7 +270,13 @@ export class Job<DATA = unknown | void> {
return this.isExpired();
}

isExpired(): boolean {
async isExpired(): Promise<boolean> {
if (!this.byJobProcessor || this.attrs.fork) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
}

const definition = this.agenda.definitions[this.attrs.name];

const lockDeadline = new Date(Date.now() - definition.lockLifetime);
Expand Down Expand Up @@ -317,8 +334,6 @@ export class Job<DATA = unknown | void> {
}

async run(): Promise<void> {
const definition = this.agenda.definitions[this.attrs.name];

this.attrs.lastRunAt = new Date();
log(
'[%s:%s] setting lastRunAt to: %s',
Expand All @@ -333,33 +348,58 @@ export class Job<DATA = unknown | void> {
this.agenda.emit('start', this);
this.agenda.emit(`start:${this.attrs.name}`, this);
log('[%s:%s] starting job', this.attrs.name, this.attrs._id);
if (!definition) {
log('[%s:%s] has no definition, can not run', this.attrs.name, this.attrs._id);
throw new Error('Undefined job');
}

if (definition.fn.length === 2) {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
if (this.attrs.fork && this.agenda.forkHelper) {
const { forkHelper } = this.agenda;
const location =
Job.functionLocationCache[this.attrs.name] ||
(await getFunctionLocation(this.agenda.definitions[this.attrs.name].fn)).source.replace(
/^file:\/\//,
''
);

if (!Job.functionLocationCache[this.attrs.name]) {
Job.functionLocationCache[this.attrs.name] = location;
}
// console.log('location', location);

await new Promise<void>((resolve, reject) => {
try {
const result = definition.fn(this as Job, error => {
if (error) {
reject(error);
return;
}
let stillRunning = true;
const child = fork(forkHelper, [this.attrs.name, this.attrs._id!.toString(), location], {
signal
});

child.on('close', code => {
console.log(`child process exited with code ${code}`);
stillRunning = false;
if (code) {
reject(code);
} else {
resolve();
});

if (this.isPromise(result)) {
result.catch((error: Error) => reject(error));
}
} catch (error) {
reject(error);
}
});
child.on('message', message => {
console.log(`Message from child.js: ${message}`, JSON.stringify(message));
if (typeof message === 'string') {
reject(JSON.parse(message));
} else {
reject(message);
}
});

// check if job is still alive
const checkCancel = () =>
setTimeout(() => {
if (this.canceled) {
controller.abort(); // Stops the child process
} else if (stillRunning) {
setTimeout(checkCancel, 10000);
}
});
checkCancel();
});
} else {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await (definition.fn as DefinitionProcessor<DATA, void>)(this);
await this.runJob();
}

this.attrs.lastFinishedAt = new Date();
Expand Down Expand Up @@ -397,6 +437,39 @@ export class Job<DATA = unknown | void> {
}
}

async runJob() {
const definition = this.agenda.definitions[this.attrs.name];

if (!definition) {
log('[%s:%s] has no definition, can not run', this.attrs.name, this.attrs._id);
throw new Error('Undefined job');
}

if (definition.fn.length === 2) {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await new Promise<void>((resolve, reject) => {
try {
const result = definition.fn(this as Job, error => {
if (error) {
reject(error);
return;
}
resolve();
});

if (this.isPromise(result)) {
result.catch((error: Error) => reject(error));
}
} catch (error) {
reject(error);
}
});
} else {
log('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await (definition.fn as DefinitionProcessor<DATA, void>)(this);
}
}

private isPromise(value: unknown): value is Promise<void> {
return !!(value && typeof (value as Promise<void>).then === 'function');
}
Expand Down
4 changes: 4 additions & 0 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ export class JobDbRepository {
return !!(connectOptions as IDatabaseOptions)?.db?.address;
}

async getJobById(id: string) {
return this.collection.findOne({ _id: new ObjectId(id) });
}

async getJobs(
query: Filter<IJobParameters>,
sort: Sort = {},
Expand Down
6 changes: 3 additions & 3 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ export class JobProcessor {
* handledJobs keeps list of already processed jobs
* @returns {undefined}
*/
private jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
private async jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
// Ensure we have jobs
if (this.jobQueue.length === 0) {
return;
Expand All @@ -395,7 +395,7 @@ export class JobProcessor {

this.jobQueue.remove(job);

if (!job.isExpired()) {
if (!(await job.isExpired())) {
// check if job has expired (and therefore probably got picked up again by another queue in the meantime)
// before it even has started to run

Expand Down Expand Up @@ -513,7 +513,7 @@ export class JobProcessor {
return;
}

if (job.isExpired()) {
if (await job.isExpired()) {
reject(
new Error(
`execution of '${job.attrs.name}' canceled, execution took more than ${
Expand Down
27 changes: 23 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const DefaultOptions = {
defaultLockLimit: 0,
lockLimit: 0,
defaultLockLifetime: 10 * 60 * 1000,
sort: { nextRunAt: 1, priority: -1 } as const
sort: { nextRunAt: 1, priority: -1 } as const,
forkHelper: 'dist/childWorker.js'
};

/**
Expand All @@ -32,9 +33,11 @@ const DefaultOptions = {
export class Agenda extends EventEmitter {
readonly attrs: IAgendaConfig & IDbConfig;

public readonly forkedWorker?: boolean;
public readonly forkHelper?: string;

db: JobDbRepository;
// eslint-disable-next-line default-param-last
// private jobQueue: JobProcessingQueue;

// internally used
on(event: 'processJob', listener: (job: JobWithId) => void): this;

Expand All @@ -47,6 +50,10 @@ export class Agenda extends EventEmitter {
on(event: 'ready', listener: () => void): this;
on(event: 'error', listener: (error: Error) => void): this;
on(event: string, listener: (...args) => void): this {
if (this.forkedWorker) {
console.warn('calling on() during a forkedWorker has no effect!');
return this;
}
return super.on(event, listener);
}

Expand All @@ -62,6 +69,15 @@ export class Agenda extends EventEmitter {
return !!this.jobProcessor;
}

async runForkedJob(name: string, jobId: string) {
const jobData = await this.db.getJobById(jobId);
if (!jobData) {
throw new Error('db entry not found');
}
const job = new Job(this, jobData);
await job.runJob();
}

async getRunningStats(fullDetails = false): Promise<IAgendaStatus> {
if (!this.jobProcessor) {
throw new Error('agenda not running!');
Expand All @@ -84,7 +100,7 @@ export class Agenda extends EventEmitter {
defaultLockLifetime?: number;
// eslint-disable-next-line @typescript-eslint/ban-types
} & (IDatabaseOptions | IMongoOptions | {}) &
IDbConfig = DefaultOptions,
IDbConfig & { forkHelper?: string; forkedWorker?: boolean } = DefaultOptions,
cb?: (error?: Error) => void
) {
super();
Expand All @@ -100,6 +116,9 @@ export class Agenda extends EventEmitter {
sort: config.sort || DefaultOptions.sort
};

this.forkedWorker = config.forkedWorker;
this.forkHelper = config.forkHelper;

this.ready = new Promise(resolve => {
this.once('ready', resolve);
});
Expand Down
3 changes: 3 additions & 0 deletions src/types/JobParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ export interface IJobParameters<DATA = unknown | void> {
};

lastModifiedBy?: string;

/** forks a new node sub process for executing this job */
fork?: boolean;
}

export type TJobDatefield = keyof Pick<
Expand Down

0 comments on commit 2a68c95

Please sign in to comment.