From 38deacfb7d1a5c14f16793d790a367b500bd0a50 Mon Sep 17 00:00:00 2001
From: Simon Grondin
Date: Wed, 12 Jul 2023 12:07:04 -0500
Subject: [PATCH] Eio.Workpool

---
 lib_eio/eio.ml       |   1 +
 lib_eio/eio.mli      |   3 +
 lib_eio/workpool.ml  | 139 +++++++++++++++++
 lib_eio/workpool.mli |  37 +++++
 tests/workpool.md    | 344 +++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 524 insertions(+)
 create mode 100644 lib_eio/workpool.ml
 create mode 100644 lib_eio/workpool.mli
 create mode 100644 tests/workpool.md

diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml
index 424662b0a..5f05745bd 100644
--- a/lib_eio/eio.ml
+++ b/lib_eio/eio.ml
@@ -10,6 +10,7 @@ module Condition = Condition
 module Stream = Stream
 module Lazy = Lazy
 module Pool = Pool
+module Workpool = Workpool
 module Exn = Exn
 module Resource = Resource
 module Buf_read = Buf_read
diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli
index 86f0075fd..d8bc13216 100644
--- a/lib_eio/eio.mli
+++ b/lib_eio/eio.mli
@@ -45,6 +45,9 @@ module Pool = Pool
 
 (** Cancelling fibers. *)
 module Cancel = Eio__core.Cancel
 
+(** A high-level workpool that runs jobs on a fixed set of domains. *)
+module Workpool = Workpool
+
 (** Commonly used standard features. This module is intended to be [open]ed. *)
 module Std = Std
diff --git a/lib_eio/workpool.ml b/lib_eio/workpool.ml
new file mode 100644
index 000000000..278e91339
--- /dev/null
+++ b/lib_eio/workpool.ml
@@ -0,0 +1,139 @@
+type job = Pack : (unit -> 'a) * ('a, exn) Result.t Promise.u -> job
+
+type action =
+  | Process of job
+  | Quit of {
+      atomic: int Atomic.t;
+      target: int;
+      all_done: unit Promise.u;
+    }
+
+(* Terminology: a worker is 1 domain/thread.
+   Each worker runs up to m jobs at a time; a workpool has n workers (domains). *)
+
+type t = {
+  sw: Switch.t;
+  (* The work queue. Only [Process] actions are ever added to it. *)
+  stream: action Stream.t;
+  (* Number of domains (workers). Depending on settings, domains may run more than 1 job at a time. *)
+  domain_count: int;
+  (* True when [Workpool.terminate] has been called. *)
+  is_terminating: bool Atomic.t;
+  (* Resolved with a [Quit] action when the workpool begins terminating. *)
+  terminating: action Promise.t * action Promise.u;
+  (* Resolved when every worker has shut down and its running jobs have completed. *)
+  terminated: unit Promise.t * unit Promise.u;
+}
+
+let reject (Pack (_, w)) = Promise.resolve_error w (Failure "Workpool.terminate called")
+
+(* This function is the core of workpool.ml.
+   Each worker recursively calls [loop ()] until the [terminating] promise is resolved.
+   Workers pull one job at a time from the Stream and run at most [limit] jobs concurrently. *)
+let start_worker ~limit ~terminating stream =
+  Switch.run @@ fun sw ->
+  let capacity = Semaphore.make limit in
+  let run_job job w =
+    Fiber.fork ~sw (fun () ->
+        Promise.resolve w
+          (try Ok (job ()) with
+          | exn -> Error exn);
+        Semaphore.release capacity )
+  in
+  (* The main worker loop: waits for the [terminating] promise or, once capacity is free, the next queued action. *)
+  let rec loop () =
+    let actions = Fiber.n_any [ (fun () -> Promise.await terminating); (fun () -> Semaphore.acquire capacity; Stream.take stream) ] in
+    match actions with
+    | [ Process (Pack (job, w)) ] ->
+      (* We start the job right away. This also gives a chance to other domains
+         to start waiting on the Stream before the current thread blocks on [Stream.take] again. *)
+      run_job job w;
+      (loop [@tailcall]) ()
+    | Quit { atomic; target; all_done } :: maybe_job ->
+      List.iter
+        (function
+          | Process job -> reject job
+          | _ -> assert false)
+        maybe_job;
+      (* Count this worker as done only once its switch is released, i.e. after all of its
+         running jobs have completed. The last worker to be released resolves [all_done]. *)
+      Switch.on_release sw (fun () ->
+          if Atomic.fetch_and_add atomic 1 = target then Promise.resolve all_done ())
+    | _ -> assert false
+  in
+  loop ()
+
+(* Start a new domain running one worker. The worker creates its own switch (see [start_worker]). *)
+let start_domain ~sw ~domain_mgr ~limit ~terminating ~transient stream =
+  let go () =
+    Domain_manager.run domain_mgr (fun () -> start_worker ~limit ~terminating stream )
+  in
+  (* [transient] workpools run as daemons to not hold the user's switch from completing.
+     It's up to the user to hold the switch open (and thus, the workpool)
+     by blocking on the jobs issued to the workpool.
+     [Workpool.submit] and [Workpool.submit_exn] will block so this shouldn't be a problem.
+     Still, the user can call [Workpool.create] with [~transient:false] to
+     disable this behavior, in which case the user must call [Workpool.terminate]
+     to release the switch. *)
+  match transient with
+  | false -> Fiber.fork ~sw go
+  | true ->
+    Fiber.fork_daemon ~sw (fun () ->
+        go ();
+        `Stop_daemon )
+
+let create ~sw ~domain_count ~domain_concurrency ?(transient = true) domain_mgr =
+  let stream = Stream.create 0 in
+  let instance =
+    {
+      sw;
+      stream;
+      domain_count;
+      is_terminating = Atomic.make false;
+      terminating = Promise.create ();
+      terminated = Promise.create ();
+    }
+  in
+  let terminating = fst instance.terminating in
+  for _ = 1 to domain_count do
+    start_domain ~sw ~domain_mgr ~limit:domain_concurrency ~terminating ~transient stream
+  done;
+  instance
+
+let submit_fork ~sw { stream; _ } f =
+  let p, w = Promise.create () in
+  Fiber.fork_promise ~sw (fun () ->
+      Stream.add stream (Process (Pack (f, w)));
+      Promise.await_exn p )
+
+let submit { stream; _ } f =
+  let p, w = Promise.create () in
+  Stream.add stream (Process (Pack (f, w)));
+  Promise.await p
+
+let submit_exn instance f =
+  match submit instance f with
+  | Ok x -> x
+  | Error exn -> raise exn
+
+let terminate ({ terminating = _, w1; terminated = p2, w2; _ } as instance) =
+  if Atomic.compare_and_set instance.is_terminating false true
+  then (
+    (* Instruct workers to shut down. The counter starts at 1, so the last worker's [fetch_and_add] returns [target]. *)
+    Promise.resolve w1 (Quit { atomic = Atomic.make 1; target = instance.domain_count; all_done = w2 });
+    (* Reject all present and future queued jobs, for as long as the workpool's switch is running. *)
+    Fiber.fork_daemon ~sw:instance.sw (fun () ->
+        while true do
+          match Stream.take instance.stream with
+          | Process job -> reject job
+          | _ -> assert false
+        done;
+        `Stop_daemon );
+    (* Wait for all workers to have shut down *)
+    Promise.await p2 )
+  else (* [Workpool.terminate] was called more than once: wait for the first call to complete. *)
+    Promise.await p2
+
+let is_terminating { is_terminating; _ } = Atomic.get is_terminating
+
+let is_terminated { terminated = p, _; _ } = Promise.is_resolved p
diff --git a/lib_eio/workpool.mli b/lib_eio/workpool.mli
new file mode 100644
index 000000000..351cb6407
--- /dev/null
+++ b/lib_eio/workpool.mli
@@ -0,0 +1,37 @@
+type t
+
+(** Creates a new workpool with [domain_count] domains.
+
+    [domain_concurrency] is the maximum number of jobs that each domain can run at a time.
+
+    [transient] (default: true). When true, the workpool will not block the [~sw] Switch from completing.
+    When false, you must call [terminate] to release the [~sw] Switch. *)
+val create :
+  sw:Switch.t ->
+  domain_count:int ->
+  domain_concurrency:int ->
+  ?transient:bool ->
+  _ Domain_manager.t ->
+  t
+
+(** Run a job on this workpool and wait for its result. It is placed at the end of the queue. A job that raises yields [Error exn]. *)
+val submit : t -> (unit -> 'a) -> ('a, exn) result
+
+(** Same as [submit] but raises if the job failed. *)
+val submit_exn : t -> (unit -> 'a) -> 'a
+
+(** Same as [submit] but returns immediately, without blocking. The job is enqueued from a fiber forked on [sw]. *)
+val submit_fork : sw:Switch.t -> t -> (unit -> 'a) -> ('a, exn) result Promise.t
+
+(** Waits for all running jobs to complete, then returns.
+    No new jobs are started, even if they were already enqueued.
+    To abort all running jobs instead of waiting for them, call [Switch.fail] on the Switch used to create this workpool. *)
+val terminate : t -> unit
+
+(** Returns true if the [terminate] function has been called on this workpool.
+    Also returns true if the workpool has fully terminated. *)
+val is_terminating : t -> bool
+
+(** Returns true if the [terminate] function has been called on this workpool AND
+    the workpool has fully terminated (all running jobs have completed). *)
+val is_terminated : t -> bool
diff --git a/tests/workpool.md b/tests/workpool.md
new file mode 100644
index 000000000..f49680556
--- /dev/null
+++ b/tests/workpool.md
@@ -0,0 +1,344 @@
+# Setting up the environment
+
+```ocaml
+# #require "eio_main";;
+# #require "eio.mock";;
+```
+
+Creating some useful helper functions
+
+```ocaml
+open Eio.Std
+
+module Workpool = Eio.Workpool
+
+let () = Eio.Exn.Backend.show := false
+
+let run fn =
+  Eio_mock.Backend.run @@ fun () ->
+  Eio_mock.Domain_manager.run @@ fun mgr ->
+  let clock = Eio_mock.Clock.make () in
+  let sleep ms =
+    let t0 = Eio.Time.now clock in
+    let t1 = t0 +. ms in
+    traceln "Sleeping %.0f: %.0f -> %.0f" ms t0 t1;
+    Fiber.both
+      (fun () -> Eio.Time.sleep_until clock t1)
+      (fun () ->
+        Fiber.yield ();
+        Fiber.yield ();
+        Fiber.yield ();
+        Fiber.yield ();
+        Fiber.yield ();
+        Fiber.yield ();
+        Fiber.yield ();
+        if Float.(Eio.Time.now clock <> t1) then
+          Eio_mock.Clock.advance clock)
+  in
+  let duration expected f =
+    let t0 = Eio.Time.now clock in
+    let res = f () in
+    let t1 = Eio.Time.now clock in
+    let actual = t1 -. t0 in
+    if Float.(actual = expected)
+    then (traceln "Duration (valid): %.0f" expected; res)
+    else failwith (Format.sprintf "Duration was not %.0f: %.0f" expected actual)
+  in
+  fn mgr sleep duration
+```
+
+# Workpool.create
+
+Workpool is created, transient by default:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  ignore @@ Workpool.create
+    ~sw ~domain_count:2 ~domain_concurrency:1 mgr
+  ;;
+- : unit = ()
+```
+
+Workpool holds up the switch when non-transient:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  let terminated = ref false in
+  Switch.run (fun sw ->
+    let wp =
+      Workpool.create
+        ~sw ~domain_count:2 ~domain_concurrency:1 mgr ~transient:false
+    in
+    Fiber.fork_daemon ~sw (fun () ->
+      Fiber.yield ();
+      terminated := true;
+      Workpool.terminate wp;
+      `Stop_daemon
+    )
+  );
+  !terminated
+  ;;
+- : bool = true
+```
+
+# Concurrency
+
+Runs jobs in parallel as much as possible (domains):
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:2 ~domain_concurrency:1 mgr in
+  duration 150. (fun () ->
+    List.init 5 (fun i -> i + 1)
+    |> Fiber.List.iter (fun i -> Workpool.submit_exn wp (fun () ->
+      sleep 50.;
+      total := !total + i
+    ));
+    !total
+  );;
++[1] Sleeping 50: 0 -> 50
++[2] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[1] Sleeping 50: 50 -> 100
++[2] Sleeping 50: 50 -> 100
++[1] mock time is now 100
++[1] Sleeping 50: 100 -> 150
++[1] mock time is now 150
++[0] Duration (valid): 150
+- : int = 15
+```
+
+Runs jobs in parallel as much as possible (workers):
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:1 ~domain_concurrency:2 mgr in
+  duration 150. (fun () ->
+    List.init 5 (fun i -> i + 1)
+    |> Fiber.List.iter (fun i -> Workpool.submit_exn wp (fun () ->
+      sleep 50.;
+      total := !total + i
+    ));
+    !total
+  );;
++[1] Sleeping 50: 0 -> 50
++[1] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[1] Sleeping 50: 50 -> 100
++[1] Sleeping 50: 50 -> 100
++[1] mock time is now 100
++[1] Sleeping 50: 100 -> 150
++[1] mock time is now 150
++[0] Duration (valid): 150
+- : int = 15
+```
+
+Runs jobs in parallel as much as possible (both):
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:2 ~domain_concurrency:2 mgr in
+  duration 100. (fun () ->
+    List.init 5 (fun i -> i + 1)
+    |> Fiber.List.iter (fun i -> Workpool.submit_exn wp (fun () ->
+      sleep 50.;
+      total := !total + i
+    ));
+    !total
+  );;
++[1] Sleeping 50: 0 -> 50
++[2] Sleeping 50: 0 -> 50
++[1] Sleeping 50: 0 -> 50
++[2] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[1] Sleeping 50: 50 -> 100
++[1] mock time is now 100
++[0] Duration (valid): 100
+- : int = 15
+```
+
+# Job error handling
+
+`Workpool.submit` returns a Result:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:1 ~domain_concurrency:4 mgr in
+  duration 100. (fun () ->
+    let results =
+      List.init 5 (fun i -> i + 1)
+      |> Fiber.List.map (fun i -> Workpool.submit wp (fun () ->
+        sleep 50.;
+        if i mod 2 = 0
+        then failwith (Int.to_string i)
+        else (let x = !total in total := !total + i; x)
+      ))
+    in
+    results, !total
+  );;
++[1] Sleeping 50: 0 -> 50
++[1] Sleeping 50: 0 -> 50
++[1] Sleeping 50: 0 -> 50
++[1] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[1] Sleeping 50: 50 -> 100
++[1] mock time is now 100
++[0] Duration (valid): 100
+- : (int, exn) result list * int =
+([Ok 0; Error (Failure "2"); Ok 1; Error (Failure "4"); Ok 4], 9)
+```
+
+`Workpool.submit_exn` raises:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:1 ~domain_concurrency:2 mgr in
+  List.init 5 (fun i -> i + 1)
+  |> Fiber.List.map (fun i -> Workpool.submit_exn wp (fun () ->
+    traceln "Started %d" i;
+    let x = !total in
+    total := !total + i;
+    if x = 3
+    then failwith (Int.to_string i)
+    else x
+  ));;
++[1] Started 1
++[1] Started 2
++[1] Started 3
++[1] Started 4
+Exception: Failure "3".
+```
+
+# Blocking for capacity
+
+`Workpool.submit` will block waiting for room in the queue:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let wp = Workpool.create ~sw ~domain_count:1 ~domain_concurrency:1 mgr in
+
+  let p1 = Fiber.fork_promise ~sw (fun () -> Workpool.submit_exn wp (fun () -> sleep 50.)) in
+
+  duration 50. (fun () -> Workpool.submit_exn wp @@ fun () -> ());
+
+  duration 0. (fun () -> Promise.await_exn p1)
+  ;;
++[1] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[0] Duration (valid): 50
++[0] Duration (valid): 0
+- : unit = ()
+```
+
+`Workpool.submit_fork` will not block if there's not enough room in the queue:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  Switch.run @@ fun sw ->
+  let wp = Workpool.create ~sw ~domain_count:1 ~domain_concurrency:1 mgr in
+
+  let p1 = duration 0. (fun () ->
+    Fiber.fork_promise ~sw (fun () -> Workpool.submit_exn wp (fun () -> sleep 50.))
+  )
+  in
+  let p2 = duration 0. (fun () ->
+    Fiber.fork_promise ~sw (fun () -> Workpool.submit_exn wp (fun () -> sleep 50.))
+  )
+  in
+  let p3 = duration 0. (fun () ->
+    Workpool.submit_fork ~sw wp (fun () -> ())
+  )
+  in
+
+  duration 100. (fun () ->
+    Promise.await_exn p1;
+    Promise.await_exn p2;
+    Promise.await_exn p3;
+    (* Value restriction :( *)
+    Promise.create_resolved (Ok ())
+  )
+  |> Promise.await_exn
+  ;;
++[0] Duration (valid): 0
++[0] Duration (valid): 0
++[0] Duration (valid): 0
++[1] Sleeping 50: 0 -> 50
++[1] mock time is now 50
++[1] Sleeping 50: 50 -> 100
++[1] mock time is now 100
++[0] Duration (valid): 100
+- : unit = ()
+```
+
+# Termination
+
+`Workpool.terminate` waits for jobs currently running to finish and rejects queued jobs:
+
+```ocaml
+# run @@ fun mgr sleep duration ->
+  let print_status wp =
+    traceln "Terminating: %b (terminated: %b)"
+      (Workpool.is_terminating wp) (Workpool.is_terminated wp)
+  in
+  Switch.run @@ fun sw ->
+  let total = ref 0 in
+  let wp = Workpool.create ~sw ~domain_count:2 ~domain_concurrency:2 mgr in
+  let results = Fiber.fork_promise ~sw (fun () ->
+    duration 300. (fun () ->
+      List.init 5 (fun i -> i + 1)
+      |> Fiber.List.iter (fun i -> Workpool.submit_exn wp (fun () ->
+        sleep 150.;
+        total := !total + i
+      ));
+      !total
+    )
+  )
+  in
+  sleep 75.;
+  (* Exactly one job should be left in the queue
+     for Workpool.terminate to reject *)
+  let x = duration 75. (fun () ->
+    print_status wp;
+    let p = Fiber.fork_promise ~sw (fun () -> Workpool.terminate wp) in
+    Fiber.fork_daemon ~sw (fun () ->
+      Workpool.submit wp (fun () -> ())
+      |> Result.is_error
+      |> traceln "after_terminate is_error: %b";
+      `Stop_daemon
+    );
+    print_status wp;
+    Promise.await_exn p;
+    print_status wp;
+    !total
+  )
+  in
+  traceln "Total: %d (terminated: %b)" x (Workpool.is_terminated wp);
+  Promise.await_exn results
+  ;;
++[0] Sleeping 75: 0 -> 75
++[1] Sleeping 150: 0 -> 150
++[2] Sleeping 150: 0 -> 150
++[1] Sleeping 150: 0 -> 150
++[2] Sleeping 150: 0 -> 150
++[0] mock time is now 75
++[0] Terminating: false (terminated: false)
++[0] Terminating: true (terminated: false)
++[1] mock time is now 150
++[0] after_terminate is_error: true
++[0] Terminating: true (terminated: true)
++[0] Duration (valid): 75
++[0] Total: 10 (terminated: true)
+Exception: Failure "Workpool.terminate called".
+```