diff --git a/README.md b/README.md index ebb0836..b3b72df 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ Since there are a few job queue solutions, here a table comparing them to help y better suits your needs. | Feature | Bull | Bee | Agenda | AgendaTS | -|:---------------------------| :-------------: | :------: | :----: | :------: | +| :------------------------- | :-------------: | :------: | :----: | :------: | | Backend | redis | redis | mongo | mongo | | Priorities | ✓ | | ✓ | ✓ | | Concurrency | ✓ | ✓ | ✓ | ✓ | @@ -1148,6 +1148,14 @@ childWorker.ts ```ts import 'reflect-metadata'; +process.on('message', message => { + if (message === 'cancel') { + process.exit(2); + } else { + console.log('got message', message); + } +}); + (async () => { const mongooseConnection = /** connect to database */ @@ -1204,6 +1212,7 @@ import 'reflect-metadata'; process.exit(1); }); + ``` Ensure to only define job definitions during this step, otherwise you create some diff --git a/src/Job.ts b/src/Job.ts index 2be73d0..5eae89d 100644 --- a/src/Job.ts +++ b/src/Job.ts @@ -360,73 +360,68 @@ export class Job { } const { forkHelper } = this.agenda; - // console.log('location', location); - let controller: AbortController | undefined; - let signal: AbortSignal | undefined; - if (typeof AbortController !== 'undefined') { - controller = new AbortController(); - ({ signal } = controller); - } else { - console.warn('AbortController not supported!'); - } - - await new Promise((resolve, reject) => { - let stillRunning = true; - - const child = fork( - forkHelper.path, - [ - this.attrs.name, - this.attrs._id!.toString(), - this.agenda.definitions[this.attrs.name].filePath || '' - ], - { - ...forkHelper.options, - signal - } - ); - - child.on('close', code => { - stillRunning = false; - if (code) { - console.info( - 'fork parameters', - forkHelper, + let stillRunning = true; + try { + await new Promise((resolve, reject) => { + const child = fork( + forkHelper.path, + [ this.attrs.name, - this.attrs._id, - this.agenda.definitions[this.attrs.name].filePath - ); - const error = new Error(`child process exited with code: ${code}`); - console.warn(error.message); - reject(error); - } else { - resolve(); - } - }); - child.on('message', message => { - // console.log(`Message from child.js: ${message}`, JSON.stringify(message)); - if (typeof message === 'string') { - try { - reject(JSON.parse(message)); - } catch (errJson) { - reject(message); + this.attrs._id!.toString(), + this.agenda.definitions[this.attrs.name].filePath || '' + ], + forkHelper.options + ); + + child.on('close', code => { + stillRunning = false; + if (code) { + console.info( + 'fork parameters', + forkHelper, + this.attrs.name, + this.attrs._id, + this.agenda.definitions[this.attrs.name].filePath + ); + const error = new Error(`child process exited with code: ${code}`); + console.warn(error.message); + reject(error); + } else { + resolve(); } - } else { - reject(message); - } - }); - - // check if job is still alive - const checkCancel = () => - setTimeout(() => { - if (this.canceled) { - controller?.abort(); // Stops the child process - } else if (stillRunning) { - setTimeout(checkCancel, 10000); + }); + child.on('message', message => { + // console.log(`Message from child.js: ${message}`, JSON.stringify(message)); + if (typeof message === 'string') { + try { + reject(JSON.parse(message)); + } catch (errJson) { + reject(message); + } + } else { + reject(message); } }); - checkCancel(); - }); + + // check if job is still alive + const checkCancel = () => + setTimeout(() => { + if (this.canceled) { + try { + child.send('cancel'); + console.info('canceled child', this.attrs.name, this.attrs._id); + } catch (err) { + console.log('cannot send cancel to child'); + } + } else if (stillRunning) { + setTimeout(checkCancel, 10000); + } + }); + checkCancel(); + }); + } finally { + stillRunning = false; + } } else { await this.runJob(); } diff --git a/test/helpers/forkHelper.ts b/test/helpers/forkHelper.ts index e720b11..48f0c5f 100644 --- a/test/helpers/forkHelper.ts +++ b/test/helpers/forkHelper.ts @@ -1,5 +1,13 @@ import { Agenda } from '../../src'; +process.on('message', message => { + if (message === 'cancel') { + process.exit(2); + } else { + console.log('got message', message); + } +}); + (async () => { /** do other required initializations */