diff --git a/Makefile b/Makefile index 1bfe96dfc..b159191e5 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ dscheck: stress: dune exec -- ./stress/stress_proc.exe dune exec -- ./stress/stress_semaphore.exe + dune exec -- ./stress/stress_release.exe docker: docker build -t eio . diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index 08fe261c9..2671a559f 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -87,24 +87,41 @@ module Switch : sig Release handlers are run in LIFO order, in series. Note that [fn] is called within a {!Cancel.protect}, since aborting clean-up actions is usually a bad idea - and the switch may have been cancelled by the time it runs. *) + and the switch may have been cancelled by the time it runs. + You cannot attach new resources to a switch once the cancel hooks start to run. + + This function is thread-safe (but not signal-safe). + If the switch finishes before [fn] can be registered, + it raises [Invalid_argument] and runs [fn] immediately instead. *) type hook (** A handle for removing a clean-up callback. *) val null_hook : hook - (** A dummy hook. Removing it does nothing. *) + (** A dummy hook. [try_remove_hook null_hook = false]. *) val on_release_cancellable : t -> (unit -> unit) -> hook (** Like [on_release], but the handler can be removed later. For example, opening a file will call [on_release_cancellable] to ensure the file is closed later. However, if the file is manually closed before that, it will use {!remove_hook} to remove the hook, - which is no longer needed. *) + which is no longer needed. + + This function is thread-safe (but not signal-safe). *) + + val try_remove_hook : hook -> bool + (** [try_remove_hook h] removes a previously-added hook. + Returns [true] if the hook was successfully removed, or [false] if another + domain ran it or removed it first. + + This function is thread-safe (but not signal-safe). 
*) val remove_hook : hook -> unit - (** [remove_hook h] removes a previously-added hook. - If the hook has already been removed, this does nothing. *) + (** [remove_hook h] is [ignore (try_remove_hook h)]. + + For multi-domain code, consider using {!try_remove_hook} instead + so that you can handle the case of trying to close a resource + just as another domain is closing it or finishing the switch. *) (** {2 Debugging} *) diff --git a/lib_eio/core/switch.ml b/lib_eio/core/switch.ml index 4fd617340..f2854452c 100644 --- a/lib_eio/core/switch.ml +++ b/lib_eio/core/switch.ml @@ -2,24 +2,31 @@ type t = { mutable fibers : int; (* Total, including daemon_fibers and the main function *) mutable daemon_fibers : int; mutable exs : (exn * Printexc.raw_backtrace) option; - on_release : (unit -> unit) Lwt_dllist.t; + on_release_lock : Mutex.t; + mutable on_release : (unit -> unit) Lwt_dllist.t option; (* [None] when closed. *) waiter : unit Single_waiter.t; (* The main [top]/[sub] function may wait here for fibers to finish. *) cancel : Cancel.t; } type hook = | Null - | Hook : Domain.id * 'a Lwt_dllist.node -> hook + | Hook : Mutex.t * (unit -> unit) Lwt_dllist.node -> hook let null_hook = Null -(* todo: would be good to make this thread-safe. While a switch can only be turned off from its own domain, - we might want to allow closing something explicitly from any domain, and that needs to remove the hook. 
*) -let remove_hook = function - | Null -> () - | Hook (id, n) -> - if Domain.self () <> id then invalid_arg "Switch hook removed from wrong domain!"; - Lwt_dllist.remove n +let cancelled () = assert false + +let try_remove_hook = function + | Null -> false + | Hook (on_release_lock, n) -> + Mutex.lock on_release_lock; + Lwt_dllist.remove n; + let fn = Lwt_dllist.get n in + Lwt_dllist.set n cancelled; + Mutex.unlock on_release_lock; + fn != cancelled + +let remove_hook x = ignore (try_remove_hook x : bool) let dump f t = Fmt.pf f "@[Switch %d (%d extra fibers):@,%a@]" @@ -91,21 +98,20 @@ let rec await_idle t = Trace.try_get t.cancel.id; Single_waiter.await_protect t.waiter "Switch.await_idle" t.cancel.id done; - (* Call on_release handlers: *) - let queue = Lwt_dllist.create () in - Lwt_dllist.transfer_l t.on_release queue; - let rec release () = - match Lwt_dllist.take_opt_r queue with - | None when t.fibers = 0 && Lwt_dllist.is_empty t.on_release -> () - | None -> await_idle t - | Some fn -> - begin - try Cancel.protect fn with - | ex -> fail t ex - end; - release () + (* Collect on_release handlers: *) + let queue = ref [] in + let enqueue n = + let fn = Lwt_dllist.get n in + Lwt_dllist.set n cancelled; + queue := fn :: !queue in - release () + Mutex.lock t.on_release_lock; + Option.iter (Lwt_dllist.iter_node_l enqueue) t.on_release; + t.on_release <- None; + Mutex.unlock t.on_release_lock; + (* Run on_release handlers *) + !queue |> List.iter (fun fn -> try Cancel.protect fn with ex -> fail t ex); + if t.fibers > 0 then await_idle t let maybe_raise_exs t = match t.exs with @@ -118,7 +124,8 @@ let create cancel = daemon_fibers = 0; exs = None; waiter = Single_waiter.create (); - on_release = Lwt_dllist.create (); + on_release_lock = Mutex.create (); + on_release = Some (Lwt_dllist.create ()); cancel; } @@ -171,25 +178,22 @@ let () = ) let on_release_full t fn = - if Domain.self () = t.cancel.domain then ( - match t.cancel.state with - | On | Cancelling _ -> 
Lwt_dllist.add_r fn t.on_release - | Finished -> - match Cancel.protect fn with - | () -> invalid_arg "Switch finished!" - | exception ex -> - let bt = Printexc.get_raw_backtrace () in - Printexc.raise_with_backtrace (Release_error ("Switch finished!", ex)) bt - ) else ( + Mutex.lock t.on_release_lock; + match t.on_release with + | Some handlers -> + let node = Lwt_dllist.add_r fn handlers in + Mutex.unlock t.on_release_lock; + node + | None -> + Mutex.unlock t.on_release_lock; match Cancel.protect fn with - | () -> invalid_arg "Switch accessed from wrong domain!" + | () -> invalid_arg "Switch finished!" | exception ex -> let bt = Printexc.get_raw_backtrace () in - Printexc.raise_with_backtrace (Release_error ("Switch accessed from wrong domain!", ex)) bt - ) + Printexc.raise_with_backtrace (Release_error ("Switch finished!", ex)) bt let on_release t fn = ignore (on_release_full t fn : _ Lwt_dllist.node) let on_release_cancellable t fn = - Hook (t.cancel.domain, on_release_full t fn) + Hook (t.on_release_lock, on_release_full t fn) diff --git a/lib_eio/pool.mli b/lib_eio/pool.mli index d299b714e..718ce51c4 100644 --- a/lib_eio/pool.mli +++ b/lib_eio/pool.mli @@ -24,11 +24,8 @@ val create : If [alloc] raises an exception then that use fails, but future calls to {!use} will retry. The [alloc] function is called in the context of the fiber trying to use the pool. - If the pool is shared between domains and the resources are attached to a switch, this - might cause trouble (since switches can't be shared between domains). - You might therefore want to make [alloc] request a resource from the main domain rather than creating one itself. 
- You should also take care about handling cancellation in [alloc], since resources are typically + You should take care about handling cancellation in [alloc], since resources are typically attached to a switch with the lifetime of the pool, meaning that if [alloc] fails then they won't be freed automatically until the pool itself is finished. diff --git a/stress/dune b/stress/dune index 472e92404..8519a68b3 100644 --- a/stress/dune +++ b/stress/dune @@ -1,3 +1,3 @@ (executables - (names stress_semaphore stress_proc) + (names stress_semaphore stress_proc stress_release) (libraries eio_main)) diff --git a/stress/stress_release.ml b/stress/stress_release.ml new file mode 100644 index 000000000..b8a99eae0 --- /dev/null +++ b/stress/stress_release.ml @@ -0,0 +1,62 @@ +open Eio.Std + +let n_domains = 3 +let n_rounds = 1000 + +(* Each worker domain loops, creating resources and attaching them to the + shared switch [sw]. It also randomly closes resources, cancelling the hook. + The main domain finishes the switch while this is happening, freeing all + registered resources. At the end, we check that the number of resources + allocated matches the number freed. *) +let[@warning "-52"] run_domain ~sw ~hooks resources = + try + while true do + Atomic.incr resources; + let hook = Switch.on_release_cancellable sw (fun () -> Atomic.decr resources) in + if Random.bool () then ( + (* Manually close an existing resource. *) + let i = Random.int (Array.length hooks) in + if Switch.try_remove_hook hooks.(i) then + Atomic.decr resources + ); + if Random.bool () then ( + let i = Random.int (Array.length hooks) in + hooks.(i) <- hook; + ) + done + with Invalid_argument "Switch finished!" -> + () + +let main ~pool = + let resources = Array.init n_domains (fun _ -> Atomic.make 0) in + (* Keep up to 10 hooks so we can cancel them randomly too. 
*) + let hooks = Array.make 10 Switch.null_hook in + for _ = 1 to n_rounds do + (* if i mod 1000 = 0 then traceln "Round %d" i; *) + Switch.run (fun domains_sw -> + Switch.run (fun sw -> + resources |> Array.iter (fun resources -> + Fiber.fork ~sw:domains_sw (fun () -> + Eio.Executor_pool.submit_exn pool ~weight:1.0 (fun () -> run_domain ~sw ~hooks resources) + ) + ); + (* traceln "Wait for domains to start"; *) + while Atomic.get (resources.(n_domains - 1)) < 20 do + Domain.cpu_relax () + done; + ); + (* The child domains will start to finish as they find that + [sw] is not accepting new resources. They may each still + create one last resource. *) + ); + (* All child domains are now finished. *) + let x = Array.fold_left (fun acc resources -> acc + Atomic.get resources) 0 resources in + if x <> 0 then Fmt.failwith "%d resources still open at end!" x + done + +let () = + Eio_main.run @@ fun env -> + let domain_mgr = Eio.Stdenv.domain_mgr env in + Switch.run @@ fun sw -> + let pool = Eio.Executor_pool.create ~sw ~domain_count:n_domains domain_mgr in + main ~pool diff --git a/tests/domains.md b/tests/domains.md index e4083099c..886226d6e 100644 --- a/tests/domains.md +++ b/tests/domains.md @@ -129,27 +129,23 @@ Likewise, switches can't be shared: Exception: Invalid_argument "Switch accessed from wrong domain!". ``` -Can't register a release handler across domains: +Registering a release handler across domains: ```ocaml # run @@ fun mgr -> Switch.run @@ fun sw -> - let p, r = Promise.create () in - Fiber.both - (fun () -> - Eio.Domain_manager.run mgr @@ fun () -> - Switch.run @@ fun sw -> - Promise.resolve r sw; - Fiber.await_cancel () - ) - (fun () -> - let sw = Promise.await p in - Switch.on_release sw ignore - );; -Exception: Invalid_argument "Switch accessed from wrong domain!". 
+ Eio.Domain_manager.run mgr (fun () -> + Switch.on_release sw (fun () -> traceln "Handler called"); + traceln "Handler registered in new domain"; + ); + traceln "Sub-domain finished; ending switch" ++Handler registered in new domain ++Sub-domain finished; ending switch ++Handler called +- : unit = () ``` -Can't release a release handler across domains: +Cancelling a release handler across domains: ```ocaml # run @@ fun mgr -> @@ -159,15 +155,18 @@ Can't release a release handler across domains: (fun () -> Eio.Domain_manager.run mgr @@ fun () -> Switch.run @@ fun sw -> - let hook = Switch.on_release_cancellable sw ignore in + let hook = Switch.on_release_cancellable sw (fun () -> traceln "BUG") in Promise.resolve r hook; Fiber.await_cancel () ) (fun () -> let hook = Promise.await p in - Switch.remove_hook hook + let cancelled = Switch.try_remove_hook hook in + traceln "Cancelled: %b" cancelled; + raise Exit );; -Exception: Invalid_argument "Switch hook removed from wrong domain!". ++Cancelled: true +Exception: Stdlib.Exit. 
``` Can't fork into another domain: diff --git a/tests/switch.md b/tests/switch.md index cbe3bbbb2..29a30c3e4 100644 --- a/tests/switch.md +++ b/tests/switch.md @@ -353,7 +353,8 @@ Multiple exceptions: - Invalid_argument("Switch finished!") ``` -Using switch from inside release handler: +Attaching resources to a switch from inside release handler fails +(possibly forking with it should be disallowed in future too): ```ocaml # run (fun sw -> @@ -366,8 +367,13 @@ Using switch from inside release handler: ); Switch.on_release sw (fun () -> Fiber.fork ~sw (fun () -> - Switch.on_release sw (fun () -> traceln "Late release"); traceln "Starting release 2"; + begin + try + Switch.on_release sw (fun () -> traceln "Immediate release"); + with Invalid_argument msg -> + traceln "on_release refused: %s" msg + end; Fiber.yield (); traceln "Finished release 2" ); @@ -376,10 +382,11 @@ Using switch from inside release handler: );; +Main fiber done +Starting release 2 ++Immediate release ++on_release refused: Switch finished! +Starting release 1 +Finished release 2 +Finished release 1 -+Late release - : unit = () ```