Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for domain local await #494

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ocaml/opam:debian-11-ocaml-5.0
# Make sure we're using opam-2.1:
RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam
# Ensure opam-repository is up-to-date:
RUN cd opam-repository && git pull -q origin 055c6b85dfb77ee9d7a1ae35dced61b6fd64838a && opam update
RUN cd opam-repository && git pull -q origin 24ff26d7dbf5de564035bbb7d39414bc7f7262d3 && opam update
# Install utop for interactive use:
RUN opam install utop fmt
# Install Eio's dependencies (adding just the opam files first to help with caching):
Expand Down
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Lwt](#lwt)
* [Unix and System Threads](#unix-and-system-threads)
* [Domainslib](#domainslib)
* [kcas](#kcas)
* [Best Practices](#best-practices)
* [Switches](#switches-1)
* [Casting](#casting)
Expand Down Expand Up @@ -1604,6 +1605,65 @@ while most Eio functions can only be used from Eio domains.
The bridge function `run_in_pool` makes use of the fact that `Domainslib.Task.async` is able to run from
an Eio domain, and `Eio.Promise.resolve` is able to run from a Domainslib one.

### kcas

Eio provides the support [kcas][] requires to implement blocking in the
lock-free software transactional memory (STM) implementation that it provides.
This means that one can use all the composable lock-free data structures and
primitives for communication and synchronization implemented using **kcas** to
communicate and synchronize between Eio fibers, raw domains, and any other
schedulers that provide the domain local await mechanism.

To demonstrate **kcas**

```ocaml
# #require "kcas"
# open Kcas
```

let's first create a couple of shared memory locations

```ocaml
# let x = Loc.make 0
val x : int Loc.t = <abstr>
# let y = Loc.make 0
val y : int Loc.t = <abstr>
```

and spawn a domain

```ocaml
# let foreign_domain = Domain.spawn @@ fun () ->
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
Loc.set y 22;
x
val foreign_domain : int Domain.t = <abstr>
```

that first waits for one of the locations to change value and then writes to the
other location.

Then we run a Eio program

```ocaml
# let y = Eio_main.run @@ fun _env ->
Loc.set x 20;
Loc.get_as (fun y -> Retry.unless (y <> 0); y) y
val y : int = 22
```

that first writes to the location the other domain is waiting on and then waits
for the other domain to write to the other location.

Joining with the other domain

```ocaml
# y + Domain.join foreign_domain
- : int = 42
```

we arrive at the answer.

## Best Practices

This section contains some recommendations for designing library APIs for use with Eio.
Expand Down Expand Up @@ -1762,5 +1822,6 @@ Some background about the effects system can be found in:
[Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html
[Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html
[Domainslib]: https://github.com/ocaml-multicore/domainslib
[kcas]: https://github.com/ocaml-multicore/kcas
[Meio]: https://github.com/tarides/meio
[Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/
2 changes: 2 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
(psq (>= 0.2.0))
(fmt (>= 0.8.9))
(hmap (>= 0.8.1))
(domain-local-await (>= 0.1.0))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 2.0.0))
(kcas (and (>= 0.3.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(alcotest (and (>= 1.4.0) :with-test))
(dscheck (and (>= 0.1.0) :with-test))))
Expand Down
2 changes: 2 additions & 0 deletions eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ depends: [
"psq" {>= "0.2.0"}
"fmt" {>= "0.8.9"}
"hmap" {>= "0.8.1"}
"domain-local-await" {>= "0.1.0"}
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "2.0.0"}
"kcas" {>= "0.3.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"alcotest" {>= "1.4.0" & with-test}
"dscheck" {>= "0.1.0" & with-test}
Expand Down
22 changes: 22 additions & 0 deletions lib_eio/core/dla.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
let prepare_for_await () =
let state = Atomic.make `Init in
let release () =
if Atomic.get state != `Released then
match Atomic.exchange state `Released with
| `Awaiting enqueue -> enqueue (Ok ())
| _ -> ()
and await () =
if Atomic.get state != `Released then
Suspend.enter @@ fun ctx enqueue ->
let awaiting = `Awaiting enqueue in
if Atomic.compare_and_set state `Init awaiting then (
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state awaiting `Released then (
enqueue (Error ex)
)
)
) else (
enqueue (Ok ())
)
in
Domain_local_await.{ release; await }
2 changes: 1 addition & 1 deletion lib_eio/core/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name eio__core)
(public_name eio.core)
(libraries cstruct hmap lwt-dllist fmt optint))
(libraries cstruct hmap lwt-dllist fmt optint domain-local-await))
2 changes: 2 additions & 0 deletions lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ module Private = struct
| Fork = Fiber.Fork
| Get_context = Cancel.Get_context
end

module Dla = Dla
end
3 changes: 3 additions & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,7 @@ module Private : sig
(** Backends should use this for {!Eio.Stdenv.debug}. *)
end

module Dla : sig
val prepare_for_await : unit -> Domain_local_await.t
end
end
6 changes: 5 additions & 1 deletion lib_eio/mock/backend.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ let run main =
in
let new_fiber = Fiber_context.make_root () in
let result = ref None in
let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main ())) in
let Exit_scheduler =
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () -> result := Some (main ()))) in
match !result with
| None -> raise Deadlock_detected
| Some x -> x
20 changes: 12 additions & 8 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,18 @@ let run ~extra_effects st main arg =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
)
)
)
in
Expand Down
8 changes: 6 additions & 2 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,12 @@ let run ~extra_effects t main x =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
)
)
in
match !result with
Expand Down