Skip to content

Commit

Permalink
WIP: Add support for domain local await
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Apr 24, 2023
1 parent fc9960e commit 0a7d1fd
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 12 deletions.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Eio replaces existing concurrency libraries such as Lwt
* [Lwt](#lwt)
* [Unix and System Threads](#unix-and-system-threads)
* [Domainslib](#domainslib)
* [kcas](#kcas)
* [Best Practices](#best-practices)
* [Switches](#switches-1)
* [Casting](#casting)
Expand Down Expand Up @@ -1616,6 +1617,65 @@ while most Eio functions can only be used from Eio domains.
The bridge function `run_in_pool` makes use of the fact that `Domainslib.Task.async` is able to run from
an Eio domain, and `Eio.Promise.resolve` is able to run from a Domainslib one.

### kcas

Eio provides the domain local await mechanism that the [kcas][] library uses to
provide blocking support for the lock-free software transactional memory (STM)
implementation that it provides. This means that one can use all the composable
lock-free data structures and primitives for communication and synchronization
implemented using **kcas** to communicate and synchronize between Eio fibers,
raw domains, and any other schedulers that provide the domain local await
mechanism.

To demonstrate **kcas**

```ocaml
# open Kcas
```

let's first create a couple of shared memory locations

```ocaml
# let x = Loc.make 0
val x : int Loc.t = <abstr>
# let y = Loc.make 0
val y : int Loc.t = <abstr>
```

and spawn a domain

```ocaml
# let foreign_domain = Domain.spawn @@ fun () ->
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
Loc.set y 22;
x
val foreign_domain : int Domain.t = <abstr>
```

that first waits for one of the locations to change value and then writes to the
other location.

Then we run a Eio program

```ocaml
# let y = Eio_main.run @@ fun _env ->
Loc.set x 20;
Loc.get_as (fun y -> Retry.unless (y <> 0); y) y
val y : int = 22
```

that first writes to the location the other domain is waiting on and then waits
for the other domain to write to the other location.

Joining with the other domain

```ocaml
# y + Domain.join foreign_domain
- : int = 42
```

we arrive at the answer.

## Best Practices

This section contains some recommendations for designing library APIs for use with Eio.
Expand Down Expand Up @@ -1776,3 +1836,4 @@ Some background about the effects system can be found in:
[Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html
[Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html
[Domainslib]: https://github.com/ocaml-multicore/domainslib
[kcas]: https://github.com/ocaml-multicore/kcas
34 changes: 34 additions & 0 deletions lib_eio/core/dla.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Domain_local_await = Kcas.Domain_local_await

let prepare_for_await mode =
let state =
Atomic.make
(mode :> [ Domain_local_await.mode | `Released | `Awaiting of _ ])
in
let release () =
if Atomic.get state != `Released then
match Atomic.exchange state `Released with
| `Awaiting b -> Broadcast.resume_all b
| _ -> ()
and await () =
match Atomic.get state with
| `Released -> ()
| `Awaiting _ -> failwith "await called twice"
| (`Cancelable | `Protected) as was ->
let b = Broadcast.create () in
if Atomic.compare_and_set state was (`Awaiting b) then
let resume ctx enqueue =
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
| None -> ()
| Some request -> (
match Atomic.get state with
| `Awaiting _ ->
Cancel.Fiber_context.set_cancel_fn ctx @@ fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
| `Cancelable | `Protected | `Released ->
if Broadcast.cancel request then enqueue (Ok ()))
in
if was == `Cancelable then Suspend.enter resume
else Suspend.enter_unchecked resume
in
Domain_local_await.{ release; await }
2 changes: 1 addition & 1 deletion lib_eio/core/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name eio__core)
(public_name eio.core)
(libraries cstruct hmap lwt-dllist fmt optint))
(libraries cstruct hmap lwt-dllist fmt optint kcas))
2 changes: 2 additions & 0 deletions lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ module Private = struct
| Fork = Fiber.Fork
| Get_context = Cancel.Get_context
end

module Dla = Dla
end
3 changes: 3 additions & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,7 @@ module Private : sig
(** Backends should use this for {!Eio.Stdenv.debug}. *)
end

module Dla : sig
val prepare_for_await : Kcas.Domain_local_await.mode -> Kcas.Domain_local_await.t
end
end
6 changes: 5 additions & 1 deletion lib_eio/mock/backend.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ let run main =
in
let new_fiber = Fiber_context.make_root () in
let result = ref None in
let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main ())) in
let Exit_scheduler =
Kcas.Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () -> result := Some (main ()))) in
match !result with
| None -> raise Deadlock_detected
| Some x -> x
20 changes: 12 additions & 8 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,18 @@ let run ~extra_effects st main arg =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
Kcas.Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
result := Some (Error (ex, bt))
)
)
)
in
Expand Down
8 changes: 6 additions & 2 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,12 @@ let run ~extra_effects t main x =
let result = ref None in
let `Exit_scheduler =
let new_fiber = Fiber_context.make_root () in
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
Kcas.Domain_local_await.using
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
result := Some (with_op t main x);
)
)
in
match !result with
Expand Down

0 comments on commit 0a7d1fd

Please sign in to comment.