-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix/abort listener execution #484
base: master
Are you sure you want to change the base?
Changes from all commits
b274b8e
ba38228
f9b4919
1f235f3
9ac04b4
650128d
85de985
18e4853
0685ae3
4a892d2
e824a79
efb12c6
8987fa3
f5c38fd
622ed7d
9d85663
7c1e099
29c2a51
ad9534d
7d24650
0e0448a
e183786
7e9fa7e
90c86ac
518e7ad
700be64
d331a58
a1c7ccf
e44d73a
0b1b9f9
9e9f7a9
1023cc8
1a50ba3
76cd6c6
2a122e1
361bccd
590de65
67f60dc
985f5db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,72 @@ | ||
const workerpool = require(".."); | ||
|
||
var workerCount = 0; | ||
|
||
// create a worker pool | ||
const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", { | ||
// maximum time to wait for worker to cleanup it's resources | ||
// on termination before forcefully stopping the worker | ||
workerTerminateTimeout: 1000, | ||
onCreateWorker: (args) => { | ||
onCreateWorker: function (args) { | ||
console.log("New worker created"); | ||
workerCount += 1; | ||
} | ||
}, | ||
onTerminateWorker: function () { | ||
console.log("worker terminated"); | ||
}, | ||
maxWorkers: 1, | ||
}); | ||
|
||
function add (a, b) { | ||
return a + b; | ||
} | ||
|
||
const main = async () => { | ||
const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => { | ||
console.log("task timeout"); | ||
console.log("timeout occured: ", err.message); | ||
console.log("worker count ", workerCount); | ||
return pool.exec(add, [1, 2]).then((sum) => { | ||
console.log('add result', sum); | ||
console.log("worker count: ", workerCount); | ||
}); | ||
let abortResolverSuccess; | ||
await pool | ||
.exec("asyncTimeout", [], { | ||
onAbortResolution: function (args) { | ||
console.log("abort operation concluded for task:", args.id); | ||
console.log("is worker terminating", args.isTerminating); | ||
}, | ||
onAbortStart: async function (args) { | ||
console.log( | ||
"abort operation started from task timeout, in onAbortStart", | ||
); | ||
abortResolverSuccess = args.abortPromise; | ||
}, | ||
}) | ||
.timeout(100) | ||
.catch((err) => { | ||
console.log("timeout handled: ", err.message); | ||
}); | ||
await cleanedUpTask; | ||
|
||
const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => { | ||
console.log("task canceled"); | ||
console.log("cancel occured: ", err.message); | ||
console.log("worker count ", workerCount); | ||
return pool.exec(add, [1, 2]).then((sum) => { | ||
console.log('add result', sum); | ||
console.log("worker count: ", workerCount); | ||
}); | ||
|
||
await abortResolverSuccess.catch((err) => { | ||
console.log("abort operation concluded ", err); | ||
}); | ||
|
||
console.log("pool status after abort operation:", pool.stats()); | ||
|
||
let abortResolverFailure; | ||
await pool | ||
.exec("asyncAbortHandlerNeverResolves", [], { | ||
onAbortStart: function (args) { | ||
console.log( | ||
"abort operation started from task cancel, in onAbortStart", | ||
); | ||
abortResolverFailure = args.abortPromise; | ||
}, | ||
onAbortResolution: function (args) { | ||
console.log("abort operation concluded for task:", args.id); | ||
console.log("is worker terminating", args.isTerminating); | ||
console.log("no min workers are set, no new worker should be created"); | ||
} | ||
}) | ||
.cancel() | ||
.catch((err) => { | ||
console.log("task canceled"); | ||
console.log("cancel occured: ", err.message); | ||
}); | ||
|
||
await canceledTask; | ||
} | ||
await abortResolverFailure.catch((e) => { | ||
console.log("cancelation handled: ", e.message); | ||
}); | ||
|
||
console.log("final pool stats", pool.stats()); | ||
// we dont need to terminate the pool, since all workers should be terminated by this point even though there is a handler. | ||
}; | ||
|
||
main(); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,10 +27,28 @@ | |
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. | ||
*/ | ||
|
||
|
||
/** | ||
* @typedef {Object} AbortStartArgs | ||
* @property {number} [id] identifier of the task which is starting its abort operation. | ||
* @property {PromiseLike<void>} [abortPromise] PromiseLike Object which resolves or rejects when the abort operation concludes. | ||
* | ||
*/ | ||
|
||
/** | ||
* @typedef {Object} AbortResolutionArgs | ||
* @property {Error | undefined} [error] An error which occured during the abort operation. If an error did not occure the value will be `undefined`. | ||
* @property {number} [id] identifier of the task. | ||
* @property {boolean} [isTerminating] A flag which indicates the termination status of the worker which ececuted the task. | ||
*/ | ||
|
||
/** | ||
* @typedef {Object} ExecOptions | ||
* @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. | ||
* @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. | ||
* @property {(payload: any) => AbortResolutionArgs} [onAbortResolution] An event listener triggered when whenever an abort operation concludes. | ||
* @property {(payload: AbortStartArgs) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. | ||
* @property {{ promise: import('./Promise.js').Promise<any, Error>; resolve: Function; reject: Function; }} [abortResolver] Defered Promise which resolves or rejects when the abort operation for the task concludes. | ||
Comment on lines
+34
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am torn on if this is a good mechanism for allowing callers access to the promise which controls the abort operation. In We also allow the This means the promise for the abort operation is exposed as two different types. Which is not ideal, but I am unsure if there is a way to unify them without breaking backwards compatibility. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. onAbortResolution is called after the abort handlers have executed and passes the taskId and error if the abort handlers threw an error, and a flag to determine if the worker is shutting down or not.
The idea is to give control of the promise to the caller, so they do not need to get it from event args. but it can be misused easily. |
||
*/ | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -197,7 +197,7 @@ worker.cleanup = function(requestId) { | |
error: convertError(new Error('Worker terminating')), | ||
}); | ||
|
||
// If there are no handlers registered, reject the promise with an error as we want the handler to be notified | ||
// If there are no handlers registered, as we want the handler to be notified | ||
// that cleanup should begin and the handler should be GCed. | ||
return new Promise(function(resolve) { resolve(); }); | ||
} | ||
|
@@ -236,10 +236,10 @@ worker.cleanup = function(requestId) { | |
// - Reject if one or more handlers reject | ||
// Upon one of the above cases a message will be sent to the handler with the result of the handler execution | ||
// which will either kill the worker if the result contains an error, or | ||
return Promise.all([ | ||
settlePromise, | ||
timeoutPromise | ||
]).then(function() { | ||
return new Promise(function (resolve, reject) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure, but can we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can, but I was trying to not use any implementation of |
||
settlePromise.then(resolve, reject); | ||
timeoutPromise.then(resolve, reject); | ||
}).then(function() { | ||
worker.send({ | ||
id: requestId, | ||
method: CLEANUP_METHOD_ID, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an actual use case for the
onAbortResolution
andonAbortStart
callbacks? If there is no clear use case I would prefer to not implement it right now, to keep things simple.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main usage is to give more context to the user on life cycle of the task. but
onAbortStart
does provide access the to promise.It is also useful in tests to confirm that after the resolution of the abort operation the
busyWorker
count is correct as this cannot be asserted on from within the resolution ofcancel
ortimeout
promises.If we are to remove these callbacks then we should keep
abortResolver
so the user knows when the abort operation concludes.