Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/abort listener #448

Merged
merged 47 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
eef94c6
feat: add abort listener
joshLong145 Jun 11, 2024
a00079e
test: add abort listener test
joshLong145 Jun 11, 2024
0085ded
doc: add comments
joshLong145 Jun 11, 2024
5207481
test: fix abort test case and add comment
joshLong145 Jun 12, 2024
138dcb6
ref: remove global handler
joshLong145 Jun 21, 2024
36e5256
ref: add abort handler to worker api binded to method
joshLong145 Jun 21, 2024
a6bba86
fix: abort timeout option
joshLong145 Jun 25, 2024
313f8d3
test: repro tests
joshLong145 Jun 25, 2024
284e03d
chore: update embedded
joshLong145 Jun 25, 2024
d9f023b
dev(worker): add message passing for communicating clean up operations
joshLong145 Aug 3, 2024
1afd385
test: add tests
joshLong145 Aug 3, 2024
7982f99
chore: update embedded worker
joshLong145 Aug 3, 2024
70cd321
dev: Implemnet message passing for timeout and cancellation cleanup o…
joshLong145 Aug 5, 2024
3fbb219
test: update test cases
joshLong145 Aug 5, 2024
cf4a15e
chore update embedded worker
joshLong145 Aug 5, 2024
507a9d6
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Aug 5, 2024
39bd5bb
ref: move worker api to static obect definition
joshLong145 Aug 5, 2024
26aba2a
chore: update codegen
joshLong145 Aug 5, 2024
7a23cf2
ref: remove delay in TimeoutError
joshLong145 Aug 6, 2024
6e8e525
chore: update embedded worker
joshLong145 Aug 6, 2024
ac89dfc
ref: cleanup per review comments
joshLong145 Aug 28, 2024
fdbf443
test: updates per PR comments
joshLong145 Aug 30, 2024
dc22cdd
test: change max worker count for testing worker reuse
joshLong145 Aug 30, 2024
6711fdf
docs: add example and README info for new abort handler feature
joshLong145 Sep 1, 2024
2c38089
ref: comments and logging fixes
joshLong145 Sep 2, 2024
357af05
ref: fix terminationHandler override
joshLong145 Sep 4, 2024
d290b1e
docs: fix typo in comment
joshLong145 Sep 4, 2024
64e3b9b
test: update test cases
joshLong145 Sep 4, 2024
c68cc9b
ref: migrate timeout set to event handler to prevent infinite recursi…
joshLong145 Sep 10, 2024
eb40937
test: add only to timeout test for reproduction test
joshLong145 Sep 14, 2024
003adf6
test: remove forbid-only for test
joshLong145 Sep 14, 2024
bf5676e
ref: revert changes for isolation test
joshLong145 Sep 14, 2024
52c66b6
ref: add termination case for tracked tasks on pool termination and f…
joshLong145 Sep 14, 2024
9f2e281
chore: update code gen
joshLong145 Sep 14, 2024
abddfe3
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Sep 18, 2024
bcb62ce
ref: remove returning itself on promise settling causing recursive calls
joshLong145 Sep 29, 2024
0453c88
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Oct 2, 2024
877d88a
ref: revert timeout change
joshLong145 Oct 2, 2024
1ffe80a
dev: review comments
joshLong145 Oct 2, 2024
9fa09af
ref: revert `after` hook change to `afterEach`
joshLong145 Oct 3, 2024
35a08d2
ref: add error throw to promise reject and remove call to `worker.exit`
joshLong145 Oct 3, 2024
0e84084
chore: update embeddedWorker
joshLong145 Oct 3, 2024
8692d53
docs: fix comment on timeout in worker handler
joshLong145 Oct 3, 2024
a286a89
chore: update embeddedWorker
joshLong145 Oct 3, 2024
6b829b6
dev: reimplement tryCleanup as cleanup
joshLong145 Oct 9, 2024
b18249a
dev: worker.terminateAndExit now returns Promise
joshLong145 Oct 10, 2024
45e842b
ref: resolve promise over rejection with error
joshLong145 Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,38 @@ workerpool.worker({
});
```

Tasks may configure an `abort handler` to perform cleanup operations when `timeout` or `cancel` is called on a `task`. the `abortListenerTimeout` option can be configured to control when cleanup should be aborted in the case an `abortHandler` never resolves. This timeout trigger will cause the given worker to be cleaned up. Allowing a new worker to be created if need be.

```js
function asyncTimeout() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
},
{
abortListenerTimeout: 1000
}
);
```

### Events

You can send data back from workers to the pool while the task is being executed using the `workerEmit` function:
Expand Down
46 changes: 46 additions & 0 deletions examples/abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const workerpool = require("..");

var workerCount = 0;

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", {
// maximum time to wait for worker to cleanup it's resources
// on termination before forcefully stopping the worker
workerTerminateTimeout: 1000,
onCreateWorker: (args) => {
console.log("New worker created");
workerCount += 1;
}
});

function add (a, b) {
return a + b;
}

const main = async () => {
const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => {
console.log("task timeout");
console.log("timeout occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
});
await cleanedUpTask;

const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => {
console.log("task canceled");
console.log("cancel occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
});

await canceledTask;
}


main();
55 changes: 55 additions & 0 deletions examples/workers/cleanupAbort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
var workerpool = require("../..");

function asyncTimeout() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
});
});
}

function asyncAbortHandlerNeverResolves() {
var me = this;
return new Promise((resolve) => {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(function () {
clearTimeout(timeout);
return new Promise((res) => {
setTimeout(() => {
res();
resolve();
// set the timeout high so it will not resolve before the external
// timeout triggers and exits the worker
}, 1_000_000_000);
});
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves,
},
{
abortListenerTimeout: 1000
}
);
80 changes: 72 additions & 8 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const {validateOptions, forkOptsNames, workerThreadOptsNames, workerOptsNames} =
*/
var TERMINATE_METHOD_ID = '__workerpool-terminate__';

/**
* Special message by parent which causes a child process worker to perform cleaup
* steps before determining if the child process worker should be terminated.
*/
var CLEANUP_METHOD_ID = '__workerpool-cleanup__';

function ensureWorkerThreads() {
var WorkerThreads = tryRequireWorkerThreads()
if (!WorkerThreads) {
Expand Down Expand Up @@ -294,6 +300,20 @@ function WorkerHandler(script, _options) {
}
}
}

if (response.method === CLEANUP_METHOD_ID) {
var trackedTask = me.tracking[response.id];
if (trackedTask !== undefined) {
if (response.error) {
clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.reject(objectToError(response.error))
} else {
me.tracking && clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.resolve(trackedTask.result);
}
}
delete me.tracking[id];
}
}
});

Expand All @@ -306,6 +326,7 @@ function WorkerHandler(script, _options) {
me.processing[id].resolver.reject(error);
}
}

me.processing = Object.create(null);
}

Expand Down Expand Up @@ -337,7 +358,7 @@ function WorkerHandler(script, _options) {
});

this.processing = Object.create(null); // queue with tasks currently in progress

this.tracking = Object.create(null); // queue with tasks being monitored for cleanup status
this.terminating = false;
this.terminated = false;
this.cleaning = false;
Expand Down Expand Up @@ -399,17 +420,50 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
var me = this;
return resolver.promise.catch(function (error) {
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
me.tracking[id] = {
id,
resolver: Promise.defer()
};

// remove this task from the queue. It is already rejected (hence this
// catch event), and else it will be rejected again when terminating
delete me.processing[id];

// terminate worker
return me.terminateAndNotify(true)
.then(function() {
throw error;
}, function(err) {
throw err;
});
me.tracking[id].resolver.promise = me.tracking[id].resolver.promise.catch(function(err) {
delete me.tracking[id];

var promise = me.terminateAndNotify(true)
.then(function() {
throw err;
}, function(err) {
throw err;
});

return promise;
});

me.worker.send({
id,
method: CLEANUP_METHOD_ID
});


/**
* Sets a timeout to reject the cleanup operation if the message sent to the worker
* does not receive a response. see worker.tryCleanup for worker cleanup operations.
* Here we use the workerTerminateTimeout as the worker will be terminated if the timeout does invoke.
*
* We need this timeout in either case of a Timeout or Cancellation Error as if
* the worker does not send a message we still need to give a window of time for a response.
*
* The workerTermniateTimeout is used here if this promise is rejected the worker cleanup
* operations will occure.
*/
me.tracking[id].timeoutId = setTimeout(function() {
me.tracking[id].resolver.reject(error);
}, me.workerTerminateTimeout);

return me.tracking[id].resolver.promise;
} else {
throw error;
}
Expand Down Expand Up @@ -441,9 +495,18 @@ WorkerHandler.prototype.terminate = function (force, callback) {
this.processing[id].resolver.reject(new Error('Worker terminated'));
}
}

this.processing = Object.create(null);
}

// If we are terminating, cancel all tracked task for cleanup
for (var task of Object.values(me.tracking)) {
clearTimeout(task.timeoutId);
task.resolver.reject(new Error('Worker Terminating'));
}

me.tracking = Object.create(null);

if (typeof callback === 'function') {
this.terminationHandler = callback;
}
Expand All @@ -452,6 +515,7 @@ WorkerHandler.prototype.terminate = function (force, callback) {
var cleanup = function(err) {
me.terminated = true;
me.cleaning = false;

if (me.worker != null && me.worker.removeAllListeners) {
// removeAllListeners is only available for child_process
me.worker.removeAllListeners('message');
Expand Down
2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
/**
* @typedef {Object} WorkerRegisterOptions
* @property {(code: number | undefined) => PromiseLike<void> | void} [onTerminate] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The difference with pool's `onTerminateWorker` is that this callback runs in the worker context, while onTerminateWorker is executed on the main thread.
* @property {number} [abortListenerTimeout] The timeout in milliseconds to wait for a worker to clean up it's resources if an abort listener does not resolve, before stopping the worker forcefully. Default value is `1000`.
*/

/**
Expand Down
Loading