diff --git a/lib_eio/workpool.ml b/lib_eio/workpool.ml
new file mode 100644
index 000000000..5102e0d3f
--- /dev/null
+++ b/lib_eio/workpool.ml
@@ -0,0 +1,144 @@
+type job = Pack : (unit -> 'a) * ('a, exn) Result.t Promise.u -> job
+
+type action =
+  | Process of job
+  | Quit of {
+      atomic: int Atomic.t;
+      target: int;
+      all_done: unit Promise.u;
+    }
+
+(* Worker: 1 domain/thread per worker, up to [limit] concurrent jobs per worker,
+   [domain_count] workers per workpool. *)
+
+type t = {
+  (* The work queue *)
+  stream: action Stream.t;
+  (* Number of domains. Depending on settings, each domain may run more than one job at a time. *)
+  domain_count: int;
+  (* True once [Workpool.terminate] has been called. *)
+  is_terminating: bool Atomic.t;
+  (* Resolved when the workpool begins terminating. *)
+  terminating: action Promise.t * action Promise.u;
+  (* Resolved when the workpool has terminated. *)
+  terminated: unit Promise.t * unit Promise.u;
+}
+
+(* This function is the core of workpool.ml.
+   Each worker recursively calls [loop ()] until the [terminating]
+   promise is resolved. Workers pull one job at a time from the Stream. *)
+let start_worker ~sw ~limit ~terminating stream =
+  let capacity = ref limit in
+  let condition = Condition.create () in
+  let run_job job w =
+    Fiber.fork ~sw (fun () ->
+      decr capacity;
+      Promise.resolve w
+        (try Ok (job ()) with
+         | exn -> Error exn);
+      incr capacity;
+      Condition.broadcast condition)
+  in
+  (* The main worker loop. *)
+  let rec loop () =
+    let action =
+      (* Peek to minimize the chance of [Fiber.first] dropping jobs. See Issue #558,
+         which must be (and will be) fixed before this module can be considered for merging. *)
+      match Promise.peek terminating with
+      | Some x -> x
+      | None -> Fiber.first (fun () -> Stream.take stream) (fun () -> Promise.await terminating)
+    in
+    match action with
+    | Process (Pack (job, w)) ->
+      (* Start the job right away. This also gives other domains a chance to start
+         waiting on the Stream before the current domain blocks on [Stream.take] again. *)
+      run_job job w;
+      while !capacity = 0 do
+        Condition.await_no_mutex condition
+      done;
+      (loop [@tailcall]) ()
+    | Quit { atomic; target; all_done } ->
+      (* Wait for all of this worker's jobs to complete. *)
+      while !capacity < limit do
+        Condition.await_no_mutex condition
+      done;
+      (* If we're the last worker terminating, resolve the promise. *)
+      if Atomic.fetch_and_add atomic 1 = target then Promise.resolve all_done ()
+  in
+  loop ()
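+
+(* Shutdown handshake, illustrated: [terminate] resolves [terminating] with
+   [Quit { atomic = Atomic.make 1; target = domain_count; all_done }].
+   Every worker sees the same [Quit], drains its in-flight jobs, then
+   increments [atomic]. With 3 domains the workers read 1, 2 and 3 in some
+   order; the one that reads 3 (= [target]) resolves [all_done]. *)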
+
+(* Start a new domain running one worker. The worker needs its own switch. *)
+let start_domain ~sw ~domain_mgr ~limit ~terminating ~transient stream =
+  let go () =
+    Domain_manager.run domain_mgr (fun () ->
+      Switch.run @@ fun sw -> start_worker ~sw ~limit ~terminating stream)
+  in
+  (* [transient] workpools run as daemons so they don't prevent the user's switch
+     from completing. It's up to the user to hold the switch open (and thus the
+     workpool) by blocking on the jobs issued to the workpool.
+     [Workpool.run] and [Workpool.run_exn] block, so this shouldn't be a problem.
+     Still, the user can call [Workpool.create] with [~transient:false] to
+     disable this behavior, in which case the user must call [Workpool.terminate]
+     to release the switch. *)
+  match transient with
+  | false -> Fiber.fork ~sw go
+  | true ->
+    Fiber.fork_daemon ~sw (fun () ->
+      go ();
+      `Stop_daemon)
+
+let create ~sw ~domain_count ~domain_concurrency ?(capacity = 0) ?(transient = true) domain_mgr =
+  if capacity < 0 then raise (Invalid_argument "Workpool capacity < 0");
+  let stream = Stream.create capacity in
+  let instance =
+    {
+      stream;
+      domain_count;
+      is_terminating = Atomic.make false;
+      terminating = Promise.create ();
+      terminated = Promise.create ();
+    }
+  in
+  let terminating = fst instance.terminating in
+  for _ = 1 to domain_count do
+    start_domain ~sw ~domain_mgr ~limit:domain_concurrency ~terminating ~transient stream
+  done;
+  instance
+
+let run_promise ~sw { stream; _ } f =
+  let p, w = Promise.create () in
+  Fiber.fork_promise ~sw (fun () ->
+    Stream.add stream (Process (Pack (f, w)));
+    Promise.await_exn p)
+
+let run { stream; _ } f =
+  let p, w = Promise.create () in
+  Stream.add stream (Process (Pack (f, w)));
+  Promise.await p
+
+let run_exn instance f =
+  match run instance f with
+  | Ok x -> x
+  | Error exn -> raise exn
+
+let terminate ~sw ({ terminating = _, w1; terminated = p2, w2; _ } as instance) =
+  if Atomic.compare_and_set instance.is_terminating false true
+  then (
+    (* Instruct workers to shut down *)
+    Promise.resolve w1 (Quit { atomic = Atomic.make 1; target = instance.domain_count; all_done = w2 });
+    (* Reject all currently queued and future jobs *)
+    Fiber.fork_daemon ~sw (fun () ->
+      while true do
+        match Stream.take instance.stream with
+        | Process (Pack (_, w)) -> Promise.resolve_error w (Failure "Workpool.terminate called")
+        | Quit _ -> ()
+      done;
+      `Stop_daemon);
+    (* Wait for all workers to shut down *)
+    Promise.await p2)
+  else
+    (* [Workpool.terminate] was called more than once. *)
+    Promise.await p2
+
+let is_terminating { terminating = p, _; _ } = Promise.is_resolved p
+
+let is_terminated { terminated = p, _; _ } = Promise.is_resolved p
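
A minimal usage sketch (not part of the patch): it assumes this module ends up exposed as Eio.Workpool and uses Eio's standard entry point.

    (* Illustrative only: the [Eio.Workpool] path is an assumption about how the
       library exposes this module; [Eio_main.run] and [Eio.Stdenv.domain_mgr]
       are Eio's standard entry point and accessor. *)
    let () =
      Eio_main.run @@ fun env ->
      Eio.Switch.run @@ fun sw ->
      (* 4 domains, each running up to 2 jobs at a time. *)
      let pool =
        Eio.Workpool.create ~sw ~domain_count:4 ~domain_concurrency:2
          (Eio.Stdenv.domain_mgr env)
      in
      (* [run_exn] blocks until the job completes, re-raising any failure. *)
      assert (Eio.Workpool.run_exn pool (fun () -> 6 * 7) = 42);
      Eio.Workpool.terminate ~sw pool
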
diff --git a/lib_eio/workpool.mli b/lib_eio/workpool.mli
new file mode 100644
index 000000000..382e36f4d
--- /dev/null
+++ b/lib_eio/workpool.mli
@@ -0,0 +1,40 @@
+type t
+
+(** Creates a new workpool with [domain_count] domains.
+
+    [domain_concurrency] is the maximum number of jobs each domain can run at a time.
+
+    [capacity] (default: 0) is identical to the [Eio.Stream.create] capacity parameter.
+
+    [transient] (default: true). When true, the workpool will not block the [~sw] Switch from completing.
+    When false, you must call [terminate] to release the [~sw] Switch. *)
+val create :
+  sw:Switch.t ->
+  domain_count:int ->
+  domain_concurrency:int ->
+  ?capacity:int ->
+  ?transient:bool ->
+  #Domain_manager.t ->
+  t
+
+(** Runs a job on this workpool. It is placed at the end of the queue. *)
+val run : t -> (unit -> 'a) -> ('a, exn) result
+
+(** Same as [run] but raises if the job fails. *)
+val run_exn : t -> (unit -> 'a) -> 'a
+
+(** Same as [run] but returns a promise immediately instead of blocking. *)
+val run_promise : sw:Switch.t -> t -> (unit -> 'a) -> ('a, exn) result Promise.t
+
+(** Waits for all running jobs to complete, then returns.
+    No new jobs are started, even if they were already enqueued.
+    To abort all running jobs instead of waiting for them, call [Switch.fail]
+    on the Switch used to create this workpool. *)
+val terminate : sw:Switch.t -> t -> unit
+
+(** Returns true if [terminate] has been called on this workpool.
+    Also returns true if the workpool has fully terminated. *)
+val is_terminating : t -> bool
+
+(** Returns true if [terminate] has been called on this workpool AND
+    the workpool has fully terminated (all running jobs have completed). *)
+val is_terminated : t -> bool
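
For fan-out, a sketch using [run_promise] (illustrative only: [sum_squares] is a hypothetical helper, and it assumes a [pool] created as in the sketch above plus a surrounding switch [sw]):

    let sum_squares ~sw pool inputs =
      (* Enqueue every job without blocking... *)
      let promises =
        List.map (fun n -> Eio.Workpool.run_promise ~sw pool (fun () -> n * n)) inputs
      in
      (* ...then block on the results, counting failed jobs as 0. *)
      List.fold_left
        (fun acc p ->
          match Eio.Promise.await p with
          | Ok sq -> acc + sq
          | Error _ -> acc)
        0 promises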