Skip to content

Commit

Permalink
Add Lwt_eio.run_lwt_in_main
Browse files Browse the repository at this point in the history
  • Loading branch information
talex5 committed Jul 5, 2023
1 parent 220dfd5 commit 24885e3
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 0 deletions.
46 changes: 46 additions & 0 deletions lib/lwt_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,49 @@ let run_lwt fn =
with Eio.Cancel.Cancelled _ as ex ->
Lwt.cancel p;
raise ex

(* Jobs to be run in the main Lwt domain. *)
let jobs : (unit -> unit) Queue.t = Queue.create ()

(* Mutex to protect access to [jobs]. *)
let jobs_mutex = Eio.Mutex.create ()

let job_notification =
Lwt_unix.make_notification
(fun () ->
(* Take the first job. The queue is never empty at this point. *)
let thunk = Eio.Mutex.use_rw ~protect:false jobs_mutex (fun () -> Queue.take jobs) in
thunk ()
)

let run_in_main_dont_wait f =
(* Add the job to the queue. *)
Eio.Mutex.use_rw ~protect:false jobs_mutex (fun () -> Queue.add f jobs);
(* Notify the main thread. *)
Lwt_unix.send_notification job_notification

let run_lwt_in_main f =
let cancel = ref (fun () -> assert false) in
let p, r = Eio.Promise.create () in
run_in_main_dont_wait (fun () ->
let thread = f () in
cancel := (fun () -> Lwt.cancel thread);
Lwt.on_any thread
(Eio.Promise.resolve_ok r)
(Eio.Promise.resolve_error r)
);
match
Fiber.check ();
Eio.Promise.await p
with
| Ok x -> x
| Error ex -> raise ex
| exception (Eio.Cancel.Cancelled _ as ex) ->
let cancelled, set_cancelled = Eio.Promise.create () in
run_in_main_dont_wait (fun () ->
(* By the time this runs, [cancel] must have been set. *)
!cancel ();
Eio.Promise.resolve set_cancelled ()
);
Eio.Cancel.protect (fun () -> Eio.Promise.await cancelled);
raise ex
16 changes: 16 additions & 0 deletions lib/lwt_eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,35 @@ end

val run_eio : (unit -> 'a) -> 'a Lwt.t
(** [run_eio fn] allows running Eio code from within a Lwt function.
It runs [fn ()] in a new Eio fiber and returns a promise for the result.
If the returned promise is cancelled, it will cancel the Eio fiber.
The new fiber is attached to the Lwt event loop's switch and will also be
cancelled if the function passed to {!with_event_loop} returns. *)

val run_lwt : (unit -> 'a Lwt.t) -> 'a
(** [run_lwt fn] allows running Lwt code from within an Eio function.
It runs [fn ()] to create a Lwt promise and then awaits it.
If the Eio fiber is cancelled, the Lwt promise is cancelled too.
This can only be called from the domain running Lwt.
For other domains, use {!run_lwt_in_main} instead. *)

val run_lwt_in_main : (unit -> 'a Lwt.t) -> 'a
(** [run_lwt_in_main fn] schedules [fn ()] to run in Lwt's domain
and waits for the result.
It is similar to {!Lwt_preemptive.run_in_main},
but allows other Eio fibers to run while it's waiting.
It can be called from any Eio domain.
If the Eio fiber is cancelled, the Lwt promise is cancelled too. *)

val notify : unit -> unit
(** [notify ()] causes [Lwt_engine.iter] to return,
indicating that the event loop should run the hooks and resume yielded threads.
Ideally, you should call this when an Eio fiber wakes up a Lwt thread, e.g. by resolving a Lwt promise.
In most cases however this isn't really needed,
since [Lwt_unix.yield] is deprecated and [Lwt.pause] will call this automatically. *)
67 changes: 67 additions & 0 deletions test/test.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,70 @@ After finishing with our mainloop, the old Lwt engine is ready for use again:
# Lwt_main.run (Lwt_unix.sleep 0.01);;
- : unit = ()
```

## Running Lwt code from another domain

A new Eio-only domain runs a job in the original Lwt domain.
The Eio domain is still running while it waits, allowing it to resolve an Lwt promise
and cause the original job to finish and return its result to the Eio domain:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
let lwt_domain = Domain.self () in
let pp_domain f d =
if d = lwt_domain then Fmt.string f "Lwt domain"
else Fmt.string f "new Eio domain"
in
traceln "Lwt running in %a" pp_domain lwt_domain;
Eio.Domain_manager.run env#domain_mgr (fun () ->
let eio_domain = Domain.self () in
traceln "Eio running in %a" pp_domain eio_domain;
let p, r = Lwt.wait () in
let result =
Fiber.first
(fun () ->
Lwt_eio.run_lwt_in_main (fun () ->
traceln "Lwt callback running %a" pp_domain (Domain.self ());
let+ p = p in
p ^ "L"
)
)
(fun () ->
Lwt_eio.run_lwt_in_main (fun () -> Lwt.wakeup r "E"; Lwt.return_unit);
Fiber.await_cancel ()
)
in
traceln "Result: %S" result
);;
+Lwt running in Lwt domain
+Eio running in new Eio domain
+Lwt callback running Lwt domain
+Result: "EL"
- : unit = ()
```

Cancelling the Eio fiber cancels the Lwt job too:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Eio.Domain_manager.run env#domain_mgr (fun () ->
let p, r = Lwt.task () in
Fiber.both
(fun () ->
Lwt_eio.run_lwt_in_main (fun () ->
traceln "Starting Lwt callback...";
Lwt.catch
(fun () -> p)
(fun ex -> traceln "Lwt caught: %a" Fmt.exn ex; raise ex)
)
)
(fun () ->
failwith "Simulated error"
)
);;
+Starting Lwt callback...
+Lwt caught: Lwt.Resolution_loop.Canceled
Exception: Failure "Simulated error".
```

0 comments on commit 24885e3

Please sign in to comment.