Skip to content

Commit

Permalink
Make Switch.on_release thread safe
Browse files Browse the repository at this point in the history
This is needed to allow resource pools to be shared between domains.
  • Loading branch information
talex5 committed Feb 9, 2024
1 parent a9e552f commit d979cc0
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 69 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dscheck:
stress:
dune exec -- ./stress/stress_proc.exe
dune exec -- ./stress/stress_semaphore.exe
dune exec -- ./stress/stress_release.exe

docker:
docker build -t eio .
27 changes: 22 additions & 5 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,41 @@ module Switch : sig
Release handlers are run in LIFO order, in series.
Note that [fn] is called within a {!Cancel.protect}, since aborting clean-up actions is usually a bad idea
and the switch may have been cancelled by the time it runs. *)
and the switch may have been cancelled by the time it runs.
You cannot attach new resources to a switch once the cancel hooks start to run.
This function is thread-safe (but not signal-safe).
If the switch finishes before [fn] can be registered,
it raises [Invalid_argument] and runs [fn] immediately instead. *)

type hook
(** A handle for removing a clean-up callback. *)

val null_hook : hook
(** A dummy hook. Removing it does nothing. *)
(** A dummy hook. [try_remove_hook null_hook = false]. *)

val on_release_cancellable : t -> (unit -> unit) -> hook
(** Like [on_release], but the handler can be removed later.
For example, opening a file will call [on_release_cancellable] to ensure the file is closed later.
However, if the file is manually closed before that, it will use {!remove_hook} to remove the hook,
which is no longer needed. *)
which is no longer needed.
This function is thread-safe (but not signal-safe). *)

val try_remove_hook : hook -> bool
(** [try_remove_hook h] removes a previously-added hook.
Returns [true] if the hook was successfully removed, or [false] if another
domain ran it or removed it first.
This function is thread-safe (but not signal-safe). *)

val remove_hook : hook -> unit
(** [remove_hook h] removes a previously-added hook.
If the hook has already been removed, this does nothing. *)
(** [remove_hook h] is [ignore (try_remove_hook h)].
For multi-domain code, consider using {!try_remove_hook} instead
so that you can handle the case of trying to close a resource
just as another domain is closing it or finishing the switch. *)

(** {2 Debugging} *)

Expand Down
80 changes: 42 additions & 38 deletions lib_eio/core/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@ type t = {
mutable fibers : int; (* Total, including daemon_fibers and the main function *)
mutable daemon_fibers : int;
mutable exs : (exn * Printexc.raw_backtrace) option;
on_release : (unit -> unit) Lwt_dllist.t;
on_release_lock : Mutex.t;
mutable on_release : (unit -> unit) Lwt_dllist.t option; (* [None] when closed. *)
waiter : unit Single_waiter.t; (* The main [top]/[sub] function may wait here for fibers to finish. *)
cancel : Cancel.t;
}

type hook =
| Null
| Hook : Domain.id * 'a Lwt_dllist.node -> hook
| Hook : Mutex.t * (unit -> unit) Lwt_dllist.node -> hook

let null_hook = Null

(* todo: would be good to make this thread-safe. While a switch can only be turned off from its own domain,
we might want to allow closing something explicitly from any domain, and that needs to remove the hook. *)
let remove_hook = function
| Null -> ()
| Hook (id, n) ->
if Domain.self () <> id then invalid_arg "Switch hook removed from wrong domain!";
Lwt_dllist.remove n
(* Sentinel handler. It is stored into a node in place of the real release
   function once that function has been taken (see the [Lwt_dllist.set n cancelled]
   calls), and is recognised by physical comparison against this value.
   Calling it would fail the assertion. *)
let cancelled () = assert false

(* [try_remove_hook h] cancels the release handler behind [h].
   Returns [true] if this call is the one that took the handler, and [false]
   if [h] is [Null] or the node's handler had already been replaced by
   [cancelled]. All accesses to the node happen with the hook's mutex held. *)
let try_remove_hook = function
| Null -> false
| Hook (on_release_lock, n) ->
Mutex.lock on_release_lock;
(* Take the node out of the handler list. *)
Lwt_dllist.remove n;
(* Read the stored handler, then mark the node as taken. *)
let fn = Lwt_dllist.get n in
Lwt_dllist.set n cancelled;
Mutex.unlock on_release_lock;
(* Physical comparison: we won only if the node still held a real handler. *)
fn != cancelled

(* [remove_hook hook] attempts to remove [hook] and discards the result
   saying whether the removal succeeded. *)
let remove_hook hook =
  let (_ : bool) = try_remove_hook hook in
  ()

let dump f t =
Fmt.pf f "@[<v2>Switch %d (%d extra fibers):@,%a@]"
Expand Down Expand Up @@ -91,21 +98,20 @@ let rec await_idle t =
Trace.try_get t.cancel.id;
Single_waiter.await_protect t.waiter "Switch.await_idle" t.cancel.id
done;
(* Call on_release handlers: *)
let queue = Lwt_dllist.create () in
Lwt_dllist.transfer_l t.on_release queue;
let rec release () =
match Lwt_dllist.take_opt_r queue with
| None when t.fibers = 0 && Lwt_dllist.is_empty t.on_release -> ()
| None -> await_idle t
| Some fn ->
begin
try Cancel.protect fn with
| ex -> fail t ex
end;
release ()
(* Collect on_release handlers: *)
let queue = ref [] in
let enqueue n =
let fn = Lwt_dllist.get n in
Lwt_dllist.set n cancelled;
queue := fn :: !queue
in
release ()
Mutex.lock t.on_release_lock;
Option.iter (Lwt_dllist.iter_node_l enqueue) t.on_release;
t.on_release <- None;
Mutex.unlock t.on_release_lock;
(* Run on_release handlers *)
!queue |> List.iter (fun fn -> try Cancel.protect fn with ex -> fail t ex);
if t.fibers > 0 then await_idle t

let maybe_raise_exs t =
match t.exs with
Expand All @@ -118,7 +124,8 @@ let create cancel =
daemon_fibers = 0;
exs = None;
waiter = Single_waiter.create ();
on_release = Lwt_dllist.create ();
on_release_lock = Mutex.create ();
on_release = Some (Lwt_dllist.create ());
cancel;
}

Expand Down Expand Up @@ -171,25 +178,22 @@ let () =
)

let on_release_full t fn =
if Domain.self () = t.cancel.domain then (
match t.cancel.state with
| On | Cancelling _ -> Lwt_dllist.add_r fn t.on_release
| Finished ->
match Cancel.protect fn with
| () -> invalid_arg "Switch finished!"
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Printexc.raise_with_backtrace (Release_error ("Switch finished!", ex)) bt
) else (
Mutex.lock t.on_release_lock;
match t.on_release with
| Some handlers ->
let node = Lwt_dllist.add_r fn handlers in
Mutex.unlock t.on_release_lock;
node
| None ->
Mutex.unlock t.on_release_lock;
match Cancel.protect fn with
| () -> invalid_arg "Switch accessed from wrong domain!"
| () -> invalid_arg "Switch finished!"
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Printexc.raise_with_backtrace (Release_error ("Switch accessed from wrong domain!", ex)) bt
)
Printexc.raise_with_backtrace (Release_error ("Switch finished!", ex)) bt

(* [on_release t fn] registers [fn] via [on_release_full] and drops the
   list node it returns, so the handler cannot be removed afterwards. *)
let on_release t fn =
  let (_ : _ Lwt_dllist.node) = on_release_full t fn in
  ()

let on_release_cancellable t fn =
Hook (t.cancel.domain, on_release_full t fn)
Hook (t.on_release_lock, on_release_full t fn)
5 changes: 1 addition & 4 deletions lib_eio/pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ val create :
If [alloc] raises an exception then that use fails, but future calls to {!use} will retry.
The [alloc] function is called in the context of the fiber trying to use the pool.
If the pool is shared between domains and the resources are attached to a switch, this
might cause trouble (since switches can't be shared between domains).
You might therefore want to make [alloc] request a resource from the main domain rather than creating one itself.
You should also take care about handling cancellation in [alloc], since resources are typically
You should take care about handling cancellation in [alloc], since resources are typically
attached to a switch with the lifetime of the pool, meaning that if [alloc] fails then they won't
be freed automatically until the pool itself is finished.
Expand Down
2 changes: 1 addition & 1 deletion stress/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
(executables
(names stress_semaphore stress_proc)
(names stress_semaphore stress_proc stress_release)
(libraries eio_main))
62 changes: 62 additions & 0 deletions stress/stress_release.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
open Eio.Std

(* Number of worker domains in the executor pool, and also the number of
   per-worker resource counters created by [main]. *)
let n_domains = 3
(* Number of times [main] repeats the create-switch / run-workers / check cycle. *)
let n_rounds = 1000

(* Each worker domain loops, creating resources and attaching them to the
shared switch [sw]. It also randomly closes resources, cancelling the hook.
The main domain finishes the switch while this is happening, freeing all
registered resources. At the end, we check that the number of resources
allocated matches the number freed. *)
let[@warning "-52"] run_domain ~sw ~hooks resources =
  (* Worker body. Repeatedly counts a new resource in [resources] and
     registers a release handler for it on [sw], until [sw] refuses the
     registration. Warning 52 is disabled because the handler at the bottom
     deliberately matches on the exception's message string. *)
  let release () = Atomic.decr resources in
  let random_slot () = Random.int (Array.length hooks) in
  let rec loop () =
    (* Count the resource first, then attach its release handler. *)
    Atomic.incr resources;
    let hook = Switch.on_release_cancellable sw release in
    (* Half the time, manually close whatever resource sits in a random slot.
       The close is only counted if this call actually removed the hook. *)
    if Random.bool () then begin
      let victim = hooks.(random_slot ()) in
      if Switch.try_remove_hook victim then Atomic.decr resources
    end;
    (* Half the time, remember the new hook so it can be cancelled later. *)
    if Random.bool () then hooks.(random_slot ()) <- hook;
    loop ()
  in
  (* The loop only ends when the switch rejects a new handler. *)
  try loop () with Invalid_argument "Switch finished!" -> ()

let main ~pool =
  (* One open-resource counter per worker job. *)
  let resources = Array.init n_domains (fun _ -> Atomic.make 0) in
  (* Keep up to 10 hooks so we can cancel them randomly too. *)
  let hooks = Array.make 10 Switch.null_hook in
  (* Submit one [run_domain] job per counter to the pool. The submitting
     fibers are attached to [domains_sw]; the jobs register their handlers
     on [sw]. *)
  let start_workers ~domains_sw ~sw =
    Array.iter
      (fun counter ->
        Fiber.fork ~sw:domains_sw (fun () ->
          Eio.Executor_pool.submit_exn pool ~weight:1.0 (fun () ->
            run_domain ~sw ~hooks counter)))
      resources
  in
  let round () =
    Switch.run (fun domains_sw ->
      Switch.run (fun sw ->
        start_workers ~domains_sw ~sw;
        (* Busy-wait until the last counter reaches 20, so the workers are
           running before [sw] is finished underneath them. *)
        while Atomic.get (resources.(n_domains - 1)) < 20 do
          Domain.cpu_relax ()
        done)
      (* The child domains will start to finish as they find that
         [sw] is not accepting new resources. They may each still
         create one last resource. *)
    );
    (* All child domains are now finished: every allocation must have been
       matched by exactly one release. *)
    let still_open =
      Array.fold_left (fun total counter -> total + Atomic.get counter) 0 resources
    in
    if still_open <> 0 then
      Fmt.failwith "%d resources still open at end!" still_open
  in
  for _ = 1 to n_rounds do
    round ()
  done

(* Entry point: start the Eio event loop, create an executor pool with
   [n_domains] worker domains tied to a top-level switch, and run the
   stress test on it. *)
let () =
  Eio_main.run (fun env ->
    let mgr = Eio.Stdenv.domain_mgr env in
    Switch.run (fun sw ->
      let pool = Eio.Executor_pool.create ~sw ~domain_count:n_domains mgr in
      main ~pool))
35 changes: 17 additions & 18 deletions tests/domains.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,27 +129,23 @@ Likewise, switches can't be shared:
Exception: Invalid_argument "Switch accessed from wrong domain!".
```

Can't register a release handler across domains:
Registering a release handler across domains:

```ocaml
# run @@ fun mgr ->
Switch.run @@ fun sw ->
let p, r = Promise.create () in
Fiber.both
(fun () ->
Eio.Domain_manager.run mgr @@ fun () ->
Switch.run @@ fun sw ->
Promise.resolve r sw;
Fiber.await_cancel ()
)
(fun () ->
let sw = Promise.await p in
Switch.on_release sw ignore
);;
Exception: Invalid_argument "Switch accessed from wrong domain!".
Eio.Domain_manager.run mgr (fun () ->
Switch.on_release sw (fun () -> traceln "Handler called");
traceln "Handler registered in new domain";
);
traceln "Sub-domain finished; ending switch"
+Handler registered in new domain
+Sub-domain finished; ending switch
+Handler called
- : unit = ()
```

Can't release a release handler across domains:
Cancelling a release handler across domains:

```ocaml
# run @@ fun mgr ->
Expand All @@ -159,15 +155,18 @@ Can't release a release handler across domains:
(fun () ->
Eio.Domain_manager.run mgr @@ fun () ->
Switch.run @@ fun sw ->
let hook = Switch.on_release_cancellable sw ignore in
let hook = Switch.on_release_cancellable sw (fun () -> traceln "BUG") in
Promise.resolve r hook;
Fiber.await_cancel ()
)
(fun () ->
let hook = Promise.await p in
Switch.remove_hook hook
let cancelled = Switch.try_remove_hook hook in
traceln "Cancelled: %b" cancelled;
raise Exit
);;
Exception: Invalid_argument "Switch hook removed from wrong domain!".
+Cancelled: true
Exception: Stdlib.Exit.
```

Can't fork into another domain:
Expand Down
13 changes: 10 additions & 3 deletions tests/switch.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ Multiple exceptions:
- Invalid_argument("Switch finished!")
```

Using switch from inside release handler:
Attaching resources to a switch from inside release handler fails
(possibly forking with it should be disallowed in future too):

```ocaml
# run (fun sw ->
Expand All @@ -366,8 +367,13 @@ Using switch from inside release handler:
);
Switch.on_release sw (fun () ->
Fiber.fork ~sw (fun () ->
Switch.on_release sw (fun () -> traceln "Late release");
traceln "Starting release 2";
begin
try
Switch.on_release sw (fun () -> traceln "Immediate release");
with Invalid_argument msg ->
traceln "on_release refused: %s" msg
end;
Fiber.yield ();
traceln "Finished release 2"
);
Expand All @@ -376,10 +382,11 @@ Using switch from inside release handler:
);;
+Main fiber done
+Starting release 2
+Immediate release
+on_release refused: Switch finished!
+Starting release 1
+Finished release 2
+Finished release 1
+Late release
- : unit = ()
```

Expand Down

0 comments on commit d979cc0

Please sign in to comment.