Skip to content

Commit

Permalink
Merge pull request #460 from talex5/fork-seq
Browse files Browse the repository at this point in the history
Add Fiber.fork_seq
  • Loading branch information
talex5 authored Mar 14, 2023
2 parents 1b198cc + 9d968dc commit eef510c
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 11 deletions.
13 changes: 7 additions & 6 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
(conflicts
(ocaml-base-compiler (< 5.0.0~beta1))
(ocaml-variants (< 5.0.0~beta1))
(ocaml-system (< 5.0.0~beta1)))
(ocaml-system (< 5.0.0~beta1))
(seq (< 0.3)))
(depends
(ocaml (>= 5.0.0))
(bigstringaf (>= 0.9.0))
Expand All @@ -26,7 +27,7 @@
(hmap (>= 0.8.1))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 2.0.0))
(mdx (and (>= 1.10.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(alcotest (and (>= 1.4.0) :with-test))
(dscheck (and (>= 0.1.0) :with-test))))
(package
Expand All @@ -36,7 +37,7 @@
(depends
(alcotest (and (>= 1.4.0) :with-test))
(eio (= :version))
(mdx (and (>= 1.10.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(logs (>= 0.7.0))
(fmt (>= 0.8.9))
(cmdliner (and (>= 1.1.0) :with-test))
Expand All @@ -48,7 +49,7 @@
(depends
(eio (= :version))
(iomux (>= 0.2))
(mdx (and (>= 1.10.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(fmt (>= 0.8.9))))
(package
(name eio_luv)
Expand All @@ -58,15 +59,15 @@
(eio (= :version))
(luv (>= 0.5.11))
(luv_unix (>= 0.5.0))
(mdx (and (>= 1.10.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(fmt (>= 0.8.9))))
(package
(name eio_main)
(synopsis "Effect-based direct-style IO mainloop for OCaml")
(description "Selects an appropriate Eio backend for the current platform.")
(depopts eio_luv)
(depends
(mdx (and (>= 1.10.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(eio_linux (and (= :version) (= :os "linux")))
(eio_posix (and (= :version) (<> :os "windows")))
(eio_luv (and (= :version) (or (= :os "windows") :with-test)))))
Expand Down
3 changes: 2 additions & 1 deletion eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ depends: [
"hmap" {>= "0.8.1"}
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "2.0.0"}
"mdx" {>= "1.10.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"alcotest" {>= "1.4.0" & with-test}
"dscheck" {>= "0.1.0" & with-test}
"odoc" {with-doc}
Expand All @@ -29,6 +29,7 @@ conflicts: [
"ocaml-base-compiler" {< "5.0.0~beta1"}
"ocaml-variants" {< "5.0.0~beta1"}
"ocaml-system" {< "5.0.0~beta1"}
"seq" {< "0.3"}
]
build: [
["dune" "subst"] {dev}
Expand Down
2 changes: 1 addition & 1 deletion eio_linux.opam
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ depends: [
"dune" {>= "3.7"}
"alcotest" {>= "1.4.0" & with-test}
"eio" {= version}
"mdx" {>= "1.10.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"logs" {>= "0.7.0"}
"fmt" {>= "0.8.9"}
"cmdliner" {>= "1.1.0" & with-test}
Expand Down
2 changes: 1 addition & 1 deletion eio_luv.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ depends: [
"eio" {= version}
"luv" {>= "0.5.11"}
"luv_unix" {>= "0.5.0"}
"mdx" {>= "1.10.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"fmt" {>= "0.8.9"}
"odoc" {with-doc}
]
Expand Down
2 changes: 1 addition & 1 deletion eio_main.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ doc: "https://ocaml-multicore.github.io/eio/"
bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "3.7"}
"mdx" {>= "1.10.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"eio_linux" {= version & os = "linux"}
"eio_posix" {= version & os != "windows"}
"eio_luv" {= version & os = "windows" | with-test}
Expand Down
2 changes: 1 addition & 1 deletion eio_posix.opam
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ depends: [
"dune" {>= "3.7"}
"eio" {= version}
"iomux" {>= "0.2"}
"mdx" {>= "1.10.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"fmt" {>= "0.8.9"}
"odoc" {with-doc}
]
Expand Down
30 changes: 30 additions & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,36 @@ module Fiber : sig
This is just a convenience wrapper around {!fork}.
If [fn] raises an exception then the promise is resolved to the error, but [sw] is not failed. *)

val fork_seq : sw:Switch.t -> (('a -> unit) -> unit) -> 'a Seq.t
(** [fork_seq ~sw fn] creates (but does not start) a new fiber to run [fn yield].
Requesting the next item from the returned sequence resumes the fiber until it
calls [yield x], using [x] value as the next item in the sequence. If [fn]
returns without producing a value then the result is {!Seq.Nil} (end-of-sequence).
The returned sequence can be consumed safely from another domain.
[fn] itself always runs in the domain that called [fork_seq].
Example:
{[
Switch.run @@ fun sw ->
let seq = Fiber.fork_seq ~sw (fun yield ->
for i = 1 to 3 do
traceln "Yielding %d" i;
yield i
done
) in
Seq.iter (traceln "Got: %d") seq
]}
If [fn] raises an exception then the consumer receives it.
If the consumer cancels while awaiting a value, the producer is cancelled when
it next calls [yield].
It is an error to request two items at once, or to request items out of sequence.
@param sw When the switch finishes, the fiber is cancelled (if still running).
Attempting to read from the sequence after this raises an exception. *)

val fork_daemon : sw:Switch.t -> (unit -> [`Stop_daemon]) -> unit
(** [fork_daemon] is like {!fork} except that instead of waiting for the fiber to finish,
the switch will cancel it once all non-daemon fibers are done.
Expand Down
107 changes: 107 additions & 0 deletions lib_eio/core/fiber.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[@@@alert "-unstable"]

type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t

let yield () =
Expand Down Expand Up @@ -284,3 +286,108 @@ let with_binding var value fn =
let without_binding var fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn

(* Coroutines.
[fork_coroutine ~sw fn] creates a new fiber for [fn]. [fn] immediately suspends, setting its state to
[Ready enqueue]. A consumer can resume it by setting the state to [Running] and calling [enqueue],
while suspending itself. The consumer passes in its own [enqueue] function. They run alternatively
like this, switching between the [Ready] and [Running] states.
To finish, the coroutine fiber can set the state to [Finished] or [Failed],
or the client can set the state to [Client_cancelled].
*)

(* Note: we could easily generalise this to [('in, 'out) coroutine] if that was useful. *)
type 'out coroutine =
[ `Init
| `Ready of [`Running of 'out Suspend.enqueue] Suspend.enqueue
| `Running of 'out Suspend.enqueue
| `Finished
| `Client_cancelled of exn
| `Failed of exn ]

(* The only good reason for the state to change while the coroutine is running is if the client
cancels. Return the exception in that case. If the coroutine is buggy it might e.g. fork two
fibers and yield twice for a single request - return Invalid_argument in that case. *)
let unwrap_cancelled state =
match Atomic.get state with
| `Client_cancelled ex -> ex
| `Finished | `Failed _ -> Invalid_argument "Coroutine has already stopped!"
| `Ready _ -> Invalid_argument "Coroutine has already yielded!"
| `Init | `Running _ -> Invalid_argument "Coroutine in unexpected state!"

let run_coroutine ~state fn =
let await_request ~prev ~on_suspend =
(* Suspend and wait for the consumer to resume us: *)
Suspend.enter (fun ctx enqueue ->
let ready = `Ready enqueue in
if Atomic.compare_and_set state prev ready then (
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state ready (`Failed ex) then
enqueue (Error ex);
(* else the client enqueued a resume for us; handle that instead *)
);
on_suspend ()
) else (
enqueue (Error (unwrap_cancelled state))
)
)
in
let current_state = ref (await_request ~prev:`Init ~on_suspend:ignore) in
fn (fun v ->
(* The coroutine wants to yield the value [v] and suspend. *)
let `Running enqueue as prev = !current_state in
current_state := await_request ~prev ~on_suspend:(fun () -> enqueue (Ok (Some v)))
);
(* [fn] has finished. End the stream. *)
if Atomic.compare_and_set state (!current_state :> _ coroutine) `Finished then (
let `Running enqueue = !current_state in
enqueue (Ok None)
) else (
raise (unwrap_cancelled state)
)

let fork_coroutine ~sw fn =
let state = Atomic.make `Init in
fork_daemon ~sw (fun () ->
try
run_coroutine ~state fn;
`Stop_daemon
with ex ->
match ex, Atomic.exchange state (`Failed ex) with
| _, `Running enqueue ->
(* A client is waiting for us. Send the error there. Also do this if we were cancelled. *)
enqueue (Error ex);
`Stop_daemon
| Cancel.Cancelled _, _ ->
(* The client isn't waiting (probably it got cancelled, then we tried to yield to it and got cancelled too).
If it tries to resume us later it will see the error. *)
`Stop_daemon
| _ ->
(* Something unexpected happened. Re-raise. *)
raise ex
);
fun () ->
Suspend.enter (fun ctx enqueue ->
let rec aux () =
match Atomic.get state with
| `Ready resume as prev ->
let running = `Running enqueue in
if Atomic.compare_and_set state prev running then (
resume (Ok running);
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state running (`Client_cancelled ex) then
enqueue (Error ex)
)
) else aux ()
| `Finished -> enqueue (Error (Invalid_argument "Coroutine has already finished!"))
| `Failed ex | `Client_cancelled ex -> enqueue (Error (Invalid_argument ("Coroutine has already failed: " ^ Printexc.to_string ex)))
| `Running _ -> enqueue (Error (Invalid_argument "Coroutine is still running!"))
| `Init -> assert false
in
aux ()
)

let fork_seq ~sw fn =
Seq.of_dispenser (fork_coroutine ~sw fn)
Loading

0 comments on commit eef510c

Please sign in to comment.