Skip to content

Commit

Permalink
Add support for domain local await
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Leonard <talex5@gmail.com>
  • Loading branch information
polytypic and talex5 committed May 3, 2023
1 parent 180c44d commit f55c105
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ocaml/opam:debian-11-ocaml-5.0
# Make sure we're using opam-2.1:
RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam
# Ensure opam-repository is up-to-date:
RUN cd opam-repository && git pull -q origin 055c6b85dfb77ee9d7a1ae35dced61b6fd64838a && opam update
RUN cd opam-repository && git pull -q origin 24ff26d7dbf5de564035bbb7d39414bc7f7262d3 && opam update
# Install utop for interactive use:
RUN opam install utop fmt
# Install Eio's dependencies (adding just the opam files first to help with caching):
Expand Down
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Lwt](#lwt)
* [Unix and System Threads](#unix-and-system-threads)
* [Domainslib](#domainslib)
* [kcas](#kcas)
* [Best Practices](#best-practices)
* [Switches](#switches-1)
* [Casting](#casting)
Expand Down Expand Up @@ -1604,6 +1605,65 @@ while most Eio functions can only be used from Eio domains.
The bridge function `run_in_pool` makes use of the fact that `Domainslib.Task.async` is able to run from
an Eio domain, and `Eio.Promise.resolve` is able to run from a Domainslib one.

### kcas

Eio provides the support [kcas][] requires to implement blocking in the
lock-free software transactional memory (STM) implementation that it provides.
This means that one can use all the composable lock-free data structures and
primitives for communication and synchronization implemented using **kcas** to
communicate and synchronize between Eio fibers, raw domains, and any other
schedulers that provide the domain local await mechanism.

To demonstrate **kcas**

```ocaml
# #require "kcas"
# open Kcas
```

let's first create a couple of shared memory locations

```ocaml
# let x = Loc.make 0
val x : int Loc.t = <abstr>
# let y = Loc.make 0
val y : int Loc.t = <abstr>
```

and spawn a domain

```ocaml
# let foreign_domain = Domain.spawn @@ fun () ->
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
Loc.set y 22;
x
val foreign_domain : int Domain.t = <abstr>
```

that first waits for one of the locations to change value and then writes to the
other location.

Then we run a Eio program

```ocaml
# let y = Eio_main.run @@ fun _env ->
Loc.set x 20;
Loc.get_as (fun y -> Retry.unless (y <> 0); y) y
val y : int = 22
```

that first writes to the location the other domain is waiting on and then waits
for the other domain to write to the other location.

Joining with the other domain

```ocaml
# y + Domain.join foreign_domain
- : int = 42
```

we arrive at the answer.

## Best Practices

This section contains some recommendations for designing library APIs for use with Eio.
Expand Down Expand Up @@ -1762,5 +1822,6 @@ Some background about the effects system can be found in:
[Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html
[Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html
[Domainslib]: https://github.com/ocaml-multicore/domainslib
[kcas]: https://github.com/ocaml-multicore/kcas
[Meio]: https://github.com/tarides/meio
[Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/
2 changes: 2 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
(psq (>= 0.2.0))
(fmt (>= 0.8.9))
(hmap (>= 0.8.1))
(domain-local-await (>= 0.1.0))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 2.0.0))
(kcas (and (>= 0.3.0) :with-test))
(mdx (and (>= 2.2.0) :with-test))
(alcotest (and (>= 1.4.0) :with-test))
(dscheck (and (>= 0.1.0) :with-test))))
Expand Down
2 changes: 2 additions & 0 deletions eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ depends: [
"psq" {>= "0.2.0"}
"fmt" {>= "0.8.9"}
"hmap" {>= "0.8.1"}
"domain-local-await" {>= "0.1.0"}
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "2.0.0"}
"kcas" {>= "0.3.0" & with-test}
"mdx" {>= "2.2.0" & with-test}
"alcotest" {>= "1.4.0" & with-test}
"dscheck" {>= "0.1.0" & with-test}
Expand Down
22 changes: 22 additions & 0 deletions lib_eio/core/dla.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
let prepare_for_await () =
let state = Atomic.make `Init in
let release () =
if Atomic.get state != `Released then
match Atomic.exchange state `Released with
| `Awaiting enqueue -> enqueue (Ok ())
| _ -> ()
and await () =
if Atomic.get state != `Released then
Suspend.enter @@ fun ctx enqueue ->
let awaiting = `Awaiting enqueue in
if Atomic.compare_and_set state `Init awaiting then (
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state awaiting `Released then (
enqueue (Error ex)
)
)
) else (
enqueue (Ok ())
)
in
Domain_local_await.{ release; await }
2 changes: 1 addition & 1 deletion lib_eio/core/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name eio__core)
(public_name eio.core)
(libraries cstruct hmap lwt-dllist fmt optint))
(libraries cstruct hmap lwt-dllist fmt optint domain-local-await))
2 changes: 2 additions & 0 deletions lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ module Private = struct
| Fork = Fiber.Fork
| Get_context = Cancel.Get_context
end

module Dla = Dla
end
3 changes: 3 additions & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,7 @@ module Private : sig
(** Backends should use this for {!Eio.Stdenv.debug}. *)
end

module Dla : sig
val prepare_for_await : unit -> Domain_local_await.t
end
end
6 changes: 5 additions & 1 deletion lib_eio/mock/backend.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ let run main =
in
let new_fiber = Fiber_context.make_root () in
let result = ref None in
let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main ())) in
let Exit_scheduler =
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () -> result := Some (main ()))) in
match !result with
| None -> raise Deadlock_detected
| Some x -> x
20 changes: 12 additions & 8 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,18 @@ let run ~extra_effects st main arg =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
)
)
)
in
Expand Down
8 changes: 6 additions & 2 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,12 @@ let run ~extra_effects t main x =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
)
)
in
match !result with
Expand Down

0 comments on commit f55c105

Please sign in to comment.