Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug mode #24

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ If you replace the `Lwt_eio.run_eio @@ fun () ->` line with `Lwt.return @@`
then it will appear to work in simple cases, but it will act as a blocking read from Lwt's point of view.
It's similar to trying to turn a blocking call like `Stdlib.input_line` into an asynchronous one
using `Lwt.return`. It doesn't actually make it concurrent.
Using `Lwt_eio.with_event_loop ~debug:true` will detect these problems, by blocking effects when in Lwt mode.

We can now test it using an Eio flow:

Expand All @@ -134,7 +135,7 @@ We can now test it using an Eio flow:
val sort : [> Eio.Flow.source_ty ] r -> unit Lwt.t = <fun>

# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
sort (Eio.Flow.string_source "b\na\nd\nc\n");;
a
Expand Down Expand Up @@ -183,7 +184,7 @@ We can therefore now call it directly from Eio:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
sort
~src:(Eio.Flow.string_source "b\na\nd\nc\n")
~dst:env#stdout;;
Expand Down
91 changes: 79 additions & 12 deletions lib/lwt_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,63 @@ let is_forked = ref false
switch argument, which is why we need to use a global variable here. *)
let loop_switch = ref None

type debug_mode =
| Eio (* Effects are permitted *)
| Lwt (* Effects are not permitted *)
| Normal (* We're not checking *)

let mode = ref Normal

let with_mode m fn =
match !mode, m with
| Normal, _ -> fn ()
| Lwt, Eio ->
mode := Eio;
begin
match fn () with
| x -> mode := Lwt; x
| exception ex -> mode := Lwt; raise ex
end
| Eio, Lwt ->
mode := Lwt;
Effect.Deep.match_with fn ()
{ retc = (fun x -> mode := Eio; x);
exnc = (fun ex -> mode := Eio; raise ex);
effc = fun (type a) (e : a Effect.t) : ((a, _) Effect.Deep.continuation -> _) option ->
match e with
| Eio.Private.Effects.Get_context -> None
| _ ->
match !mode with
| Normal -> assert false
| Eio -> None
| Lwt ->
Printf.eprintf "WARNING: Attempt to perform effect in Lwt context\n";
Some (fun k ->
if Printexc.backtrace_status () then
Printexc.print_raw_backtrace stderr (Effect.Deep.get_callstack k 10);
flush stderr;
Effect.Deep.discontinue k (Invalid_argument "Attempt to perform effect in Lwt context")
)
}
| Eio, Eio ->
let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
let ex = Failure "Already in Eio context!" in
traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
raise ex
| Lwt, Lwt ->
let bt = Printexc.get_callstack (if Printexc.backtrace_status () then 20 else 0) in
let ex = Failure "Already in Lwt context!" in
traceln "WARNING: %a" Fmt.exn_backtrace (ex, bt);
raise ex
| _, Normal -> assert false

let notify () = Lazy.force !ready

(* Run [fn] in a new fiber and return a lazy value that can be forced to cancel it. *)
let fork_with_cancel ~sw fn =
if !is_forked then lazy (failwith "Can't use Eio in a forked child process")
else (
with_mode Eio @@ fun () ->
let cancel = ref None in
Fiber.fork ~sw (fun () ->
try
Expand Down Expand Up @@ -83,32 +134,33 @@ let make_engine ~sw ~clock = object
fork_with_cancel ~sw @@ fun () ->
while true do
Eio_unix.await_readable fd;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done

method private register_writable fd callback =
fork_with_cancel ~sw @@ fun () ->
while true do
Eio_unix.await_writable fd;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done

method private register_timer delay repeat callback =
fork_with_cancel ~sw @@ fun () ->
if repeat then (
while true do
Eio.Time.sleep clock delay;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
done
) else (
Eio.Time.sleep clock delay;
Eio.Cancel.protect (fun () -> callback (); notify ())
Eio.Cancel.protect (fun () -> with_mode Lwt callback; notify ())
)

method! forwards_signal signum =
signum = Sys.sigchld

method iter block =
with_mode Eio @@ fun () ->
if block then (
let p, r = Promise.create () in
ready := lazy (Promise.resolve r ());
Expand All @@ -129,6 +181,7 @@ let main ~clock user_promise =
if Option.is_some !loop_switch then invalid_arg "Lwt_eio event loop already running";
Switch.on_release sw (fun () -> loop_switch := None);
loop_switch := Some sw;
with_mode Lwt @@ fun () ->
Lwt_engine.set ~destroy:false (make_engine ~sw ~clock);
(* An Eio fiber may resume an Lwt thread while in [Lwt_engine.iter] and forget to call [notify].
If that called [Lwt.pause] then it wouldn't wake up, so handle this common case here. *)
Expand All @@ -139,9 +192,10 @@ let main ~clock user_promise =
with Exit ->
Lwt_engine.set old_engine

let with_event_loop ~clock fn =
let with_event_loop ?(debug=false) ~clock fn =
Lazy.force install_sigchld_handler;
let p, r = Lwt.wait () in
mode := if debug then Eio else Normal;
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () -> main ~clock p);
Fun.protect (fun () -> fn Token.v)
Expand All @@ -162,43 +216,56 @@ module Promise = struct
Promise.await_exn p

let await_eio eio_promise =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.wait () in
Fiber.fork ~sw (fun () ->
Lwt.wakeup r (Promise.await eio_promise);
let x = Promise.await eio_promise in
with_mode Lwt @@ fun () ->
Lwt.wakeup r x;
notify ()
);
p

let await_eio_result eio_promise =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.wait () in
Fiber.fork ~sw (fun () ->
match Promise.await eio_promise with
| Ok x -> Lwt.wakeup r x; notify ()
| Error ex -> Lwt.wakeup_exn r ex; notify ()
| Ok x ->
with_mode Lwt @@ fun () ->
Lwt.wakeup r x; notify ()
| Error ex ->
with_mode Lwt @@ fun () ->
Lwt.wakeup_exn r ex; notify ()
);
p
end

let run_eio fn =
with_mode Eio @@ fun () ->
let sw = get_loop_switch () in
let p, r = Lwt.task () in
let cc = ref None in
Fiber.fork ~sw (fun () ->
Eio.Cancel.sub (fun cancel ->
cc := Some cancel;
match fn () with
| x -> Lwt.wakeup r x; notify ()
| exception ex -> Lwt.wakeup_exn r ex; notify ()
| x ->
with_mode Lwt @@ fun () ->
Lwt.wakeup r x; notify ()
| exception ex ->
with_mode Lwt @@ fun () ->
Lwt.wakeup_exn r ex; notify ()
)
);
Lwt.on_cancel p (fun () -> Option.iter (fun cc -> Eio.Cancel.cancel cc Lwt.Canceled) !cc);
p

let run_lwt fn =
Fiber.check ();
let p = fn () in
let p = with_mode Lwt fn in
try
Fiber.check ();
Promise.await_lwt p
Expand All @@ -216,7 +283,7 @@ let job_notification =
(fun () ->
(* Take the first job. The queue is never empty at this point. *)
let thunk = Lf_queue.pop jobs |> Option.get in
thunk ()
with_mode Lwt thunk
)

let run_in_main_dont_wait f =
Expand Down
8 changes: 6 additions & 2 deletions lib/lwt_eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ module Token : sig
existing code. *)
end

val with_event_loop : clock:_ Eio.Time.clock -> (Token.t -> 'a) -> 'a
val with_event_loop : ?debug:bool -> clock:_ Eio.Time.clock -> (Token.t -> 'a) -> 'a
(** [with_event_loop ~clock fn] starts an Lwt event loop running and then executes [fn t].
When that finishes, the event loop is stopped. *)
When that finishes, the event loop is stopped.

@param debug If [true] (the default is [false]), block attempts to perform effects in Lwt context.
[Get_context] is allowed, so [traceln] still works,
but this will detect cases where Lwt code tries to call Eio code directly. *)

module Promise : sig
val await_lwt : 'a Lwt.t -> 'a
Expand Down
153 changes: 153 additions & 0 deletions test/test.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,156 @@ Cancelling the Eio fiber cancels the Lwt job too:
+Lwt caught: Lwt.Resolution_loop.Canceled
Exception: Failure "Simulated error".
```

## Debug mode

In debug mode, we detect attempts to use Eio from Lwt context:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
traceln "Traceln is permitted";
Fiber.yield ();
assert false
+Traceln is permitted
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

In a new fiber:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () ->
Lwt_eio.run_lwt @@ fun () ->
Fiber.yield ();
assert false
);;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Resumed by the Lwt engine:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let x =
let+ () = Lwt.pause () in
Fiber.yield ()
in
Lwt_eio.Promise.await_lwt x;;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Resumed by IO:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let r, w = Eio_unix.pipe sw in
Fiber.both
(fun () ->
Lwt_eio.run_lwt @@ fun () ->
let r = Eio_unix.Resource.fd r in
Eio_unix.Fd.use_exn "lwt_read" r @@ fun r ->
let r = Lwt_unix.of_unix_file_descr r in
let buf = Bytes.create 1 in
let* got = Lwt_unix.read r buf 0 1 in
assert (got = 1);
traceln "Got %S" (Bytes.to_string buf);
Fiber.yield ();
assert false
)
(fun () -> Eio.Flow.copy_string "&" w);;
+Got "&"
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Changing to the same mode is an error:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_eio (fun () -> assert false);;
+WARNING: Exception: Failure("Already in Eio context!")
+ No backtrace available.
Exception: Failure "Already in Eio context!".
```

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
Lwt_eio.run_lwt (fun () -> assert false);;
+WARNING: Exception: Failure("Already in Lwt context!")
+ No backtrace available.
Exception: Failure "Already in Lwt context!".
```

`await_eio` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let p, r = Promise.create () in
Fiber.both
(fun () ->
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.Promise.await_eio p in
Fiber.yield ();
assert false
)
(fun () -> Promise.resolve r ());;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

`await_eio_result` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
let p = Fiber.fork_promise ~sw (fun () -> Fiber.yield (); traceln "Eio done") in
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.Promise.await_eio_result p in
Fiber.yield ();
assert false;;
+Eio done
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

`run_eio` resumes in Lwt mode:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~debug:true ~clock:env#clock @@ fun _ ->
Switch.run @@ fun sw ->
Lwt_eio.run_lwt @@ fun () ->
let* () = Lwt_eio.run_eio Fiber.yield in
Fiber.yield ();
assert false;;
WARNING: Attempt to perform effect in Lwt context
Exception: Invalid_argument "Attempt to perform effect in Lwt context".
```

Debug mode is off by default:

```ocaml
# Eio_main.run @@ fun env ->
Lwt_eio.with_event_loop ~clock:env#clock @@ fun _ ->
Lwt_eio.run_lwt @@ fun () ->
Fiber.yield ();
Lwt.return "Didn't notice";;
- : string = "Didn't notice"
```