Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Eio_linux.wakeup signal-safe #381

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,8 @@ type t = {
run_q : runnable Lf_queue.t;

(* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event.
In that case, [need_wakeup = true] and you must signal using [eventfd]. You must hold [eventfd_mutex]
when writing to or closing [eventfd]. *)
In that case, [need_wakeup = true] and you must signal using [eventfd]. *)
eventfd : FD.t;
eventfd_mutex : Mutex.t;

(* If [false], the main thread will check [run_q] before sleeping again
(possibly because an event has been or will be sent to [eventfd]).
Expand All @@ -216,21 +214,22 @@ let wake_buffer =
Bytes.set_int64_ne b 0 1L;
b

(* This can be called from any systhread (including ones not running Eio),
and also from signal handlers or GC finalizers. It must not take any locks. *)
let wakeup t =
Mutex.lock t.eventfd_mutex;
match
Log.debug (fun f -> f "Sending wakeup on eventfd %a" FD.pp t.eventfd);
Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *)
let sent = Unix.single_write (FD.get_exn "wakeup" t.eventfd) wake_buffer 0 8 in
Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *)
match t.eventfd.fd with
| `Closed -> () (* Domain has shut down (presumably after handling the event) *)
| `Open fd ->
let sent = Unix.single_write fd wake_buffer 0 8 in
assert (sent = 8)
with
| () -> Mutex.unlock t.eventfd_mutex
| exception ex -> Mutex.unlock t.eventfd_mutex; raise ex

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers).
Pushes [Thread (k, x)] onto [st.run_q], then signals the owning domain if required. *)
let enqueue_thread st k x =
Lf_queue.push st.run_q (Thread (k, x));
(* [need_wakeup] set means the owning domain may be sleeping, so signal it via [wakeup]. *)
if Atomic.get st.need_wakeup then wakeup st

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers).
Pushes [Failed_thread (k, ex)] onto [st.run_q], then signals the owning domain if required. *)
let enqueue_failed_thread st k ex =
Lf_queue.push st.run_q (Failed_thread (k, ex));
(* [need_wakeup] set means the owning domain may be sleeping, so signal it via [wakeup]. *)
if Atomic.get st.need_wakeup then wakeup st
Expand Down Expand Up @@ -964,7 +963,42 @@ module Low_level = struct
|> List.filter_map to_eio_sockaddr_t
end

external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"
module EventFD_pool : sig
(* We need to write to event FDs from signal handlers and GC finalizers.
This means we can't take a lock, which means we can't easily prevent
the owning domain from closing the FD while we're writing to it
(which could result in us writing to an unrelated file if the FD
got reused). To avoid that, we never close event FDs but just return them
to a free pool.
The case where this matters is:
1. Some other systhread calls [wakeup].
2. [wakeup] adds an item to the run-queue and sees it needs to send a wake-up event.
3. The domain wakes up for some other reason, handles the event, then shuts down.
4. The original systhread writes to the eventfd.
*)

val get : unit -> Unix.file_descr
(* Take the next free eventfd from the pool, or create a new one if the pool's empty.
You might get a few spurious events from it as other threads are shutting down,
so you must be able to cope with that. *)

val put : Unix.file_descr -> unit
(* [put fd] adds [fd] to the free pool. *)
end = struct
external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"

let free = Lf_queue.create ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to something else? To me, free is the opposite of alloc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What name do you suggest? It's the list of free FDs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

free_list, fd_list, fd_pool, or free_pool. But don't let this block you; there's no point in me bikeshedding over this. If free is your preference, please go ahead.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'm going to keep it as it is. free_list can be misread as a command just like free can; fd_list doesn't say why the FDs are there, which seems the most important thing; and the module already has pool in the name. Also, it's only in scope for 6 lines.


(* Reuse an eventfd from the [free] queue if one is available;
otherwise create a new one with [eio_eventfd 0]. *)
let get () =
match Lf_queue.pop free with
| Some fd -> fd
| None -> eio_eventfd 0

(* Push [fd] onto the [free] queue so a later [get] can reuse it.
The FD is not closed here. *)
let put fd =
Lf_queue.push free fd
end

type has_fd = < fd : FD.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
Expand Down Expand Up @@ -1405,12 +1439,11 @@ let rec run : type a.
in
let run_q = Lf_queue.create () in
Lf_queue.push run_q IO;
let eventfd_mutex = Mutex.create () in
let sleep_q = Zzz.create () in
let io_q = Queue.create () in
let mem_q = Queue.create () in
let eventfd = FD.placeholder ~seekable:false ~close_unix:false in
let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in
let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in
Log.debug (fun l -> l "starting main thread");
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Expand Down Expand Up @@ -1542,13 +1575,11 @@ let rec run : type a.
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
let fd = eio_eventfd 0 in
let fd = EventFD_pool.get () in
st.eventfd.fd <- `Open fd;
Switch.on_release sw (fun () ->
Mutex.lock st.eventfd_mutex;
FD.close st.eventfd;
Mutex.unlock st.eventfd_mutex;
Unix.close fd
let unix = FD.to_unix `Take st.eventfd in
EventFD_pool.put unix
);
Log.debug (fun f -> f "Monitoring eventfd %a" FD.pp st.eventfd);
result := Some (
Expand Down