From 2add717d36a21a6ff7e59a6437ad7777aec2413a Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Mon, 5 Dec 2022 16:57:06 +0000 Subject: [PATCH] Make Eio_linux.wakeup signal-safe This will eventually allow Waiters to be used from signal handlers and GC finalizers (once that's lock-free too). Previously, `wakeup` couldn't be used from a signal handler because it took a lock (to ensure that the eventfd wasn't closed while it wrote to it). Now, we avoid ever closing eventfds and simply keep free ones in a pool. --- lib_eio_linux/eio_linux.ml | 69 +++++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index df639be5c..684bbecff 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -196,10 +196,8 @@ type t = { run_q : runnable Lf_queue.t; (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. - In that case, [need_wakeup = true] and you must signal using [eventfd]. You must hold [eventfd_mutex] - when writing to or closing [eventfd]. *) + In that case, [need_wakeup = true] and you must signal using [eventfd]. *) eventfd : FD.t; - eventfd_mutex : Mutex.t; (* If [false], the main thread will check [run_q] before sleeping again (possibly because an event has been or will be sent to [eventfd]). @@ -216,21 +214,22 @@ let wake_buffer = Bytes.set_int64_ne b 0 1L; b +(* This can be called from any systhread (including ones not running Eio), + and also from signal handlers or GC finalizers. It must not take any locks. *) let wakeup t = - Mutex.lock t.eventfd_mutex; - match - Log.debug (fun f -> f "Sending wakeup on eventfd %a" FD.pp t.eventfd); - Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) - let sent = Unix.single_write (FD.get_exn "wakeup" t.eventfd) wake_buffer 0 8 in + Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) + match t.eventfd.fd with + | `Closed -> () (* Domain has shut down (presumably after handling the event) *) + | `Open fd -> + let sent = Unix.single_write fd wake_buffer 0 8 in assert (sent = 8) - with - | () -> Mutex.unlock t.eventfd_mutex - | exception ex -> Mutex.unlock t.eventfd_mutex; raise ex +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) let enqueue_thread st k x = Lf_queue.push st.run_q (Thread (k, x)); if Atomic.get st.need_wakeup then wakeup st +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) let enqueue_failed_thread st k ex = Lf_queue.push st.run_q (Failed_thread (k, ex)); if Atomic.get st.need_wakeup then wakeup st @@ -964,7 +963,42 @@ module Low_level = struct |> List.filter_map to_eio_sockaddr_t end -external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" +module EventFD_pool : sig + (* We need to write to event FDs from signal handlers and GC finalizers. + This means we can't take a lock, which means we can't easily prevent + the owning domain from closing the FD while we're writing to it + (which could result in us writing to an unreleaded file if the FD + got reused). To avoid that, we never close event FDs but just return them + to a free pool. + + The case where this matters is: + + 1. Some other systhread calls [wakeup]. + 2 [wakeup] adds an item to the run-queue and sees it needs to send a wake-up event. + 3. The domain wakes up for some other reason, handles the event, then shuts down. + 4. The original systhread writes to the eventfd. + *) + + val get : unit -> Unix.file_descr + (* Take the next free eventfd from the pool, or create a new one if the pool's empty. + You might get a few spurious events from it as other threads are shutting down, + so you must be able to cope with that. *) + + val put : Unix.file_descr -> unit + (* [put fd] adds [fd] to the free pool. *) +end = struct + external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" + + let free = Lf_queue.create () + + let get () = + match Lf_queue.pop free with + | Some fd -> fd + | None -> eio_eventfd 0 + + let put fd = + Lf_queue.push free fd +end type has_fd = < fd : FD.t > type source = < Eio.Flow.source; Eio.Flow.close; has_fd > @@ -1405,12 +1439,11 @@ let rec run : type a. in let run_q = Lf_queue.create () in Lf_queue.push run_q IO; - let eventfd_mutex = Mutex.create () in let sleep_q = Zzz.create () in let io_q = Queue.create () in let mem_q = Queue.create () in let eventfd = FD.placeholder ~seekable:false ~close_unix:false in - let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in + let st = { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } in Log.debug (fun l -> l "starting main thread"); let rec fork ~new_fiber:fiber fn = let open Effect.Deep in @@ -1542,13 +1575,11 @@ let rec run : type a. let new_fiber = Fiber_context.make_root () in fork ~new_fiber (fun () -> Switch.run_protected (fun sw -> - let fd = eio_eventfd 0 in + let fd = EventFD_pool.get () in st.eventfd.fd <- `Open fd; Switch.on_release sw (fun () -> - Mutex.lock st.eventfd_mutex; - FD.close st.eventfd; - Mutex.unlock st.eventfd_mutex; - Unix.close fd + let unix = FD.to_unix `Take st.eventfd in + EventFD_pool.put unix ); Log.debug (fun f -> f "Monitoring eventfd %a" FD.pp st.eventfd); result := Some (