
Eio.Workpool #584

Closed · wants to merge 1 commit

Conversation

@SGrondin (Collaborator) commented Jul 12, 2023

For your consideration, this PR is an invitation to discuss the user-facing API and the implementation of an Eio.Workpool module.

Let's start with some general "guiding principles" that underpin the current design. I'll gladly make changes to the design if the maintainers invalidate any of these principles.

Principles

P1: A workpool should not pay attention to recommended_domain_count. Users know their own workloads (we don't). The users will tell us how many domains to use.

P2: We need to support m concurrent jobs per domain. An efficient design is one that fully utilizes each core: for CPU-bound workloads that means 1 job per domain; for IO-bound workloads it is much higher. There are also hybrid workloads, with CPU-demanding processing interspersed between IO calls; for those, the right number is roughly 1/p, where p is the proportion of each job that is CPU-bound (for example, if 25% of a job's time is CPU work, that gives 1/0.25 = 4 concurrent jobs per domain). A sketch of how this might surface in the API follows this list.

P3: The user knows the right moment to start and shut down their workpools. They want the workers ready to go by the time jobs come flying in, so spawning workers on the fly (lazily) should be avoided.
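
To make the principles concrete, here is a rough sketch of how they might look to a caller. This is illustrative only: Workpool.create, submit_exn, ~domain_count and ~domain_concurrency are assumed names, not necessarily the PR's exact signature.

(* Sketch only: the Workpool names below are assumptions, not the final API. *)
let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let pool =
    Workpool.create ~sw
      ~domain_count:8          (* P1: the caller decides, not recommended_domain_count *)
      ~domain_concurrency:4    (* P2: ~25% of each job is CPU-bound, so 1 / 0.25 = 4 jobs per domain *)
      (Eio.Stdenv.domain_mgr env)
    (* P3: all workers are spawned here, eagerly, before any job is submitted *)
  in
  ignore (Workpool.submit_exn pool (fun () -> (* runs on some worker domain *) 42))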

Tests

We've got them! They originally tested durations using monotonic clocks to validate that certain tasks execute concurrently; despite being timing-dependent, they were solid and consistent rather than flaky, and I hoped they would behave that way in CI too. Update: the tests are now fully deterministic, using mock clocks, mock domains and the mock backend.
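
As a sketch of what that deterministic setup looks like (assumptions: Eio_mock.Clock.make, Eio_mock.Clock.set_time and Eio_mock.Domain_manager.create are the mock constructors, and the Workpool names are the same hypothetical ones as above):

(* Deterministic test sketch: no real time, no real domains. *)
let () =
  Eio_mock.Backend.run @@ fun () ->
  Eio.Switch.run @@ fun sw ->
  let clock = Eio_mock.Clock.make () in          (* time moves only when the test advances it *)
  let dm = Eio_mock.Domain_manager.create () in  (* "domains" run cooperatively, in a fixed order *)
  let pool = Workpool.create ~sw ~domain_count:2 ~domain_concurrency:1 dm in
  Eio.Fiber.both
    (fun () -> ignore (Workpool.submit_exn pool (fun () -> Eio.Time.sleep clock 1.0)))
    (fun () -> Eio_mock.Clock.set_time clock 1.0)  (* wakes the sleeping job deterministically *)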

Caveats

This PR uses Fiber.n_any, which is only added as part of #587, so it has to wait until that PR is merged.


The code is fairly short and I've added plenty of comments to help the reviewers.
I'll trim some comments once it's been reviewed.

Thank you for your time.

@SGrondin (Collaborator, Author):

I just made a few tweaks: fixed a race condition and removed the Core-style ~f named arguments since Eio doesn't normally label lambdas.

@SGrondin SGrondin force-pushed the workpool branch 2 times, most recently from 16d21de to f0d31ae Compare July 20, 2023 16:32
@SGrondin SGrondin marked this pull request as ready for review July 20, 2023 16:39
@SGrondin (Collaborator, Author) commented Jul 20, 2023

I'm marking it as Ready For Review since I've fixed the race condition (using Fiber.n_any, which is only available as part of #587).

@SGrondin SGrondin changed the title Eio.Workpool prototype Eio.Workpool Jul 26, 2023
@SGrondin SGrondin force-pushed the workpool branch 2 times, most recently from 9706699 to 4afaf1d Compare August 12, 2023 17:48
@SGrondin (Collaborator, Author):

Is there a way to test this under dscheck? I looked in tests/dscheck and didn't see any use of domains or env, which makes me think it may not be realistic to do so.

@talex5 (Collaborator) left a comment:

> Is there a way to test this under dscheck?

This isn't doing any lock-free stuff itself, so dscheck isn't necessary. Instead, you can use the mock domain manager to make the tests deterministic. That currently lives in network.md, but it's generally useful and should be moved to the eio.mock library.

if Atomic.compare_and_set instance.is_terminating false true
then (
(* Instruct workers to shutdown *)
Promise.resolve w1 (Quit { atomic = Atomic.make 1; target = instance.domain_count; all_done = w2 });
Collaborator:

This seems a bit over-complicated. I was expecting it to write instance.domain_count quit messages to the stream. That would avoid the need for a second channel for the quit message and the n_any.
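
For illustration, the suggested approach might look roughly like this (instance, is_terminating, domain_count, stream and Quit are names taken from or implied by the PR excerpts above; the record layout and the payload-free Quit are assumptions):

(* Sketch of the single-stream shutdown described above. *)
let terminate instance =
  if Atomic.compare_and_set instance.is_terminating false true then
    (* one Quit per worker; each worker exits after taking its own Quit *)
    for _ = 1 to instance.domain_count do
      Eio.Stream.add instance.stream Quit
    done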

Collaborator (author):

It's designed this way because of a combination of factors:

  • I don't want the Quit messages to have to wait for all the already-enqueued jobs to run and complete ahead of them.
  • As much as possible, calling terminate should "immediately" stop new jobs from starting.
    The obvious solution is to have terminate reject all queued jobs before enqueueing the Quit messages, but that doesn't work well with the post-termination background rejection loop, because terminate becomes both a producer and a consumer while the workers are still consumers. It can be made to work, but the end result was more complex and less predictable.

Collaborator:

> I don't want the Quit messages to have to wait for the other enqueued jobs to run and complete ahead of them.

The workers can still check is_terminating before running a job and reject the job if it's set. I think that would have the same effect.
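
A rough sketch of that check, in the simplified single-stream design (worker, Job, reject and run_job are assumed names; is_terminating and Quit come from the excerpts above):

(* Each worker drains the shared stream; once is_terminating is set,
   remaining jobs are rejected instead of run. *)
let rec worker ~run_job ~reject instance =
  match Eio.Stream.take instance.stream with
  | Quit -> ()
  | Job job ->
    if Atomic.get instance.is_terminating
    then reject job      (* pool is shutting down: don't start new work *)
    else run_job job;
    worker ~run_job ~reject instance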

Collaborator (author):

After trying it out, I'm remembering why it's not that way.
It's minor but I think it makes a difference.

By using a second channel, we're able to start the background rejection loop immediately. Otherwise, if all workers are fully occupied at the time terminate is called, we have to wait for a worker to be available to start rejecting jobs.

An alternative I've also explored is to immediately (T0) reject all queued jobs before enqueueing n Quit messages (T1), but that leaves jobs enqueued between T0 and T1 hanging until the background rejection job starts (which can only happen after all workers have exited, so that their Quit messages don't get dropped). This inconsistent behavior can be patched over by checking is_terminating (a bool Atomic.t) when submitting a new job, but I'm trying to avoid all unnecessary cross-thread coordination in the hot path...

Collaborator:

I think we have to do that anyway, otherwise this could happen:

  1. Client checks is_terminating and see it's still OK.
  2. Pool gets terminated; all workers finish; switch ends.
  3. Client adds job to queue.

It's not clear what behaviour we really want for terminate though. For example, why do we want to reject jobs that were submitted before terminate was called? Then there's no way for a client to behave "correctly" (so that its jobs never get rejected) in the graceful shutdown case.

It would probably be helpful to have an example of a program that needs support for this kind of graceful shutdown (rather than just finishing the switch, which I suspect will cover most real uses). Or possibly we should make an initial version without terminate and add it later if/when it's needed.


let is_terminating { terminating = p, _; _ } = Promise.is_resolved p

let is_terminated { terminated = p, _; _ } = Promise.is_resolved p
Collaborator:

Do we need this? The caller can just wait for their switch to finish.

@SGrondin (Collaborator, Author), Sep 2, 2023:

It's there to allow for uncommon use cases where terminate is called before the switch is released. My original idea for these two functions was to support those rare use cases without forcing users to track this state through side channels.
But now I'm starting to think that having these two functions may be a mistake, because without them users would be encouraged to create finer-grained lifetimes instead of reusing the same (overly) long-lived Switch.
What do you think?
Edit: I'm using them in tests and would like to continue doing so. Maybe they should go into a Private submodule?

@talex5 (Collaborator) commented Aug 28, 2023

> Instead, you can use the mock domain manager to make the tests deterministic. That currently lives in network.md, but it's generally useful and should be moved to the eio.mock library.

I've moved it out in #610.

@SGrondin (Collaborator, Author) commented Sep 2, 2023

Thanks for the thorough review. I've either made the requested change or left a comment/question above. I'm now working on using a mock clock in tests.

Edit: The tests are now 100% deterministic! Mock clocks, mock domains, mock backend.

Comment on lines +29 to +34
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
Fiber.yield ();
@SGrondin (Collaborator, Author), Sep 2, 2023:

Is there a "yield until idle" function? If not, this is the precise number of yields needed...

At the moment, if the implementation of Stream.take were to change, for example, it could require an additional yield. That would be quite confusing for the person making the change! Obviously I could just yield 50 times to be safe, but I'm sure there's a better way hidden somewhere deep in the internals of Eio, right?

Collaborator:

There isn't. It needs support in Eio_mock.Backend. I was thinking of having a Backend.run_full that provides an environment with a mock clock that auto-advances whenever the run-queue is empty.
e.g.

Eio_mock.Backend.run @@ fun env ->
let clock = env#clock in
...

Collaborator (author):

That would be fantastic to have! While reading other tests to see how mock clocks were used, I saw multiple opportunities where it could have been used.

Comment:

We also have mixed feelings about those explicit yields in the tests. A purist argument would be that it forces the writer to clearly express the semantics of a function... but in practice it is quite a headache.

@SGrondin (Collaborator, Author) commented Nov 6, 2023

Closing in favor of #639

@SGrondin SGrondin deleted the workpool branch December 3, 2023 15:31