Skip to content

Commit

Permalink
Merge pull request #24 from talex5/debug-mode
Browse files Browse the repository at this point in the history
Add debug mode
  • Loading branch information
talex5 authored Aug 30, 2023
2 parents b16c534 + 09a4184 commit 0d35d77
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 16 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ If you replace the `Lwt_eio.run_eio @@ fun () ->` line with `Lwt.return @@`
then it will appear to work in simple cases, but it will act as a blocking read from Lwt's point of view.
It's similar to trying to turn a blocking call like `Stdlib.input_line` into an asynchronous one
using `Lwt.return`. It doesn't actually make it concurrent.
Using `Lwt_eio.with_event_loop ~debug:true` will detect these problems, by blocking effects when in Lwt mode.

We can now test it using an Eio flow:

Expand All @@ -134,7 +135,7 @@ We can now test it using an Eio flow:
val sort : [> Eio.Flow.source_ty ] r -> unit Lwt.t = <fun>
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
sort (Eio.Flow.string_source "b\na\nd\nc\n");;
a
Expand Down Expand Up @@ -183,7 +184,7 @@ We can therefore now call it directly from Eio:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
sort
~src:(Eio.Flow.string_source "b\na\nd\nc\n")
~dst:env#stdout;;
Expand Down
91 changes: 79 additions & 12 deletions lib/lwt_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,63 @@ let is_forked = ref false
switch argument, which is why we need to use a global variable here. *)
let loop_switch = ref None

type debug_mode =
| Eio (* Effects are permitted *)
| Lwt (* Effects are not permitted *)
| Normal (* We're not checking *)

let mode = ref Normal

let with_mode m fn =
match !mode, m with
| Normal, _ -> fn ()
| Lwt, Eio ->
mode := Eio;
begin
match fn () with
| x -> mode := Lwt; x
| exception ex -> mode := Lwt; raise ex
end
| Eio, Lwt ->
mode := Lwt;
Effect.Deep.match_with fn ()
{ retc = (fun x -> mode := Eio; x);
exnc = (fun ex -> mode := Eio; raise ex);
effc = fun (type a) (e : a Effect.t) : ((a, _) Effect.Deep.continuation -> _) option ->
match e with
| Eio.Private.Effects.Get_context -> None
| _ ->
match !mode with
| Normal -> assert false
| Eio -> None
| Lwt ->
Printf.eprintf "WARNING: Attempt to perform effect in Lwt context\n";
Some (fun k ->
if Printexc.backtrace_status () then
Printexc.print_raw_backtrace stderr (Effect.Deep.get_callstack k 10);
flush stderr;
Effect.Deep.discontinue k (Invalid_argument "Attempt to perform effect in Lwt context")
)
}
| Eio, Eio ->
let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
let ex = Failure "Already in Eio context!" in
traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
raise ex
| Lwt, Lwt ->
let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
let ex = Failure "Already in Lwt context!" in
traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
raise ex
| _, Normal -> assert false

let notify () = Lazy.force !ready

(* Run [fn] in a new fiber and return a lazy value that can be forced to cancel it. *)
let fork_with_cancel ~sw fn =
if !is_forked then lazy (failwith "Can't use Eio in a forked child process")
else (
with_mode Eio @@ fun () ->
let cancel = ref None in
Fiber.fork ~sw (fun () ->
try
Expand Down Expand Up @@ -83,32 +134,33 @@ let make_engine ~sw ~clock = object
fork_with_cancel ~sw @@ fun () ->
while true do
Eio_unix.await_readable fd;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done

method private register_writable fd callback =
fork_with_cancel ~sw @@ fun () ->
while true do
Eio_unix.await_writable fd;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done

method private register_timer delay repeat callback =
fork_with_cancel ~sw @@ fun () ->
if repeat then (
while true do
Eio.Time.sleep clock delay;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done
) else (
Eio.Time.sleep clock delay;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
)

method! forwards_signal signum =
signum = Sys.sigchld

method iter block =
with_mode Eio @@ fun () ->
if block then (
let p, r = Promise.create () in
ready := lazy (Promise.resolve r ());
Expand All @@ -129,6 +181,7 @@ let main ~clock user_promise =
if Option.is_some !loop_switch then invalid_arg "Lwt_eio event loop already running";
Switch.on_release sw (fun () -> loop_switch := None);
loop_switch := Some sw;
with_mode Lwt @@ fun () ->
Lwt_engine.set ~destroy:false (make_engine ~sw ~clock);
(* An Eio fiber may resume an Lwt thread while in [Lwt_engine.iter] and forget to call [notify].
If that called [Lwt.pause] then it wouldn't wake up, so handle this common case here. *)
Expand All @@ -139,9 +192,10 @@ let main ~clock user_promise =
with Exit ->
Lwt_engine.set old_engine

let with_event_loop ~clock fn =
let with_event_loop ?(debug=false) ~clock fn =
Lazy.force install_sigchld_handler;
let p, r = Lwt.wait () in
mode := if debug then Eio else Normal;
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () -> main ~clock p);
Fun.protect (fun () -> fn Token.v)
Expand All @@ -162,43 +216,56 @@ module Promise = struct
Promise.await_exn p

let await_eio eio_promise =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.wait () in
Fiber.fork ~sw (fun () ->
Lwt.wakeup r (Promise.await eio_promise);
let x = Promise.await eio_promise in
with_mode Lwt @@ fun () ->
Lwt.wakeup r x;
notify ()
);
p

let await_eio_result eio_promise =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.wait () in
Fiber.fork ~sw (fun () ->
match Promise.await eio_promise with
| Ok x -> Lwt.wakeup r x; notify ()
| Error ex -> Lwt.wakeup_exn r ex; notify ()
| Ok x ->
with_mode Lwt @@ fun () ->
Lwt.wakeup r x; notify ()
| Error ex ->
with_mode Lwt @@ fun () ->
Lwt.wakeup_exn r ex; notify ()
);
p
end

let run_eio fn =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.task () in
let cc = ref None in
Fiber.fork ~sw (fun () ->
Eio.Cancel.sub (fun cancel ->
cc := Some cancel;
match fn () with
| x -> Lwt.wakeup r x; notify ()
| exception ex -> Lwt.wakeup_exn r ex; notify ()
| x ->
with_mode Lwt @@ fun () ->
Lwt.wakeup r x; notify ()
| exception ex ->
with_mode Lwt @@ fun () ->
Lwt.wakeup_exn r ex; notify ()
)
);
Lwt.on_cancel p (fun () -> Option.iter (fun cc -> Eio.Cancel.cancel cc Lwt.Canceled) !cc);
p

let run_lwt fn =
Fiber.check ();
let p = fn () in
let p = with_mode Lwt fn in
try
Fiber.check ();
Promise.await_lwt p
Expand All @@ -216,7 +283,7 @@ let job_notification =
(fun () ->
(* Take the first job. The queue is never empty at this point. *)
let thunk = Lf_queue.pop jobs |> Option.get in
thunk ()
with_mode Lwt thunk
)

let run_in_main_dont_wait f =
Expand Down
8 changes: 6 additions & 2 deletions lib/lwt_eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ module Token : sig
existing code. *)
end

val with_event_loop : clock:_ Eio.Time.clock -> (Token.t -> 'a) -> 'a
val with_event_loop : ?debug:bool -> clock:_ Eio.Time.clock -> (Token.t -> 'a) -> 'a
(** [with_event_loop ~clock fn] starts an Lwt event loop running and then executes [fn t].
When that finishes, the event loop is stopped. *)
When that finishes, the event loop is stopped.
@param debug If [true] (the default is [false]), block attempts to perform effects in Lwt context.
[Get_context] is allowed, so [traceln] still works,
but this will detect cases where Lwt code tries to call Eio code directly. *)

module Promise : sig
val await_lwt : 'a Lwt.t -> 'a
Expand Down
153 changes: 153 additions & 0 deletions test/test.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,156 @@ Cancelling the Eio fiber cancels the Lwt job too:
+Lwt caught: Lwt.Resolution_loop.Canceled
Exception: Failure "Simulated error".
```

## Debug mode

In debug mode, we detect attempts to use Eio from Lwt context:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
traceln "Traceln is permitted";
Fiber.yield ();
assert false
+Traceln is permitted
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

In a new fiber:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () ->
Lwt_eio.run_lwt @@ fun () ->
Fiber.yield ();
assert false
);;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Resumed by the Lwt engine:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let x =
let+ () = Lwt.pause () in
Fiber.yield ()
in
Lwt_eio.Promise.await_lwt x;;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Resumed by IO:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let r, w = Eio_unix.pipe sw in
Fiber.both
(fun () ->
Lwt_eio.run_lwt @@ fun () ->
let r = Eio_unix.Resource.fd r in
Eio_unix.Fd.use_exn "lwt_read" r @@ fun r ->
let r = Lwt_unix.of_unix_file_descr r in
let buf = Bytes.create 1 in
let* got = Lwt_unix.read r buf 0 1 in
assert (got = 1);
traceln "Got %S" (Bytes.to_string buf);
Fiber.yield ();
assert false
)
(fun () -> Eio.Flow.copy_string "&" w);;
+Got "&"
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Changing to the same mode is an error:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_eio (fun () -> assert false);;
+WARNING: Exception: Failure("Already in Eio context!")
+ No backtrace available.
Exception: Failure "Already in Eio context!".
```

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
Lwt_eio.run_lwt (fun () -> assert false);;
+WARNING: Exception: Failure("Already in Lwt context!")
+ No backtrace available.
Exception: Failure "Already in Lwt context!".
```

`await_eio` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let p, r = Promise.create () in
Fiber.both
(fun () ->
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.Promise.await_eio p in
Fiber.yield ();
assert false
)
(fun () -> Promise.resolve r ());;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

`await_eio_result` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let p = Fiber.fork_promise ~sw (fun () -> Fiber.yield (); traceln "Eio done") in
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.Promise.await_eio_result p in
Fiber.yield ();
assert false;;
+Eio done
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

`run_eio` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.run_eio Fiber.yield in
Fiber.yield ();
assert false;;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Debug mode is off by default:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
Fiber.yield ();
Lwt.return "Didn't notice";;
- : string = "Didn't notice"
```

0 comments on commit 0d35d77

Please sign in to comment.