Skip to content

Commit

Permalink
Keep pool of systhreads for blocking operations
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin authored and talex5 committed Feb 2, 2024
1 parent 964ed27 commit d9f2f20
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 4 deletions.
3 changes: 1 addition & 2 deletions lib_eio/unix/private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ module Fork_action = Fork_action

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
let _t : Thread.t = Thread.create (fun () -> enqueue (try Ok (fn ()) with exn -> Error exn)) () in
()
Thread_pool.run_on_systhread ~enqueue fn
101 changes: 101 additions & 0 deletions lib_eio/unix/thread_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
(* This thread pool does not spawn threads in advance,
but up to [max_standby_systhreads_per_domain] threads are
kept alive to wait for more work to arrive.
This number was chosen somewhat arbitrarily but benchmarking
shows it to be a good compromise. *)
let max_standby_systhreads_per_domain = 20

type job =
| New
| Exit
| Job : {
fn: unit -> 'a;
enqueue: ('a, exn) result -> unit;
} -> job

(* Mailbox with blocking semaphore *)
module Mailbox = struct
type t = {
available: Semaphore.Binary.t;
mutable cell: job;
}

let create () = { available = Semaphore.Binary.make false; cell = New }

let put mbox x =
(* The Semaphore contains an atomic frontier,
therefore [cell] does not need to be an atomic *)
mbox.cell <- x;
Semaphore.Binary.release mbox.available

let take mbox =
Semaphore.Binary.acquire mbox.available;
mbox.cell
end

(* A lock-free Treiber stack of systhreads on stand-by.
A fresh thread is created if no thread is immediately available.
When the domain exits all thread on stand-by are shutdown. *)
type t = {
threads: (Mailbox.t * int) list Atomic.t;
terminating: bool Atomic.t;
}

let create () = { threads = Atomic.make []; terminating = Atomic.make false }

let terminate { threads; terminating } =
Atomic.set terminating true;
List.iter (fun (mbox, _) -> Mailbox.put mbox Exit) (Atomic.get threads)

let rec keep_thread_or_exit ({ threads; _ } as pool) mbox =
match Atomic.get threads with
| (_, count) :: _ when count >= max_standby_systhreads_per_domain ->
(* We've got enough threads on stand-by, so discard the current thread *)
raise Thread.Exit
| current ->
let count = match current with
| [] -> 0
| (_, count) :: _ -> count
in
if not (Atomic.compare_and_set threads current ((mbox, count + 1) :: current))
then keep_thread_or_exit pool mbox (* concurrent update, try again *)

let make_thread pool =
let mbox = Mailbox.create () in
let _t : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
enqueue (try Ok (fn ()) with exn -> Error exn);
(* We're not yielding inside of [keep_thread_or_exit] so
no need to check [terminating] multiple times *)
if Atomic.get pool.terminating then raise Thread.Exit;
keep_thread_or_exit pool mbox
done
) ()
in
mbox

let rec get_thread ({ threads; _ } as pool) =
match Atomic.get threads with
| [] -> make_thread pool
| ((mbox, _count) :: rest) as current ->
if not (Atomic.compare_and_set threads current rest)
then get_thread pool (* concurrent update, try again *)
else mbox

(* https://v2.ocaml.org/manual/parallelism.html#s:par_systhread_interaction
"Only one systhread at a time is allowed to run OCaml code on a particular domain."
So we keep a separate threadpool per domain. *)
let key =
Domain.DLS.new_key @@ fun () ->
let pool = create () in
Domain.at_exit (fun () -> terminate pool);
pool

let run_on_systhread ~enqueue fn =
let pool = Domain.DLS.get key in
let mbox = get_thread pool in
Mailbox.put mbox (Job { fn; enqueue })
1 change: 1 addition & 0 deletions lib_eio/unix/thread_pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
val run_on_systhread : enqueue:(('a, exn) result -> unit) -> (unit -> 'a) -> unit
1 change: 0 additions & 1 deletion lib_eio_posix/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ module Fd = Eio_unix.Fd
module Trace = Eio.Private.Trace
module Fiber_context = Eio.Private.Fiber_context

(* todo: keeping a pool of workers is probably faster *)
let in_worker_thread label = Eio_unix.run_in_systhread ~label

let await_readable op fd =
Expand Down
1 change: 0 additions & 1 deletion lib_eio_windows/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ open Eio.Std

type ty = Read | Write

(* todo: keeping a pool of workers is probably faster *)
let in_worker_thread = Eio_unix.run_in_systhread

module Fd = Eio_unix.Fd
Expand Down

0 comments on commit d9f2f20

Please sign in to comment.