
Feat/abort listener #448

Merged: 47 commits merged into josdejong:master on Oct 11, 2024

Conversation
Conversation

@joshLong145 (Contributor)

Adds an abortListener to workers, allowing async tasks to register cleanup operations so that workers can be reused when a task timeout or cancellation occurs.

connects PR #441
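
As a rough illustration of the intended usage (a minimal sketch; the listener API name follows the discussion in this PR and may differ from the final published API), a dedicated worker could register cleanup like this:

var workerpool = require('workerpool');

// Sketch of a dedicated worker registering an abort listener so the
// worker can be reused after a task timeout or cancellation.
function longTask() {
  return new Promise(function (resolve) {
    var timer = setTimeout(resolve, 60000);

    // cleanup invoked when the task is cancelled or times out
    workerpool.addAbortListener(async function () {
      clearTimeout(timer);
      resolve();
    });
  });
}

workerpool.worker({ longTask: longTask });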

@josdejong (Owner) left a comment

Thanks a lot, this looks good!

I made some inline comments; can you have a look at those?

Inline review comments (resolved): test/workers/cleanup-abort.js, src/worker.js, test/Pool.test.js, src/index.js
@joshLong145 (Contributor, Author) commented Jun 17, 2024

@josdejong
After some testing of offloaded functions, I think there are issues with referencing variables defined outside of the offloaded function's scope:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);

    // fails: workerpool is not defined inside the offloaded function
    workerpool.addAbortListener(async function () {
        await new Promise((res, rej) => {
          setTimeout(res, 1000);
        });
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])

The above errors with "workerpool is not defined". It does not seem like we can reference a workerpool instance from the global scope when using offloaded functions. I think the only way to support it would be to pass the registration handler to the function.

UPDATE:

After playing around with scopes on the function wrapper in worker.methods.run, I was able to bind addAbortListener to the function itself so it can be accessed as this.addAbortListener:

worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(this, arguments);');
  // expose the registration handler on the function object itself
  f.addAbortListener = function (listener) {
    worker.abortListeners.push(listener);
  }

  return f.apply(f, args);
};

If we apply the function with this set to f instead of null, we can extend the f object to provide the addAbortListener context. This is hacky, but it does allow for a unique this context within an offloaded function, and we still have access to the global through globalThis. Below is an example of an offloaded function which uses the modified run implementation above:

var workerpool = require('../');
var pool = workerpool.pool();

function asyncTimeout() {
  var me = this;
  return new Promise(function (resolve) {
    let timeout = setTimeout(function () {
        resolve();
    }, 5000);
    console.log(me.addAbortListener, globalThis);
    me.addAbortListener(async function () {
        console.log("abort listener invoked", clearTimeout);
        clearTimeout(timeout);
        resolve();
    });
  });
};

pool.exec(asyncTimeout, [])

@josdejong (Owner) commented Jun 19, 2024

It sounds like a good idea to attach this.addAbortListener to the function itself rather than using a "magic" global variable workerpool.addAbortListener 👍. Can you make sure that it is possible to use it as follows?

const addAbortListener = this.addAbortListener
// ...
addAbortListener(...)

EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?
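
Regarding the detached usage above: it should work as long as the registration closes over the worker state rather than relying on its this binding, which the modified run wrapper above already does. A minimal sketch of that usage from inside an offloaded function (assuming the wrapper above):

function task() {
  // detach the registration handler; it must not depend on `this`
  const addAbortListener = this.addAbortListener;
  return new Promise(function (resolve) {
    const timer = setTimeout(resolve, 5000);
    addAbortListener(async function () {
      clearTimeout(timer);
      resolve();
    });
  });
}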

@joshLong145 (Contributor, Author)

> EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right?

Yes, this makes sense to me. I can make the updates and implement the tests/examples now that we have this worked out.

Since we can now extend the function created for the worker task, we can create a worker API with:

  • addEventListener
  • emit

So, for example, this.worker.addEventListener (see the sketch below).
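
A rough sketch of what such a worker API could look like from inside an offloaded function, with the abort listener registration living on the same this.worker object (the method names and emit payload are assumptions based on the proposal above, not the final API):

function offloadedTask() {
  var me = this;
  return new Promise(function (resolve) {
    var timer = setTimeout(resolve, 5000);

    // hypothetical: notify the pool that the task has started
    me.worker.emit({ status: 'started' });

    // register async cleanup so the worker can survive a timeout or cancellation
    me.worker.addAbortListener(async function () {
      clearTimeout(timer);
      resolve();
    });
  });
}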

@joshLong145 (Contributor, Author)

@josdejong

Question on how errors are processed. Cancel and timeout produce unique error types, and an error received over the worker RPC bridge causes terminate to be invoked on the WorkerHandler instance (see exec on WorkerHandler for where terminateAndNotify is called). I do not think the current implementation is sufficient for keeping WorkerHandler instances preserved when attempting to reuse workers, since the handler will be deleted from the pool and a new worker will be created regardless of whether the worker is actually cleaned up. I have added some unit tests which show that the onCreateWorker handler gets invoked multiple times when it should only have created a single worker for the lifetime of the pool.

I think that for this new feature to work as intended, the message protocol needs to be extended to account for cleanup, so that handlers can be notified that the worker can survive the exception and does not need to be terminated.
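
As a rough illustration (the message fields here are hypothetical, not the final protocol), the extension could amount to one extra request/response pair keyed by the task id:

// WorkerHandler -> Worker: ask the worker to run its abort listeners for task 42
// { id: 42, method: '__cleanup__' }

// Worker -> WorkerHandler: report whether the worker survived cleanup
// { id: 42, cleanup: { success: true } }   // abort listeners resolved: keep the worker
// { id: 42, cleanup: { success: false } }  // no listeners or a listener threw: terminate the worker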

@josdejong (Owner) left a comment

Thanks for the updates! I added a few inline comments.

About the WorkerHandler terminating the worker when receiving a cancel or timeout error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.

Inline review comments on src/worker.js (resolved)
@joshLong145 (Contributor, Author) commented Jul 12, 2024

> Thanks for the updates! I added a few inline comments.
>
> About the WorkerHandler terminating the worker when receiving a cancel or timeout error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.

@josdejong Sorry for the delayed response.

After giving it some thought, I see a possible flow for communication which allows for proper tracking of handlers based on whether a worker can be reused after the onAbort handlers are triggered:

sequenceDiagram
    participant Pool
    participant WorkerHandler
    participant Worker
    Pool->>WorkerHandler: TimeoutError/CancellationError occurs, move the task with its resolver to the `tracking` queue. Send a message to the worker to run cleanup with the task id
    WorkerHandler ->> Worker: Worker receives the message and executes the abort handlers.
    Worker ->> WorkerHandler: Send the result of the abort handler execution to the worker handler with the task id
    WorkerHandler ->> Pool: Check the task id against the tracking queue and, if present, either resolve or reject the resolver promise based on the data sent in the message from the worker. Clean up the task context

With the above model, the resolver promise created when exec is first called on the WorkerHandler will either resolve or reject based on the result of the message sent back from the onAbort listener execution, which is looked up in a tracking queue. The pool then has a concept of tasks which need to be tracked for potential future cleanup. Since cleanup is a parallel operation which requires resource tracking, a second queue seems like the most natural way of managing it (a rough sketch follows below).

The other idea, although much more involved, is to rewrite how items are processed on the producer side. Instead of items only being processed in a single promise chain with a recursive call, we could use something like p-queue to handle assigning tasks to workers and managing WorkerHandlers when tasks cause a worker to be terminated.
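
A minimal sketch of the first, queue-based idea (the names and transport call are illustrative only, not the actual implementation): the pool parks the resolver of a timed-out or cancelled task, keyed by task id, until the worker reports the outcome of its abort listeners.

// Illustrative pool-side bookkeeping, not the actual implementation
var tracking = new Map(); // task id -> resolver of the original exec() promise

function onTimeoutOrCancel(task, workerHandler, error) {
  tracking.set(task.id, { resolver: task.resolver, error: error });
  // ask the worker to run its abort listeners for this task (illustrative transport call)
  workerHandler.worker.send({ id: task.id, method: '__cleanup__' });
}

function onCleanupResult(message, workerHandler) {
  var entry = tracking.get(message.id);
  if (!entry) return;
  tracking.delete(message.id);

  // the original exec() promise still settles with the timeout/cancel error
  entry.resolver.reject(entry.error);

  // but the worker is only terminated when cleanup did not succeed
  if (!message.cleanup || !message.cleanup.success) {
    workerHandler.terminateAndNotify(true);
  }
}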

@josdejong (Owner)

Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎.

I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not.

I think we could do something like this (it's close to your diagram I think):

  1. In case of a timeout error or termination request, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". After that the WorkerHandler can terminate the Worker.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
  2. In case of a cancel error, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
    1. The Worker responds with "abort successful". Then we're done.
    2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
    3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.

What do you think?
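
For illustration, the outline above could translate into something like the following handler-side sketch (the function and message names are illustrative, not the final implementation):

// Sketch: ask the worker to abort and decide whether to terminate it.
// `kind` is 'timeout' (or a termination request) or 'cancel'.
function requestAbort(workerHandler, taskId, kind, abortTimeout) {
  return new Promise(function (resolve) {
    // no response within the limit (e.g. blocked by heavy CPU): terminate anyway
    var timer = setTimeout(function () {
      resolve({ terminate: true });
    }, abortTimeout);

    workerHandler.onceAbortResult(taskId, function (result) { // illustrative hook
      clearTimeout(timer);
      if (result.noAbortCallback) {
        resolve({ terminate: true });        // no abort callback registered
      } else if (kind === 'cancel') {
        resolve({ terminate: false });       // cancel + successful abort: keep the worker
      } else {
        resolve({ terminate: true });        // timeout/termination + successful abort: terminate afterwards
      }
    });

    workerHandler.worker.send({ id: taskId, method: '__abort__' }); // illustrative transport call
  });
}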

@joshLong145 (Contributor, Author)

> Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎.
>
> I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not.
>
> I think we could do something like this (it's close to your diagram I think):
>
>   1. In case of a timeout error or termination request, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
>     1. The Worker responds with "abort successful". After that the WorkerHandler can terminate the Worker.
>     2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
>     3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
>   2. In case of a cancel error, the WorkerHandler sends a message to the Worker to abort. Then we have three possible situations:
>     1. The Worker responds with "abort successful". Then we're done.
>     2. The Worker responds with "no abort callback". After that the WorkerHandler can terminate the Worker.
>     3. The Worker does not respond (is blocked by heavy CPU for example). After a limited timeout, the WorkerHandler will terminate the Worker anyway.
>
> What do you think?

Your outline makes sense and aligns with the diagram, but with better definitions of the possible execution results. I think you have mapped out most of the remaining details. However, I think we might want an explicit case for when there are abort callbacks but they have thrown an error. While this results in the same outcome as when the worker is stuck in some CPU-bound operation, we should handle it explicitly.

@josdejong (Owner)

Thanks. That makes sense indeed, the abort handler can throw an error too.
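
On the worker side, executing the listeners could look roughly like this (an illustrative sketch, not the final code): run all registered listeners, treat a rejection the same as a missing listener, and let the handler apply its own timeout on top:

// Illustrative worker-side sketch, not the final implementation
function runAbortListeners(worker) {
  if (worker.abortListeners.length === 0) {
    return Promise.resolve({ success: false, noAbortCallback: true });
  }
  return Promise.all(worker.abortListeners.map(function (listener) {
    // Promise.resolve().then(...) also captures synchronous throws
    return Promise.resolve().then(listener);
  }))
    .then(function () {
      worker.abortListeners = [];
      return { success: true };
    })
    .catch(function (error) {
      // a throwing listener means the worker cannot safely be reused
      return { success: false, error: String(error) };
    });
}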

@joshLong145 (Contributor, Author) commented Aug 5, 2024

Hey @josdejong

I think I have made good progress and was able to implement the feature set we have discussed above, where:

  1. We are able to prevent the termination of a worker in the pool if the abortListeners resolve and no errors occur within the scope of a listener.
  2. We can time out the execution of abortListeners, so that if they run too long or never leave scope we can short-circuit and terminate the worker.
  3. If there is an error within a listener, we can handle the rejection and terminate the worker.

I have added tests for both the timeout and cancel behaviors to confirm each has the desired behavior.
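
For reference, a reduced sketch of what such a reuse test can look like (a mocha-style test assuming the this.worker.addAbortListener API discussed above; this is not the exact test added in the PR):

const assert = require('assert');
const workerpool = require('workerpool');

// Reduced sketch of a reuse test, not the exact test from Pool.test.js
it('reuses the worker when an abort listener handles a timeout', async function () {
  let workersCreated = 0;
  const pool = workerpool.pool({
    maxWorkers: 1,
    onCreateWorker: () => { workersCreated++; }
  });

  function task() {
    const me = this;
    return new Promise(function (resolve) {
      const timer = setTimeout(resolve, 5000);
      me.worker.addAbortListener(async function () {
        clearTimeout(timer);
        resolve();
      });
    });
  }

  await pool.exec(task, []).timeout(50).catch(() => { /* expected timeout error */ });

  // the same worker should still be usable afterwards
  assert.strictEqual(await pool.exec((a, b) => a + b, [3, 4]), 7);
  assert.strictEqual(workersCreated, 1);

  await pool.terminate();
});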

I still have to update the docs / examples but I think it is ready for review.

One thing I am noticing is that when I run the tests, the after all hook in Pool.test.js sometimes fails to complete and prevents the test run from exiting. When running the tests individually I am not able to get the hook to hang. Was wondering if you had any idea what could be the culprit? Below is the final test output:

  1) "after all" hook in "{root}"

  130 passing (17s)
  2 pending
  1 failing

  1) "after all" hook in "{root}":
     Error: Timeout of 10000ms exceeded. For async tests and hooks, ensure "done()" is called; if returning a Promise, ensure it resolves.
      at listOnTimeout (node:internal/timers:569:17)
      at process.processTimers (node:internal/timers:512:7)

@josdejong (Owner)

Thanks! If I have some time I'll also do some debugging to see if I can find anything.

@joshLong145 (Contributor, Author)

@josdejong

After tweaking the finally implementation so it does not return itself on either resolve or reject of the internal promise wrapper, we now seem to have a passing test per #448 (comment). With the change I just pushed, the isolated test case now passes for me.

@josdejong (Owner)

Wow, good find!!! 👏

I've done some more digging, and the issue originates in the method Promise.finally not handling promises returned by the callback correctly, like in our failing unit test .finally(() => pool.terminate()). So it looks like this issue was not introduced in this PR; we discovered an existing bug 😅.

Just removing .then(res) from .finally is not a good solution though: it is there for a reason, namely propagating the original promise chain (and value or error). But there were no unit tests guarding this. I've worked out a PR that fixes the issue with Promise.finally and adds more unit tests: #470. Can you revert your change in Promise.finally? I'll merge that PR soon, and if you update your branch with the fix from #470 this issue will be solved.
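
For reference, the general shape of a finally that both preserves the original chain and waits for a promise returned by the callback is roughly the following (a sketch in terms of standard promise semantics, not the exact code from #470):

// Sketch: propagate the original value/error, but wait for the callback first
function finallyImpl(promise, callback) {
  return promise.then(
    function (value) {
      return Promise.resolve(callback()).then(function () { return value; });
    },
    function (error) {
      return Promise.resolve(callback()).then(function () { throw error; });
    }
  );
}

// e.g. finallyImpl(pool.exec(task, []), () => pool.terminate())
// settles with the task result only after pool.terminate() has finished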

@josdejong (Owner) left a comment

A few small remarks, otherwise I think we're good to go.

We should first merge #470 though.

Inline review comments (resolved): src/Promise.js, src/WorkerHandler.js, src/worker.js, test/Pool.test.js
@josdejong (Owner)

OK, I've now merged #470. Can you update the PR to the latest version of the master branch with the updated Promise.finally?

@joshLong145 (Contributor, Author)

> OK, I've now merged #470. Can you update the PR to the latest version of the master branch with the updated Promise.finally?

Merged in. I was thinking the return in the initial finally would suffice for returning the promise, but you are correct that preserving the chain is still important. Good catch!

@josdejong (Owner) commented Oct 3, 2024

When changing after into afterEach right now, we get a timeout error again, so somehow we still have a race condition when creating multiple pools. I've done some debugging and found the following minimal code demonstrating the issue:

// file: terminatest.js
const { Promise } = require('./src/Promise');
const Pool = require('./src/Pool');

run()

async function run() {
  console.log('start')

  const pool1 = new Pool();
  await pool1.exec(sleep, [100])
    .timeout(50)
    .catch(err => { console.error(String(err)) })

  const pool2 = new Pool();
  const result = await pool2.exec(add, [3, 4])
  console.log({ result })

  await sleep(1000)
  
  console.log('pool1 workers:', pool1.workers.length) // <-- is 1 but should be 0

  await pool1.terminate() // <-- never finishes
  await pool2.terminate()

  console.log('done')
}

function sleep(delay) {
  return new Promise((resolve) => setTimeout(resolve, delay))
}

function add(a, b) {
  return a + b
}

I did some debugging but haven't yet found the cause of this. Some observations:

  1. This only happens when creating 2 pools, which suggests that somehow these pools are not fully isolated from each other. I did look at whether we mess up the context somewhere (this and me) but didn't see anything.

  2. The issue is gone when reverting the following piece of code to what it is in the master branch:
    // on cancellation, force the worker to terminate
    https://github.com/josdejong/workerpool/pull/448/files#diff-7e092d00b94d598e9df27965dd8a8157654881fcd56f5735e873d45c32c47146R418-R470
    (though this effectively disables the abort listener and all these tests fail).

  3. By adding console.log statements, I noticed that the .timeout(50) causes the worker of pool1 to be terminated twice; I think that should be once. And the callback of WorkerHandler.prototype.terminate is not invoked at all. When adding the following lines to WorkerHandler.prototype.terminate, the issue disappears:

    WorkerHandler.prototype.terminate = function (force, callback) {
      if (this.terminated && typeof callback === 'function') {
        return callback()
      }
    
      // ...

But (3) is not the right solution; we should probably look into why the terminate callback is not invoked in terminateAndNotify and/or why the WorkerHandler of pool1 is not fully terminated when the .timeout(50) is hit. Maybe we miss calling cleanup() somewhere in WorkerHandler.prototype.terminate? And I still don't see why this issue only triggers when having 2 worker pools.

Inline review comment on src/worker.js (resolved)
@josdejong (Owner)

I tested the issue I mentioned here again and all works smoothly now 🎉. I guess the underlying bug is fixed via 35a08d2?

@joshLong145 (Contributor, Author)

> I tested the issue I mentioned here again and all works smoothly now 🎉. I guess the underlying bug is fixed via 35a08d2?

Yes 😄
Explained in this comment

@josdejong (Owner)

Awesome! I'm really happy that this last vague bug is resolved now 😅 !

Let's try to improve tryCleanup a bit (see my latest comment: #448 (comment)), and after that I think we can merge the PR.

@josdejong (Owner)

I think we're there, clicking the Merge button now 🎉 . Thanks a lot for your patience and perseverance Josh, this was a long journey with quite some bumps along the road!

@josdejong merged commit 20d20db into josdejong:master on Oct 11, 2024
5 checks passed
@josdejong (Owner)

Published now in v9.2.0, thanks again!

@joshLong145 (Contributor, Author)

> Published now in v9.2.0, thanks again!

No problem, it was great working on this :)
