Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep pool of systhreads for blocking operations (simplified) #681

Merged
merged 2 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib_eio/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(language c)
(include_dirs include)
(names fork_action stubs))
(libraries eio unix threads mtime.clock.os))
(libraries eio eio.utils unix threads mtime.clock.os))

(rule
(enabled_if %{bin-available:lintcstubs_arity_cmt})
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let () =
let sleep d =
Eio.Time.Mono.sleep (Effect.perform Private.Get_monotonic_clock) d

let run_in_systhread = Private.run_in_systhread
let run_in_systhread = Thread_pool.run_in_systhread

module Ipaddr = Net.Ipaddr

Expand Down
8 changes: 6 additions & 2 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ val sleep : float -> unit
It can also be used in programs that don't care about tracking determinism. *)

val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
This allows blocking calls to be made non-blocking.
(** [run_in_systhread fn] runs the function [fn] using a pool of system threads ({! Thread.t}).

If all threads in the pool are busy, a new system thread is created rather than waiting for one to become free.
[run_in_systhread] allows blocking calls to be made non-blocking.

@param label The operation name to use in trace output. *)

Expand Down Expand Up @@ -98,6 +100,8 @@ module Private : sig

module Fork_action = Fork_action

module Thread_pool = Thread_pool

val read_link : Fd.t option -> string -> string
end

Expand Down
2 changes: 1 addition & 1 deletion lib_eio/unix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
| `Udp _ -> [Unix.NI_DGRAM]
in
let sockaddr = sockaddr_to_unix sockaddr in
Private.run_in_systhread ~label:"getnameinfo" (fun () ->
Thread_pool.run_in_systhread ~label:"getnameinfo" (fun () ->
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
(ni_hostname, ni_service))

Expand Down
6 changes: 1 addition & 5 deletions lib_eio/unix/private.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ let pipe sw = Effect.perform (Pipe sw)

module Rcfd = Rcfd
module Fork_action = Fork_action

let run_in_systhread ?(label="systhread") fn =
Eio.Private.Suspend.enter label @@ fun _ctx enqueue ->
let _t : Thread.t = Thread.create (fun () -> enqueue (try Ok (fn ()) with exn -> Error exn)) () in
()
module Thread_pool = Thread_pool

external eio_readlinkat : Unix.file_descr -> string -> Cstruct.t -> int = "eio_unix_readlinkat"

Expand Down
129 changes: 129 additions & 0 deletions lib_eio/unix/thread_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
module Zzz = Eio_utils.Zzz

(* A message delivered to a worker thread through its mailbox. *)
type job =
(* Placeholder stored in a freshly created mailbox. A worker treats receiving
it as a bug ([assert false]). *)
| New
(* Asks the worker thread to stop: on receipt it raises [Thread.Exit]. *)
| Exit
(* A unit of work. The result type ['a] is hidden inside the constructor, so
jobs with different result types can share one mailbox type. *)
| Job : {
(* The operation to run in the worker thread. *)
fn : unit -> 'a;
(* Called by the worker with [Ok v] if [fn ()] returned [v],
or [Error exn] if it raised [exn]. *)
enqueue : ('a, exn) result -> unit;
} -> job

(* A single-slot mailbox guarded by a binary semaphore: the sender stores a job
   and signals; the receiver blocks until signalled and then reads the job. *)
module Mailbox = struct
  type t = {
    available : Semaphore.Binary.t;
    mutable cell : job;
  }

  (* A fresh mailbox: nothing is available yet and the slot holds the [New]
     placeholder. *)
  let create () =
    let available = Semaphore.Binary.make false in
    { available; cell = New }

  (* Store [msg] in the slot, then wake the receiver.
     The semaphore provides the synchronisation between the two threads, so
     [cell] does not itself need to be atomic; the write must come first. *)
  let put ({ available; _ } as mbox) msg =
    mbox.cell <- msg;
    Semaphore.Binary.release available

  (* Block until a message has been [put], then return it. *)
  let take ({ available; _ } as mbox) =
    Semaphore.Binary.acquire available;
    mbox.cell
end

(* The set of idle worker threads, kept as a lock-free stack of their mailboxes.
All updates go through [Atomic] operations on the head of the stack. *)
module Free_pool = struct
(* [Empty]: no idle workers. [Closed]: the pool has been shut down.
[Free (mbox, rest)]: the worker owning [mbox] is idle, followed by [rest]. *)
type list =
| Empty
| Closed
| Free of Mailbox.t * list

type t = list Atomic.t

(* Send [Exit] to every mailbox in the given list. *)
let rec close_list = function
| Free (x, xs) -> Mailbox.put x Exit; close_list xs
| Empty | Closed -> ()

(* Mark the pool as [Closed] and tell all workers that were idle at that
moment to exit. *)
let close t =
let items = Atomic.exchange t Closed in
close_list items

(* Tell all currently idle workers to exit, leaving the pool open and [Empty].
Does nothing if the pool is already [Empty] or [Closed]. *)
let rec drop t =
match Atomic.get t with
| Closed | Empty -> ()
| Free _ as items ->
if Atomic.compare_and_set t items Empty then close_list items
else drop t

(* Push [mbox] onto the stack of idle workers, retrying if another thread
updated the stack in the meantime.
NOTE(review): the [Closed] case is treated as impossible; presumably no job
can still be running when [close] is called - confirm against the callers
of [Thread_pool.run]. *)
let rec put t mbox =
match Atomic.get t with
| Closed -> assert false
| (Empty | Free _) as current ->
let next = Free (mbox, current) in
if not (Atomic.compare_and_set t current next) then
put t mbox (* concurrent update, try again *)

(* Start a new worker thread and return its mailbox. The mailbox is NOT added
to the free list here: the caller is expected to hand it a job directly.
The worker loops forever, taking one message at a time from its mailbox. *)
let make_thread t =
let mbox = Mailbox.create () in
let _thread : Thread.t = Thread.create (fun () ->
while true do
match Mailbox.take mbox with
| New -> assert false
| Exit -> raise Thread.Exit
| Job { fn; enqueue } ->
(* Any exception raised by [fn] is captured and reported via [enqueue]
rather than escaping from the worker thread. *)
let result = try Ok (fn ()) with exn -> Error exn in
put t mbox; (* Ensure thread is in free-pool before enqueuing. *)
enqueue result
done
) ()
in
mbox

(* Take the mailbox of an idle worker, or start a new worker if none is idle.
Raises [Invalid_argument] if the pool has been closed. *)
let rec get_thread t =
match Atomic.get t with
| Closed -> invalid_arg "Thread pool closed!"
| Empty -> make_thread t
| Free (mbox, next) as current ->
if Atomic.compare_and_set t current next then mbox
else get_thread t (* concurrent update, try again *)
end

(* A thread pool. *)
type t = {
(* The idle worker threads. *)
free : Free_pool.t;
(* Timer queue on which the idle-thread clean-up task is registered. *)
sleep_q : Zzz.t;
(* Key of the clean-up task currently registered in [sleep_q],
or [None] if no clean-up is pending. *)
mutable timeout : Zzz.Key.t option;
}

(* Performed by [run_in_systhread] to ask the enclosing handler to run the given
function in a systhread. The fiber is resumed with the function's outcome
together with the pool ([t]) that the handler supplies. *)
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t

(* Shut the pool down: close the free list (telling idle workers to exit) and
   cancel the pending clean-up timer, if there is one. *)
let terminate t =
  Free_pool.close t.free;
  match t.timeout with
  | None -> ()
  | Some key ->
    Zzz.remove t.sleep_q key;
    t.timeout <- None

(* [create ~sleep_q] is a new pool with no worker threads and no clean-up timer
   registered. [sleep_q] is the timer queue later used for the clean-up task. *)
let create ~sleep_q =
  let free = Atomic.make Free_pool.Empty in
  { free; sleep_q; timeout = None }

(* [run t fn] evaluates [fn ()] and then terminates [t], whether [fn] returned
   or raised. An exception from [fn] is re-raised with its original backtrace
   once the pool has been terminated. *)
let run t fn =
  let result =
    try fn ()
    with ex ->
      (* Capture the backtrace before [terminate] can disturb it. *)
      let bt = Printexc.get_raw_backtrace () in
      terminate t;
      Printexc.raise_with_backtrace ex bt
  in
  terminate t;
  result

(* [submit t ~ctx ~enqueue fn] hands [fn] to a worker thread (an idle one if
   available, otherwise a new one); the worker reports the outcome via [enqueue].
   If [ctx] already carries a cancellation error, no thread is used and that
   error is passed to [enqueue] straight away. *)
let submit t ~ctx ~enqueue fn =
  match Eio.Private.Fiber_context.get_error ctx with
  | None ->
    let worker = Free_pool.get_thread t.free in
    let job = Job { fn; enqueue } in
    Mailbox.put worker job
  | Some cancel_ex -> enqueue (Error cancel_ex)

(* [run_in_systhread ?label fn] asks the scheduler (via the [Run_in_systhread]
   effect) to run [fn] in a systhread, and returns its result or re-raises the
   exception it raised.

   On resuming, if the pool has no clean-up timer pending, one is registered
   to fire 20ms from now; when it fires it drops the idle threads and clears
   itself so a later call can register a new one.

   [label] is the name passed to the tracer when the fiber suspends. *)
let run_in_systhread ?(label="systhread") fn =
  Eio.Private.Trace.suspend_fiber label;
  let outcome, pool = Effect.perform (Run_in_systhread fn) in
  begin match pool.timeout with
    | Some _ -> ()  (* a clean-up is already scheduled *)
    | None ->
      let deadline =
        match Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms) with
        | Some time -> time
        | None -> Mtime.max_stamp  (* the addition overflowed *)
      in
      let on_timeout () =
        Free_pool.drop pool.free;
        pool.timeout <- None
      in
      pool.timeout <- Some (Zzz.add pool.sleep_q deadline (Fn on_timeout))
  end;
  match outcome with
  | Ok x -> x
  | Error ex -> raise ex
25 changes: 25 additions & 0 deletions lib_eio/unix/thread_pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(** A pool of systhreads, to avoid the overhead of creating a new thread for each operation. *)

type t
(** A thread pool. *)

val create : sleep_q:Eio_utils.Zzz.t -> t
(** [create ~sleep_q] is a new thread pool.

[sleep_q] is used to register a clean-up task to finish idle threads. *)

val run : t -> (unit -> 'a) -> 'a
(** [run t fn] runs [fn ()] and then marks [t] as closed, releasing all idle threads.

The pool is closed whether [fn] returns or raises; an exception raised by [fn]
is re-raised with its original backtrace. Any pending clean-up task is removed
from the sleep queue. *)

val submit :
t ->
ctx:Eio.Private.Fiber_context.t ->
enqueue:(('a, exn) result -> unit) ->
(unit -> 'a) ->
unit
(** [submit t ~ctx ~enqueue fn] starts running [fn] in a sys-thread, which uses [enqueue] to return the result.

An idle thread from the pool is used if there is one; otherwise a new thread is created.
If [fn] raises, the exception is passed to [enqueue] as an [Error].

If [ctx] is already cancelled then the error is passed to [enqueue] immediately.
Systhreads do not respond to cancellation once running.

@raise Invalid_argument if [t] has been closed (and [ctx] is not cancelled). *)

type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, exn) result * t) Effect.t
(** [Run_in_systhread fn] is performed by {!run_in_systhread}. The handler is expected to
run [fn] in a systhread and resume the fiber with the outcome and the pool that was used. *)

val run_in_systhread : ?label:string -> (unit -> 'a) -> 'a
(** [run_in_systhread fn] performs {!Run_in_systhread} to run [fn] in a systhread,
then returns its result, or re-raises the exception it raised.

If the pool has no clean-up task pending, one is registered to release idle
threads 20ms later.

@param label The operation name to use in trace output (default ["systhread"]). *)
20 changes: 14 additions & 6 deletions lib_eio/utils/zzz.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ module Key = struct
let compare = Optint.Int63.compare
end

(* What to do when a timer becomes due. *)
type item =
(* A suspended fiber; [pop] clears its cancel function before returning it. *)
| Fiber of unit Suspended.t
(* A plain callback; [pop] returns it untouched for the caller to run. *)
| Fn of (unit -> unit)

module Job = struct
type t = {
time : Mtime.t;
thread : unit Suspended.t;
item : item;
}

let compare a b = Mtime.compare a.time b.time
Expand All @@ -21,10 +25,10 @@ type t = {

let create () = { sleep_queue = Q.empty; next_id = Optint.Int63.zero }

let add t time thread =
let add t time item =
let id = t.next_id in
t.next_id <- Optint.Int63.succ t.next_id;
let sleeper = { Job.time; thread } in
let sleeper = { Job.time; item } in
t.sleep_queue <- Q.add id sleeper t.sleep_queue;
id

Expand All @@ -33,9 +37,13 @@ let remove t id =

let pop t ~now =
match Q.min t.sleep_queue with
| Some (_, { Job.time; thread }) when time <= now ->
Eio.Private.Fiber_context.clear_cancel_fn thread.fiber;
| Some (_, { Job.time; item }) when time <= now ->
begin
match item with
| Fiber k -> Eio.Private.Fiber_context.clear_cancel_fn k.fiber
| Fn _ -> ()
end;
t.sleep_queue <- Option.get (Q.rest t.sleep_queue);
`Due thread
`Due item
| Some (_, { Job.time; _ }) -> `Wait_until time
| None -> `Nothing
23 changes: 14 additions & 9 deletions lib_eio/utils/zzz.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@ end
type t
(** A set of timers (implemented as a priority queue). *)

(** What to do when a timer becomes due. *)
type item =
| Fiber of unit Suspended.t
(** A suspended fiber, returned by {!pop} once due (with its cancel function cleared). *)
| Fn of (unit -> unit)
(** A callback, returned by {!pop} once due for the caller to run. *)

val create : unit -> t
(** [create ()] is a fresh empty queue. *)

val add : t -> Mtime.t -> unit Suspended.t -> Key.t
(** [add t time thread] adds a new event, due at [time], and returns its ID.
You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before
calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming [thread]). *)
val add : t -> Mtime.t -> item -> Key.t
(** [add t time item] adds a new event, due at [time], and returns its ID.

If [item] is a {!Fiber},
you must use {!Eio.Private.Fiber_context.set_cancel_fn} on it before calling {!pop}.
Your cancel function should call {!remove} (in addition to resuming it). *)

val remove : t -> Key.t -> unit
(** [remove t key] removes an event previously added with [add]. *)

val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest thread due by [now].
It also clears the thread's cancel function.
If no thread is due yet, it returns the time the earliest thread becomes due. *)
val pop : t -> now:Mtime.t -> [`Due of item | `Wait_until of Mtime.t | `Nothing]
(** [pop ~now t] removes and returns the earliest item due by [now].
For fibers, it also clears the thread's cancel function.
If no item is due yet, it returns the time the earliest item becomes due. *)
2 changes: 1 addition & 1 deletion lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ let noop () =

let sleep_until time =
Sched.enter "sleep" @@ fun t k ->
let job = Eio_utils.Zzz.add t.sleep_q time k in
let job = Eio_utils.Zzz.add t.sleep_q time (Fiber k) in
Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex ->
Eio_utils.Zzz.remove t.sleep_q job;
Sched.enqueue_failed_thread t k ex
Expand Down
19 changes: 16 additions & 3 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type t = {
need_wakeup : bool Atomic.t;

sleep_q: Zzz.t;

thread_pool : Eio_unix.Private.Thread_pool.t;
}

type _ Effect.t +=
Expand Down Expand Up @@ -213,8 +215,12 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
let now = Mtime_clock.now () in
match Zzz.pop ~now sleep_q with
| `Due k ->
(* A sleeping task is now due *)
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
Suspended.continue k () (* A sleeping task is now due *)
begin match k with
| Fiber k -> Suspended.continue k ()
| Fn fn -> fn (); schedule st
end
| `Wait_until _ | `Nothing as next_due ->
(* Handle any pending events before submitting. This is faster. *)
match Uring.get_cqe_nonblocking uring with
Expand Down Expand Up @@ -447,6 +453,12 @@ let run ~extra_effects st main arg =
);
schedule st
)
| Eio_unix.Private.Thread_pool.Run_in_systhread fn -> Some (fun k ->
let k = { Suspended.k; fiber } in
let enqueue x = enqueue_thread st k (x, st.thread_pool) in
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
schedule st
)
| Alloc -> Some (fun k ->
match st.mem with
| None -> continue k None
Expand Down Expand Up @@ -475,7 +487,7 @@ let run ~extra_effects st main arg =
fork ~new_fiber (fun () ->
Switch.run_protected ~name:"eio_linux" (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
match Eio_unix.Private.Thread_pool.run st.thread_pool (fun () -> main arg) with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Expand Down Expand Up @@ -548,7 +560,8 @@ let with_sched ?(fallback=no_fallback) config fn =
let io_q = Queue.create () in
let mem_q = Queue.create () in
with_eventfd @@ fun eventfd ->
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q }
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
with
| x -> Uring.exit uring; x
| exception ex ->
Expand Down
1 change: 0 additions & 1 deletion lib_eio_posix/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ module Fd = Eio_unix.Fd
module Trace = Eio.Private.Trace
module Fiber_context = Eio.Private.Fiber_context

(* todo: keeping a pool of workers is probably faster *)
let in_worker_thread label = Eio_unix.run_in_systhread ~label

let await_readable op fd =
Expand Down
Loading
Loading