Skip to content

Commit

Permalink
fix: job timeout check and improve error handling for childs
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed May 20, 2022
1 parent 46384bd commit b365957
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 80 deletions.
128 changes: 61 additions & 67 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { ObjectId } from 'mongodb';
import { fork } from 'child_process';
import { ChildProcess, fork } from 'child_process';
import type { Agenda } from './index';
import type { DefinitionProcessor } from './types/JobDefinition';
import { IJobParameters, datefields, TJobDatefield } from './types/JobParameters';
Expand All @@ -20,7 +20,27 @@ export class Job<DATA = unknown | void> {
* you can use it for long running tasks to periodically check if canceled is true,
* also touch will check if and throws that the job got canceled
*/
canceled: Error | undefined;
private canceled?: Error | true;

getCanceledMessage() {
return typeof this.canceled === 'object'
? this.canceled?.message || this.canceled
: this.canceled;
}

private forkedChild?: ChildProcess;

cancel(error?: Error) {
this.canceled = error || true;
if (this.forkedChild) {
try {
this.forkedChild.send('cancel');
console.info('canceled child', this.attrs.name, this.attrs._id);
} catch (err) {
console.log('cannot send cancel to child');
}
}
}

/** internal variable to ensure a job does not set unlimited numbers of setTimeouts if the job is not processed
* immediately */
Expand Down Expand Up @@ -266,12 +286,6 @@ export class Job<DATA = unknown | void> {
}

async isDead(): Promise<boolean> {
if (!this.byJobProcessor || this.attrs.fork) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
}

return this.isExpired();
}

Expand Down Expand Up @@ -360,68 +374,47 @@ export class Job<DATA = unknown | void> {
}
const { forkHelper } = this.agenda;

let stillRunning = true;
try {
await new Promise<void>((resolve, reject) => {
const child = fork(
forkHelper.path,
[
await new Promise<void>((resolve, reject) => {
this.forkedChild = fork(
forkHelper.path,
[
this.attrs.name,
this.attrs._id!.toString(),
this.agenda.definitions[this.attrs.name].filePath || ''
],
forkHelper.options
);

let childError: any;
this.forkedChild.on('close', code => {
if (code) {
console.info(
'fork parameters',
forkHelper,
this.attrs.name,
this.attrs._id!.toString(),
this.agenda.definitions[this.attrs.name].filePath || ''
],
forkHelper.options
);

child.on('close', code => {
stillRunning = false;
if (code) {
console.info(
'fork parameters',
forkHelper,
this.attrs.name,
this.attrs._id,
this.agenda.definitions[this.attrs.name].filePath
);
const error = new Error(`child process exited with code: ${code}`);
console.warn(error.message);
reject(error);
} else {
resolve();
}
});
child.on('message', message => {
// console.log(`Message from child.js: ${message}`, JSON.stringify(message));
if (typeof message === 'string') {
try {
reject(JSON.parse(message));
} catch (errJson) {
reject(message);
}
} else {
reject(message);
this.attrs._id,
this.agenda.definitions[this.attrs.name].filePath
);
const error = new Error(`child process exited with code: ${code}`);
console.warn(error.message);
reject(childError || error);
} else {
resolve();
}
});
this.forkedChild.on('message', message => {
// console.log(`Message from child.js: ${message}`, JSON.stringify(message));
if (typeof message === 'string') {
try {
childError = JSON.parse(message);
} catch (errJson) {
childError = message;
}
});

// check if job is still alive
const checkCancel = () =>
setTimeout(() => {
if (this.canceled) {
try {
child.send('cancel');
console.info('canceled child', this.attrs.name, this.attrs._id);
} catch (err) {
console.log('cannot send cancel to child');
}
} else if (stillRunning) {
setTimeout(checkCancel, 10000);
}
});
checkCancel();
} else {
childError = message;
}
});
} finally {
stillRunning = false;
}
});
} else {
await this.runJob();
}
Expand All @@ -440,6 +433,7 @@ export class Job<DATA = unknown | void> {
this.agenda.emit(`fail:${this.attrs.name}`, error, this);
log('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, error.message);
} finally {
this.forkedChild = undefined;
this.attrs.lockedAt = undefined;
try {
await this.agenda.db.saveJobState(this);
Expand Down
16 changes: 9 additions & 7 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,25 @@ export class JobProcessor {
? this.jobQueue.length
: this.jobQueue.getQueue().map(job => ({
...job.toJson(),
canceled: job.canceled?.message || job.canceled
canceled: job.getCanceledMessage()
})),
runningJobs: !fullDetails
? this.runningJobs.length
: this.runningJobs.map(job => ({
...job.toJson(),
canceled: job.canceled?.message || job.canceled
canceled: job.getCanceledMessage()
})),
lockedJobs: !fullDetails
? this.lockedJobs.length
: this.lockedJobs.map(job => ({
...job.toJson(),
canceled: job.canceled?.message || job.canceled
canceled: job.getCanceledMessage()
})),
jobsToLock: !fullDetails
? this.jobsToLock.length
: this.jobsToLock.map(job => ({
...job.toJson(),
canceled: job.canceled?.message || job.canceled
canceled: job.getCanceledMessage()
})),
isLockingOnTheFly: this.isLockingOnTheFly
};
Expand Down Expand Up @@ -499,6 +499,7 @@ export class JobProcessor {
this.runningJobs.push(job);
this.updateStatus(job.attrs.name, 'running', 1);

let jobIsRunning = true;
try {
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);

Expand All @@ -508,7 +509,7 @@ export class JobProcessor {
new Promise<void>((resolve, reject) => {
setTimeout(async () => {
// when job is not running anymore, just finish
if (!(await job.isRunning())) {
if (!jobIsRunning) {
resolve();
return;
}
Expand Down Expand Up @@ -547,8 +548,7 @@ export class JobProcessor {
);
}
} catch (error: any) {
// eslint-disable-next-line no-param-reassign
job.canceled = error;
job.cancel(error);
log.extend('runOrRetry')(
'[%s:%s] processing job failed',
job.attrs.name,
Expand All @@ -557,6 +557,8 @@ export class JobProcessor {
);
this.agenda.emit('error', error);
} finally {
jobIsRunning = false;

// Remove the job from the running queue
let runningJobIndex = this.runningJobs.indexOf(job);
if (runningJobIndex === -1) {
Expand Down
12 changes: 6 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { JobDbRepository } from './JobDbRepository';
import { JobPriority, parsePriority } from './utils/priority';
import { JobProcessor } from './JobProcessor';
import { calculateProcessEvery } from './utils/processEvery';
import {getCallerFilePath} from "./utils/stack";
import { getCallerFilePath } from './utils/stack';

const log = debug('agenda');

Expand Down Expand Up @@ -339,11 +339,11 @@ export class Agenda extends EventEmitter {
log('overwriting already defined agenda job', name);
}

const filePath = getCallerFilePath();
const filePath = getCallerFilePath();

this.definitions[name] = {
this.definitions[name] = {
fn: processor,
filePath,
filePath,
concurrency: options?.concurrency || this.attrs.defaultConcurrency,
lockLimit: options?.lockLimit || this.attrs.defaultLockLimit,
priority: parsePriority(options?.priority),
Expand Down Expand Up @@ -538,7 +538,7 @@ export class Agenda extends EventEmitter {
this.attrs.processEvery
);

this.on('processJob', job => this.jobProcessor?.process(job));
this.on('processJob', this.jobProcessor.process);
}

/**
Expand All @@ -552,7 +552,7 @@ export class Agenda extends EventEmitter {

log('Agenda.stop called, clearing interval for processJobs()');

const lockedJobs = this.jobProcessor?.stop();
const lockedJobs = this.jobProcessor.stop();

log('Agenda._unlockJobs()');
const jobIds = lockedJobs?.map(job => job.attrs._id) || [];
Expand Down

0 comments on commit b365957

Please sign in to comment.