From 24885e39dfd68c138a93cb3c09151fc97ea89074 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Mon, 3 Jul 2023 14:44:04 +0100 Subject: [PATCH] Add Lwt_eio.run_lwt_in_main --- lib/lwt_eio.ml | 46 +++++++++++++++++++++++++++++++++ lib/lwt_eio.mli | 16 ++++++++++++ test/test.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+) diff --git a/lib/lwt_eio.ml b/lib/lwt_eio.ml index 2d9e27c..d3fba72 100644 --- a/lib/lwt_eio.ml +++ b/lib/lwt_eio.ml @@ -157,3 +157,49 @@ let run_lwt fn = with Eio.Cancel.Cancelled _ as ex -> Lwt.cancel p; raise ex + +(* Jobs to be run in the main Lwt domain. *) +let jobs : (unit -> unit) Queue.t = Queue.create () + +(* Mutex to protect access to [jobs]. *) +let jobs_mutex = Eio.Mutex.create () + +let job_notification = + Lwt_unix.make_notification + (fun () -> + (* Take the first job. The queue is never empty at this point. *) + let thunk = Eio.Mutex.use_rw ~protect:false jobs_mutex (fun () -> Queue.take jobs) in + thunk () + ) + +let run_in_main_dont_wait f = + (* Add the job to the queue. *) + Eio.Mutex.use_rw ~protect:false jobs_mutex (fun () -> Queue.add f jobs); + (* Notify the main thread. *) + Lwt_unix.send_notification job_notification + +let run_lwt_in_main f = + let cancel = ref (fun () -> assert false) in + let p, r = Eio.Promise.create () in + run_in_main_dont_wait (fun () -> + let thread = f () in + cancel := (fun () -> Lwt.cancel thread); + Lwt.on_any thread + (Eio.Promise.resolve_ok r) + (Eio.Promise.resolve_error r) + ); + match + Fiber.check (); + Eio.Promise.await p + with + | Ok x -> x + | Error ex -> raise ex + | exception (Eio.Cancel.Cancelled _ as ex) -> + let cancelled, set_cancelled = Eio.Promise.create () in + run_in_main_dont_wait (fun () -> + (* By the time this runs, [cancel] must have been set. *) + !cancel (); + Eio.Promise.resolve set_cancelled () + ); + Eio.Cancel.protect (fun () -> Eio.Promise.await cancelled); + raise ex diff --git a/lib/lwt_eio.mli b/lib/lwt_eio.mli index 16709c5..376cc61 100644 --- a/lib/lwt_eio.mli +++ b/lib/lwt_eio.mli @@ -31,6 +31,7 @@ end val run_eio : (unit -> 'a) -> 'a Lwt.t (** [run_eio fn] allows running Eio code from within a Lwt function. + It runs [fn ()] in a new Eio fiber and returns a promise for the result. If the returned promise is cancelled, it will cancel the Eio fiber. The new fiber is attached to the Lwt event loop's switch and will also be @@ -38,12 +39,27 @@ val run_eio : (unit -> 'a) -> 'a Lwt.t val run_lwt : (unit -> 'a Lwt.t) -> 'a (** [run_lwt fn] allows running Lwt code from within an Eio function. + It runs [fn ()] to create a Lwt promise and then awaits it. + If the Eio fiber is cancelled, the Lwt promise is cancelled too. + + This can only be called from the domain running Lwt. + For other domains, use {!run_lwt_in_main} instead. *) + +val run_lwt_in_main : (unit -> 'a Lwt.t) -> 'a +(** [run_lwt_in_main fn] schedules [fn ()] to run in Lwt's domain + and waits for the result. + + It is similar to {!Lwt_preemptive.run_in_main}, + but allows other Eio fibers to run while it's waiting. + It can be called from any Eio domain. + If the Eio fiber is cancelled, the Lwt promise is cancelled too. *) val notify : unit -> unit (** [notify ()] causes [Lwt_engine.iter] to return, indicating that the event loop should run the hooks and resume yielded threads. + Ideally, you should call this when an Eio fiber wakes up a Lwt thread, e.g. by resolving a Lwt promise. In most cases however this isn't really needed, since [Lwt_unix.yield] is deprecated and [Lwt.pause] will call this automatically. *) diff --git a/test/test.md b/test/test.md index 4ae3320..9311656 100644 --- a/test/test.md +++ b/test/test.md @@ -179,3 +179,70 @@ After finishing with our mainloop, the old Lwt engine is ready for use again: # Lwt_main.run (Lwt_unix.sleep 0.01);; - : unit = () ``` + +## Running Lwt code from another domain + +A new Eio-only domain runs a job in the original Lwt domain. +The Eio domain is still running while it waits, allowing it to resolve an Lwt promise +and cause the original job to finish and return its result to the Eio domain: + +```ocaml +# Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ -> + let lwt_domain = Domain.self () in + let pp_domain f d = + if d = lwt_domain then Fmt.string f "Lwt domain" + else Fmt.string f "new Eio domain" + in + traceln "Lwt running in %a" pp_domain lwt_domain; + Eio.Domain_manager.run env#domain_mgr (fun () -> + let eio_domain = Domain.self () in + traceln "Eio running in %a" pp_domain eio_domain; + let p, r = Lwt.wait () in + let result = + Fiber.first + (fun () -> + Lwt_eio.run_lwt_in_main (fun () -> + traceln "Lwt callback running %a" pp_domain (Domain.self ()); + let+ p = p in + p ^ "L" + ) + ) + (fun () -> + Lwt_eio.run_lwt_in_main (fun () -> Lwt.wakeup r "E"; Lwt.return_unit); + Fiber.await_cancel () + ) + in + traceln "Result: %S" result + );; ++Lwt running in Lwt domain ++Eio running in new Eio domain ++Lwt callback running Lwt domain ++Result: "EL" +- : unit = () +``` + +Cancelling the Eio fiber cancels the Lwt job too: + +```ocaml +# Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ -> + Eio.Domain_manager.run env#domain_mgr (fun () -> + let p, r = Lwt.task () in + Fiber.both + (fun () -> + Lwt_eio.run_lwt_in_main (fun () -> + traceln "Starting Lwt callback..."; + Lwt.catch + (fun () -> p) + (fun ex -> traceln "Lwt caught: %a" Fmt.exn ex; raise ex) + ) + ) + (fun () -> + failwith "Simulated error" + ) + );; ++Starting Lwt callback... ++Lwt caught: Lwt.Resolution_loop.Canceled +Exception: Failure "Simulated error". +```