From f55c105f8acf5a66cffb94333393a8f4b7787027 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Mon, 24 Apr 2023 21:21:02 +0300 Subject: [PATCH] Add support for domain local await Co-authored-by: Thomas Leonard --- Dockerfile | 2 +- README.md | 61 ++++++++++++++++++++++++++++++++++++++ dune-project | 2 ++ eio.opam | 2 ++ lib_eio/core/dla.ml | 22 ++++++++++++++ lib_eio/core/dune | 2 +- lib_eio/core/eio__core.ml | 2 ++ lib_eio/core/eio__core.mli | 3 ++ lib_eio/mock/backend.ml | 6 +++- lib_eio_linux/sched.ml | 20 ++++++++----- lib_eio_posix/sched.ml | 8 +++-- 11 files changed, 117 insertions(+), 13 deletions(-) create mode 100644 lib_eio/core/dla.ml diff --git a/Dockerfile b/Dockerfile index f6a3084b8..4081cf20c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ FROM ocaml/opam:debian-11-ocaml-5.0 # Make sure we're using opam-2.1: RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam # Ensure opam-repository is up-to-date: -RUN cd opam-repository && git pull -q origin 055c6b85dfb77ee9d7a1ae35dced61b6fd64838a && opam update +RUN cd opam-repository && git pull -q origin 24ff26d7dbf5de564035bbb7d39414bc7f7262d3 && opam update # Install utop for interactive use: RUN opam install utop fmt # Install Eio's dependencies (adding just the opam files first to help with caching): diff --git a/README.md b/README.md index 76afe122a..135694092 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Eio replaces existing concurrency libraries such as Lwt * [Lwt](#lwt) * [Unix and System Threads](#unix-and-system-threads) * [Domainslib](#domainslib) + * [kcas](#kcas) * [Best Practices](#best-practices) * [Switches](#switches-1) * [Casting](#casting) @@ -1604,6 +1605,65 @@ while most Eio functions can only be used from Eio domains. The bridge function `run_in_pool` makes use of the fact that `Domainslib.Task.async` is able to run from an Eio domain, and `Eio.Promise.resolve` is able to run from a Domainslib one. +### kcas + +Eio provides the support [kcas][] requires to implement blocking in the +lock-free software transactional memory (STM) implementation that it provides. +This means that one can use all the composable lock-free data structures and +primitives for communication and synchronization implemented using **kcas** to +communicate and synchronize between Eio fibers, raw domains, and any other +schedulers that provide the domain local await mechanism. + +To demonstrate **kcas** + +```ocaml +# #require "kcas" +# open Kcas +``` + +let's first create a couple of shared memory locations + +```ocaml +# let x = Loc.make 0 +val x : int Loc.t = +# let y = Loc.make 0 +val y : int Loc.t = +``` + +and spawn a domain + +```ocaml +# let foreign_domain = Domain.spawn @@ fun () -> + let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in + Loc.set y 22; + x +val foreign_domain : int Domain.t = +``` + +that first waits for one of the locations to change value and then writes to the +other location. + +Then we run a Eio program + +```ocaml +# let y = Eio_main.run @@ fun _env -> + Loc.set x 20; + Loc.get_as (fun y -> Retry.unless (y <> 0); y) y +val y : int = 22 +``` + +that first writes to the location the other domain is waiting on and then waits +for the other domain to write to the other location. + +Joining with the other domain + +```ocaml +# y + Domain.join foreign_domain +- : int = 42 +``` + +we arrive at the answer. + ## Best Practices This section contains some recommendations for designing library APIs for use with Eio. @@ -1762,5 +1822,6 @@ Some background about the effects system can be found in: [Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html [Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html [Domainslib]: https://github.com/ocaml-multicore/domainslib +[kcas]: https://github.com/ocaml-multicore/kcas [Meio]: https://github.com/tarides/meio [Lambda Capabilities]: https://roscidus.com/blog/blog/2023/04/26/lambda-capabilities/ diff --git a/dune-project b/dune-project index 620fdf225..3af6ed896 100644 --- a/dune-project +++ b/dune-project @@ -25,8 +25,10 @@ (psq (>= 0.2.0)) (fmt (>= 0.8.9)) (hmap (>= 0.8.1)) + (domain-local-await (>= 0.1.0)) (crowbar (and (>= 0.2) :with-test)) (mtime (>= 2.0.0)) + (kcas (and (>= 0.3.0) :with-test)) (mdx (and (>= 2.2.0) :with-test)) (alcotest (and (>= 1.4.0) :with-test)) (dscheck (and (>= 0.1.0) :with-test)))) diff --git a/eio.opam b/eio.opam index 5400b05f6..c5fcecd6b 100644 --- a/eio.opam +++ b/eio.opam @@ -18,8 +18,10 @@ depends: [ "psq" {>= "0.2.0"} "fmt" {>= "0.8.9"} "hmap" {>= "0.8.1"} + "domain-local-await" {>= "0.1.0"} "crowbar" {>= "0.2" & with-test} "mtime" {>= "2.0.0"} + "kcas" {>= "0.3.0" & with-test} "mdx" {>= "2.2.0" & with-test} "alcotest" {>= "1.4.0" & with-test} "dscheck" {>= "0.1.0" & with-test} diff --git a/lib_eio/core/dla.ml b/lib_eio/core/dla.ml new file mode 100644 index 000000000..165ee3545 --- /dev/null +++ b/lib_eio/core/dla.ml @@ -0,0 +1,22 @@ +let prepare_for_await () = + let state = Atomic.make `Init in + let release () = + if Atomic.get state != `Released then + match Atomic.exchange state `Released with + | `Awaiting enqueue -> enqueue (Ok ()) + | _ -> () + and await () = + if Atomic.get state != `Released then + Suspend.enter @@ fun ctx enqueue -> + let awaiting = `Awaiting enqueue in + if Atomic.compare_and_set state `Init awaiting then ( + Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> + if Atomic.compare_and_set state awaiting `Released then ( + enqueue (Error ex) + ) + ) + ) else ( + enqueue (Ok ()) + ) + in + Domain_local_await.{ release; await } diff --git a/lib_eio/core/dune b/lib_eio/core/dune index 4063be78f..17cda9c66 100644 --- a/lib_eio/core/dune +++ b/lib_eio/core/dune @@ -1,4 +1,4 @@ (library (name eio__core) (public_name eio.core) - (libraries cstruct hmap lwt-dllist fmt optint)) + (libraries cstruct hmap lwt-dllist fmt optint domain-local-await)) diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index a5d4e4b2c..fc7e072c9 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -19,4 +19,6 @@ module Private = struct | Fork = Fiber.Fork | Get_context = Cancel.Get_context end + + module Dla = Dla end diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index adfc10322..1e0a9d0fc 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -806,4 +806,7 @@ module Private : sig (** Backends should use this for {!Eio.Stdenv.debug}. *) end + module Dla : sig + val prepare_for_await : unit -> Domain_local_await.t + end end diff --git a/lib_eio/mock/backend.ml b/lib_eio/mock/backend.ml index e7683daac..557ca1682 100644 --- a/lib_eio/mock/backend.ml +++ b/lib_eio/mock/backend.ml @@ -63,7 +63,11 @@ let run main = in let new_fiber = Fiber_context.make_root () in let result = ref None in - let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main ())) in + let Exit_scheduler = + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> result := Some (main ()))) in match !result with | None -> raise Deadlock_detected | Some x -> x diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 68ad739cc..a357e80e8 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -465,14 +465,18 @@ let run ~extra_effects st main arg = let result = ref None in let `Exit_scheduler = let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - Switch.run_protected (fun sw -> - Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st); - match main arg with - | x -> result := Some (Ok x) - | exception ex -> - let bt = Printexc.get_raw_backtrace () in - result := Some (Error (ex, bt)) + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + Switch.run_protected (fun sw -> + Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st); + match main arg with + | x -> result := Some (Ok x) + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + result := Some (Error (ex, bt)) + ) ) ) in diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 5c15f43c5..954e15409 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -351,8 +351,12 @@ let run ~extra_effects t main x = let result = ref None in let `Exit_scheduler = let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - result := Some (with_op t main x); + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) ) in match !result with