Skip to content

Commit

Permalink
fix: use message bus instead of signal to cancel child
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed May 11, 2022
1 parent 8dc35f3 commit fcec3a9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 64 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Since there are a few job queue solutions, here a table comparing them to help y
better suits your needs.

| Feature | Bull | Bee | Agenda | AgendaTS |
|:---------------------------| :-------------: | :------: | :----: | :------: |
| :------------------------- | :-------------: | :------: | :----: | :------: |
| Backend | redis | redis | mongo | mongo |
| Priorities || |||
| Concurrency |||||
Expand Down Expand Up @@ -1148,6 +1148,14 @@ childWorker.ts
```ts
import 'reflect-metadata';

process.on('message', message => {
if (message === 'cancel') {
process.exit(2);
} else {
console.log('got message', message);
}
});

(async () => {
const mongooseConnection = /** connect to database */

Expand Down Expand Up @@ -1204,6 +1212,7 @@ import 'reflect-metadata';
process.exit(1);
});


```

Ensure to only define job definitions during this step, otherwise you create some
Expand Down
121 changes: 58 additions & 63 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,73 +360,68 @@ export class Job<DATA = unknown | void> {
}
const { forkHelper } = this.agenda;

// console.log('location', location);
let controller: AbortController | undefined;
let signal: AbortSignal | undefined;
if (typeof AbortController !== 'undefined') {
controller = new AbortController();
({ signal } = controller);
} else {
console.warn('AbortController not supported!');
}

await new Promise<void>((resolve, reject) => {
let stillRunning = true;

const child = fork(
forkHelper.path,
[
this.attrs.name,
this.attrs._id!.toString(),
this.agenda.definitions[this.attrs.name].filePath || ''
],
{
...forkHelper.options,
signal
}
);

child.on('close', code => {
stillRunning = false;
if (code) {
console.info(
'fork parameters',
forkHelper,
let stillRunning = true;
try {
await new Promise<void>((resolve, reject) => {
const child = fork(
forkHelper.path,
[
this.attrs.name,
this.attrs._id,
this.agenda.definitions[this.attrs.name].filePath
);
const error = new Error(`child process exited with code: ${code}`);
console.warn(error.message);
reject(error);
} else {
resolve();
}
});
child.on('message', message => {
// console.log(`Message from child.js: ${message}`, JSON.stringify(message));
if (typeof message === 'string') {
try {
reject(JSON.parse(message));
} catch (errJson) {
reject(message);
this.attrs._id!.toString(),
this.agenda.definitions[this.attrs.name].filePath || ''
],
forkHelper.options
);

child.on('close', code => {
stillRunning = false;
if (code) {
console.info(
'fork parameters',
forkHelper,
this.attrs.name,
this.attrs._id,
this.agenda.definitions[this.attrs.name].filePath
);
const error = new Error(`child process exited with code: ${code}`);
console.warn(error.message);
reject(error);
} else {
resolve();
}
} else {
reject(message);
}
});

// check if job is still alive
const checkCancel = () =>
setTimeout(() => {
if (this.canceled) {
controller?.abort(); // Stops the child process
} else if (stillRunning) {
setTimeout(checkCancel, 10000);
});
child.on('message', message => {
// console.log(`Message from child.js: ${message}`, JSON.stringify(message));
if (typeof message === 'string') {
try {
reject(JSON.parse(message));
} catch (errJson) {
reject(message);
}
} else {
reject(message);
}
});
checkCancel();
});

// check if job is still alive
const checkCancel = () =>
setTimeout(() => {
if (this.canceled) {
try {
child.send('cancel');
console.info('canceled child', this.attrs.name, this.attrs._id);
} catch (err) {
console.log('cannot send cancel to child');
}
} else if (stillRunning) {
setTimeout(checkCancel, 10000);
}
});
checkCancel();
});
} finally {
stillRunning = false;
}
} else {
await this.runJob();
}
Expand Down
8 changes: 8 additions & 0 deletions test/helpers/forkHelper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { Agenda } from '../../src';

process.on('message', message => {
if (message === 'cancel') {
process.exit(2);
} else {
console.log('got message', message);
}
});

(async () => {
/** do other required initializations */

Expand Down

0 comments on commit fcec3a9

Please sign in to comment.