From 6beace512c2425e55d10db2e4e7d2c787717e1e3 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Mon, 6 Feb 2023 11:19:10 +0000 Subject: [PATCH] Initial Eio_posix backend Co-authored-by: Christiano Haesbaert --- Makefile | 3 + dune-project | 14 +- eio_linux.opam | 2 +- eio_luv.opam | 2 +- eio_main.opam | 1 + eio_posix.opam | 33 +++ lib_eio_posix/domain_mgr.ml | 77 +++++++ lib_eio_posix/dune | 9 + lib_eio_posix/eio_posix.ml | 48 ++++ lib_eio_posix/eio_posix.mli | 24 ++ lib_eio_posix/eio_posix_stubs.c | 107 +++++++++ lib_eio_posix/err.ml | 27 +++ lib_eio_posix/fd.ml | 60 +++++ lib_eio_posix/fd.mli | 50 ++++ lib_eio_posix/flow.ml | 95 ++++++++ lib_eio_posix/fs.ml | 134 +++++++++++ lib_eio_posix/low_level.ml | 157 +++++++++++++ lib_eio_posix/low_level.mli | 53 +++++ lib_eio_posix/net.ml | 151 ++++++++++++ lib_eio_posix/sched.ml | 356 +++++++++++++++++++++++++++++ lib_eio_posix/sched.mli | 45 ++++ lib_eio_posix/test/dune | 4 + lib_eio_posix/test/poll.md | 22 ++ lib_eio_posix/time.ml | 19 ++ lib_main/dune | 3 + lib_main/eio_main.ml | 5 +- lib_main/posix_backend.disabled.ml | 1 + lib_main/posix_backend.enabled.ml | 1 + 28 files changed, 1498 insertions(+), 5 deletions(-) create mode 100644 eio_posix.opam create mode 100644 lib_eio_posix/domain_mgr.ml create mode 100644 lib_eio_posix/dune create mode 100644 lib_eio_posix/eio_posix.ml create mode 100644 lib_eio_posix/eio_posix.mli create mode 100644 lib_eio_posix/eio_posix_stubs.c create mode 100644 lib_eio_posix/err.ml create mode 100644 lib_eio_posix/fd.ml create mode 100644 lib_eio_posix/fd.mli create mode 100644 lib_eio_posix/flow.ml create mode 100644 lib_eio_posix/fs.ml create mode 100644 lib_eio_posix/low_level.ml create mode 100644 lib_eio_posix/low_level.mli create mode 100644 lib_eio_posix/net.ml create mode 100644 lib_eio_posix/sched.ml create mode 100644 lib_eio_posix/sched.mli create mode 100644 lib_eio_posix/test/dune create mode 100644 lib_eio_posix/test/poll.md create mode 100644 lib_eio_posix/time.ml create mode 100644 lib_main/posix_backend.disabled.ml create mode 100644 lib_main/posix_backend.enabled.ml diff --git a/Makefile b/Makefile index 73b136040..d508248b7 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ bench: test_luv: EIO_BACKEND=luv dune runtest +test_posix: + EIO_BACKEND=posix dune runtest + dscheck: dune exec -- ./lib_eio/tests/dscheck/test_rcfd.exe dune exec -- ./lib_eio/tests/dscheck/test_sync.exe diff --git a/dune-project b/dune-project index 31856e41c..4566ce110 100644 --- a/dune-project +++ b/dune-project @@ -32,7 +32,7 @@ (package (name eio_linux) (synopsis "Eio implementation for Linux using io-uring") - (description "An eio implementation for Linux using io-uring.") + (description "An Eio implementation for Linux using io-uring.") (depends (alcotest (and (>= 1.4.0) :with-test)) (eio (= :version)) @@ -41,10 +41,19 @@ (fmt (>= 0.8.9)) (cmdliner (and (>= 1.1.0) :with-test)) (uring (>= 0.5)))) +(package + (name eio_posix) + (synopsis "Eio implementation for POSIX systems") + (description "An Eio implementation for most Unix-like platforms") + (depends + (eio (= :version)) + (iomux (>= 0.2)) + (mdx (and (>= 1.10.0) :with-test)) + (fmt (>= 0.8.9)))) (package (name eio_luv) (synopsis "Eio implementation using luv (libuv)") - (description "An eio implementation for most platforms, using luv.") + (description "An Eio implementation for most platforms, using luv.") (depends (eio (= :version)) (luv (>= 0.5.11)) @@ -58,5 +67,6 @@ (depends (eio_linux (and (= :version) (= :os "linux"))) (mdx (and (>= 1.10.0) :with-test)) + (eio_posix (= :version)) (eio_luv (= :version)))) (using mdx 0.2) diff --git a/eio_linux.opam b/eio_linux.opam index 6cdc8612c..31a39da62 100644 --- a/eio_linux.opam +++ b/eio_linux.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" synopsis: "Eio implementation for Linux using io-uring" -description: "An eio implementation for Linux using io-uring." +description: "An Eio implementation for Linux using io-uring." maintainer: ["anil@recoil.org"] authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" diff --git a/eio_luv.opam b/eio_luv.opam index 7abb6772b..1d03e4ffd 100644 --- a/eio_luv.opam +++ b/eio_luv.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" synopsis: "Eio implementation using luv (libuv)" -description: "An eio implementation for most platforms, using luv." +description: "An Eio implementation for most platforms, using luv." maintainer: ["anil@recoil.org"] authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" diff --git a/eio_main.opam b/eio_main.opam index d39cb783a..b9ba29bb6 100644 --- a/eio_main.opam +++ b/eio_main.opam @@ -12,6 +12,7 @@ depends: [ "dune" {>= "3.0"} "eio_linux" {= version & os = "linux"} "mdx" {>= "1.10.0" & with-test} + "eio_posix" {= version} "eio_luv" {= version} "odoc" {with-doc} ] diff --git a/eio_posix.opam b/eio_posix.opam new file mode 100644 index 000000000..bf38e83a6 --- /dev/null +++ b/eio_posix.opam @@ -0,0 +1,33 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Eio implementation for POSIX systems" +description: "An Eio implementation for most Unix-like platforms" +maintainer: ["anil@recoil.org"] +authors: ["Anil Madhavapeddy" "Thomas Leonard"] +license: "ISC" +homepage: "https://github.com/ocaml-multicore/eio" +doc: "https://ocaml-multicore.github.io/eio/" +bug-reports: "https://github.com/ocaml-multicore/eio/issues" +depends: [ + "dune" {>= "3.0"} + "eio" {= version} + "iomux" {>= "0.2"} + "mdx" {>= "1.10.0" & with-test} + "fmt" {>= "0.8.9"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/ocaml-multicore/eio.git" diff --git a/lib_eio_posix/domain_mgr.ml b/lib_eio_posix/domain_mgr.ml new file mode 100644 index 000000000..47829a356 --- /dev/null +++ b/lib_eio_posix/domain_mgr.ml @@ -0,0 +1,77 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Eio.Std + +[@@@alert "-unstable"] + +(* Run an event loop in the current domain, using [fn x] as the root fiber. *) +let run_event_loop fn x = + Sched.with_sched @@ fun sched -> + let open Effect.Deep in + let extra_effects : _ effect_handler = { + effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> + match e with + | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k (Time.mono_clock : Eio.Time.Mono.t)) + | Eio_unix.Private.Socket_of_fd (sw, close_unix, fd) -> Some (fun k -> + Unix.set_nonblock fd; + let fd = Fd.of_unix ~sw ~blocking:false ~close_unix fd in + continue k (Flow.of_fd fd :> Eio_unix.socket) + ) + | Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k -> + let a, b = Unix.socketpair ~cloexec:true domain ty protocol in + Unix.set_nonblock a; + Unix.set_nonblock b; + let a = Fd.of_unix ~sw ~blocking:false ~close_unix:true a |> Flow.of_fd in + let b = Fd.of_unix ~sw ~blocking:false ~close_unix:true b |> Flow.of_fd in + continue k ((a :> Eio_unix.socket), (b :> Eio_unix.socket)) + ) + | Eio_unix.Private.Pipe sw -> Some (fun k -> + let r, w = Unix.pipe ~cloexec:true () in + (* See issue #319, PR #327 *) + Unix.set_nonblock r; + Unix.set_nonblock w; + let make x = Flow.of_fd (Fd.of_unix ~sw ~blocking:false ~close_unix:true x) in + let r = (make r :> ) in + let w = (make w :> ) in + continue k (r, w) + ) + | _ -> None + } + in + Sched.run ~extra_effects sched fn x + +let v = object + inherit Eio.Domain_manager.t + + method run_raw fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun _ctx enqueue -> + domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) + + method run fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun ctx enqueue -> + let cancelled, set_cancelled = Promise.create () in + Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled); + domain := Some (Domain.spawn (fun () -> + Fun.protect (run_event_loop (fun () -> fn ~cancelled)) + ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) +end diff --git a/lib_eio_posix/dune b/lib_eio_posix/dune new file mode 100644 index 000000000..cac4e9627 --- /dev/null +++ b/lib_eio_posix/dune @@ -0,0 +1,9 @@ +(library + (name eio_posix) + (public_name eio_posix) + (enabled_if (= %{os_type} "Unix")) + (foreign_stubs + (language c) + (flags :standard -D_LARGEFILE64_SOURCE) + (names eio_posix_stubs)) + (libraries eio eio.utils eio.unix fmt iomux)) diff --git a/lib_eio_posix/eio_posix.ml b/lib_eio_posix/eio_posix.ml new file mode 100644 index 000000000..30db9c03e --- /dev/null +++ b/lib_eio_posix/eio_posix.ml @@ -0,0 +1,48 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Low_level = Low_level + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> + +let run main = + (* SIGPIPE makes no sense in a modern application. *) + Sys.(set_signal sigpipe Signal_ignore); + Domain_mgr.run_event_loop main @@ object (_ : stdenv) + method stdin = (Flow.of_fd Low_level.Fd.stdin :> ) + method stdout = (Flow.of_fd Low_level.Fd.stdout :> ) + method stderr = (Flow.of_fd Low_level.Fd.stderr :> ) + method debug = Eio.Private.Debug.v + method clock = Time.clock + method mono_clock = Time.mono_clock + method net = Net.v + method domain_mgr = Domain_mgr.v + method cwd = ((Fs.cwd, "") :> Eio.Fs.dir Eio.Path.t) + method fs = ((Fs.fs, "") :> Eio.Fs.dir Eio.Path.t) + method secure_random = Flow.secure_random + end diff --git a/lib_eio_posix/eio_posix.mli b/lib_eio_posix/eio_posix.mli new file mode 100644 index 000000000..25e912ca0 --- /dev/null +++ b/lib_eio_posix/eio_posix.mli @@ -0,0 +1,24 @@ +(** Fallback Eio backend for POSIX systems. *) + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> +(** An extended version of {!Eio.Stdenv.t} with some extra features available on POSIX systems. *) + +val run : (stdenv -> 'a) -> 'a +(** [run main] runs an event loop and calls [main stdenv] inside it. + + For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *) + +module Low_level = Low_level +(** Low-level API for making POSIX calls directly. *) diff --git a/lib_eio_posix/eio_posix_stubs.c b/lib_eio_posix/eio_posix_stubs.c new file mode 100644 index 000000000..0af7b9605 --- /dev/null +++ b/lib_eio_posix/eio_posix_stubs.c @@ -0,0 +1,107 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#ifdef ARCH_SIXTYFOUR +#define Int63_val(v) Long_val(v) +#else +#define Int63_val(v) (Int64_val(v)) >> 1 +#endif + +CAMLprim value caml_eio_posix_getrandom(value v_ba, value v_off, value v_len) { + CAMLparam1(v_ba); + ssize_t ret; + ssize_t off = (ssize_t)Long_val(v_off); + ssize_t len = (ssize_t)Long_val(v_len); + do { + void *buf = Caml_ba_data_val(v_ba) + off; + caml_enter_blocking_section(); + ret = getrandom(buf, len, 0); + caml_leave_blocking_section(); + } while (ret == -1 && errno == EINTR); + if (ret == -1) uerror("getrandom", Nothing); + CAMLreturn(Val_long(ret)); +} + +static void fill_iov(struct iovec *iov, value v_bufs) { + int n_bufs = Wosize_val(v_bufs); + for (int i = 0; i < n_bufs; i++) { + value v_cs = Field(v_bufs, i); + value v_ba = Field(v_cs, 0); + value v_off = Field(v_cs, 1); + value v_len = Field(v_cs, 2); + iov[i].iov_base = Caml_ba_data_val(v_ba) + Long_val(v_off); + iov[i].iov_len = Long_val(v_len); + } +} + +CAMLprim value caml_eio_posix_readv(value v_fd, value v_bufs) { + CAMLparam1(v_bufs); + ssize_t r; + int n_bufs = Wosize_val(v_bufs); + struct iovec iov[n_bufs]; + + fill_iov(iov, v_bufs); + + r = readv(Int_val(v_fd), iov, n_bufs); + if (r < 0) uerror("readv", Nothing); + + CAMLreturn(Val_long(r)); +} + +CAMLprim value caml_eio_posix_writev(value v_fd, value v_bufs) { + CAMLparam1(v_bufs); + ssize_t r; + int n_bufs = Wosize_val(v_bufs); + struct iovec iov[n_bufs]; + + fill_iov(iov, v_bufs); + + r = writev(Int_val(v_fd), iov, n_bufs); + if (r < 0) uerror("writev", Nothing); + + CAMLreturn(Val_long(r)); +} + +CAMLprim value caml_eio_posix_preadv(value v_fd, value v_bufs, value v_offset) { + CAMLparam2(v_bufs, v_offset); + ssize_t r; + int n_bufs = Wosize_val(v_bufs); + struct iovec iov[n_bufs]; + + fill_iov(iov, v_bufs); + + r = preadv(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset)); + if (r < 0) uerror("preadv", Nothing); + + CAMLreturn(Val_long(r)); +} + +CAMLprim value caml_eio_posix_pwritev(value v_fd, value v_bufs, value v_offset) { + CAMLparam2(v_bufs, v_offset); + ssize_t r; + int n_bufs = Wosize_val(v_bufs); + struct iovec iov[n_bufs]; + + fill_iov(iov, v_bufs); + + r = pwritev(Int_val(v_fd), iov, n_bufs, Int63_val(v_offset)); + if (r < 0) uerror("pwritev", Nothing); + + CAMLreturn(Val_long(r)); +} diff --git a/lib_eio_posix/err.ml b/lib_eio_posix/err.ml new file mode 100644 index 000000000..f8783ef89 --- /dev/null +++ b/lib_eio_posix/err.ml @@ -0,0 +1,27 @@ +type Eio.Exn.Backend.t += + | Outside_sandbox of string * string + | Absolute_path + +let unclassified_error e = Eio.Exn.create (Eio.Exn.X e) + +let () = + Eio.Exn.Backend.register_pp (fun f -> function + | Outside_sandbox (path, dir) -> Fmt.pf f "Outside_sandbox (%S, %S)" path dir; true + | Absolute_path -> Fmt.pf f "Absolute_path"; true + | _ -> false + ) + +let wrap code name arg = + let e = Eio_unix.Unix_error (code, name, arg) in + match code with + | EEXIST -> Eio.Fs.err (Already_exists e) + | ENOENT -> Eio.Fs.err (Not_found e) + | EXDEV | EACCES | EPERM -> Eio.Fs.err (Permission_denied e) + | ECONNREFUSED -> Eio.Net.err (Connection_failure (Refused e)) + | ECONNRESET | EPIPE -> Eio.Net.err (Connection_reset e) + | _ -> unclassified_error e + +let run fn x = + try fn x + with Unix.Unix_error(code, name, arg) -> + raise (wrap code name arg) diff --git a/lib_eio_posix/fd.ml b/lib_eio_posix/fd.ml new file mode 100644 index 000000000..e7ec8d63a --- /dev/null +++ b/lib_eio_posix/fd.ml @@ -0,0 +1,60 @@ +open Eio.Std + +module Rcfd = Eio_unix.Private.Rcfd + +type t = { + fd : Rcfd.t; + + (* stdin, stdout and stderr are blocking, and so need special care. + For these, we first wait for them to become e.g. readable and then hope + that the read doesn't block. This may fail if multiple fibers try to read + at the same time. We could check that it's still readable after being + resumed, but that still won't work if multiple domains read at the same + time. Same problem for writes. *) + blocking : bool; + close_unix : bool; (* Whether closing this also closes the underlying FD. *) + mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *) +} + +let err_closed op = Invalid_argument (op ^ ": file descriptor used after calling close!") + +let use_exn op t f = + Rcfd.use t.fd f ~if_closed:(fun () -> raise (err_closed op)) + +let close t = + Switch.remove_hook t.release_hook; + if t.close_unix then ( + if not (Rcfd.close t.fd) then raise (err_closed "close") + ) else ( + match Rcfd.remove t.fd with + | Some _ -> () + | None -> raise (err_closed "close") + ) + +let of_unix_no_hook ?(close_unix=true) ~blocking fd = + { fd = Rcfd.make fd; blocking; close_unix; release_hook = Switch.null_hook } + +let of_unix ~sw ~blocking ~close_unix fd = + let t = of_unix_no_hook ~blocking ~close_unix fd in + t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t); + t + +let is_blocking t = t.blocking + +let stdin = of_unix_no_hook ~blocking:true Unix.stdin +let stdout = of_unix_no_hook ~blocking:true Unix.stdout +let stderr= of_unix_no_hook ~blocking:true Unix.stderr + +let to_unix op t = + match op with + | `Peek -> Rcfd.peek t.fd + | `Take -> + Switch.remove_hook t.release_hook; + match Rcfd.remove t.fd with + | Some fd -> fd + | None -> raise (err_closed "to_unix") + +type has_fd = < fd : t > + +type _ Eio.Generic.ty += FD : t Eio.Generic.ty +let get_fd_opt t = Eio.Generic.probe t FD diff --git a/lib_eio_posix/fd.mli b/lib_eio_posix/fd.mli new file mode 100644 index 000000000..1192a8691 --- /dev/null +++ b/lib_eio_posix/fd.mli @@ -0,0 +1,50 @@ +(** A safe wrapper for {!Unix.file_descr}. *) + +open Eio.Std + +type t +(** A wrapper around a {!Unix.file_descr}. *) + +val of_unix : sw:Switch.t -> blocking:bool -> close_unix:bool -> Unix.file_descr -> t +(** [of_unix ~sw ~blocking ~close_unix fd] wraps [fd]. + + @param sw Close [fd] automatically when [sw] is finished. + @param blocking Indicates whether [fd] is in blocking mode. + Normally you should call [Unix.set_nonblock fd] first and pass [false] here. + @param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *) + +val use_exn : string -> t -> (Unix.file_descr -> 'a) -> 'a +(** [use_exn op t fn] calls [fn wrapped_fd], ensuring that [wrapped_fd] will not be closed + before [fn] returns. + + If [t] is already closed, it raises an exception, using [op] as the name of the failing operation. *) + +val close : t -> unit +(** [close t] marks [t] as closed, so that {!use_exn} can no longer be used to start new operations. + + The wrapped FD will be closed once all current users of the FD have finished (unless [close_unix = false]). + + @raise Invalid_argument if [t] is closed by another fiber first. *) + +val is_blocking : t -> bool +(** [is_blocking t] returns the value of [blocking] passed to {!of_unix}. *) + +val stdin : t +val stdout : t +val stderr : t + +val to_unix : [`Peek | `Take] -> t -> Unix.file_descr +(** [to_unix `Take t] closes [t] without closing the wrapped FD, which it returns to the caller once all operations on it have finished. + + [to_unix `Peek t] returns the wrapped FD directly. You must ensure that it is not closed while using it. *) + +type has_fd = < fd : t > +(** Resources that have FDs are sub-types of [has_fd]. *) + +type _ Eio.Generic.ty += FD : t Eio.Generic.ty +(** Resources that wrap FDs can handle this in their [probe] method to expose the FD. *) + +val get_fd_opt : #Eio.Generic.t -> t option +(** [get_fd_opt r] returns the [t] being wrapped by a resource, if any. + + This just probes [r] using {!FD}. *) diff --git a/lib_eio_posix/flow.ml b/lib_eio_posix/flow.ml new file mode 100644 index 000000000..50801ba8f --- /dev/null +++ b/lib_eio_posix/flow.ml @@ -0,0 +1,95 @@ +let fstat fd = + try + let ust = Low_level.fstat fd in + let st_kind : Eio.File.Stat.kind = + match ust.st_kind with + | Unix.S_REG -> `Regular_file + | Unix.S_DIR -> `Directory + | Unix.S_CHR -> `Character_special + | Unix.S_BLK -> `Block_device + | Unix.S_LNK -> `Symbolic_link + | Unix.S_FIFO -> `Fifo + | Unix.S_SOCK -> `Socket + in + Eio.File.Stat.{ + dev = ust.st_dev |> Int64.of_int; + ino = ust.st_ino |> Int64.of_int; + kind = st_kind; + perm = ust.st_perm; + nlink = ust.st_nlink |> Int64.of_int; + uid = ust.st_uid |> Int64.of_int; + gid = ust.st_gid |> Int64.of_int; + rdev = ust.st_rdev |> Int64.of_int; + size = ust.st_size |> Optint.Int63.of_int64; + atime = ust.st_atime; + mtime = ust.st_mtime; + ctime = ust.st_ctime; + } + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + +let write_bufs fd bufs = + try + let rec loop = function + | [] -> () + | bufs -> + let wrote = Low_level.writev fd (Array.of_list bufs) in + loop (Cstruct.shiftv bufs wrote) + in + loop bufs + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let copy src dst = + let buf = Cstruct.create 4096 in + try + while true do + let got = Eio.Flow.single_read src buf in + write_bufs dst [Cstruct.sub buf 0 got] + done + with End_of_file -> () + +(* todo: This is very inefficient! Replace with readv. *) +let read fd buf = + let got = + try Low_level.readv fd [| buf |] + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + in + if got = 0 then raise End_of_file + else got + +let shutdown fd cmd = + try + Low_level.shutdown fd @@ match cmd with + | `Receive -> Unix.SHUTDOWN_RECEIVE + | `Send -> Unix.SHUTDOWN_SEND + | `All -> Unix.SHUTDOWN_ALL + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let of_fd fd = object (_ : ) + method unix_fd op = Fd.to_unix op fd + method fd = fd + + method read_methods = [] + method copy src = copy src fd + + method pread ~file_offset bufs = Low_level.preadv ~file_offset fd (Array.of_list bufs) + method pwrite ~file_offset bufs = Low_level.pwritev ~file_offset fd (Array.of_list bufs) + + method stat = fstat fd + method read_into buf = read fd buf + method write bufs = write_bufs fd bufs + method shutdown cmd = shutdown fd cmd + method close = Fd.close fd + + method probe : type a. a Eio.Generic.ty -> a option = function + | Fd.FD -> Some fd + | Eio_unix.Private.Unix_file_descr op -> Some (Fd.to_unix op fd) + | _ -> None +end + +let secure_random = object + inherit Eio.Flow.source + + method read_into buf = + Low_level.getrandom buf; + Cstruct.length buf +end diff --git a/lib_eio_posix/fs.ml b/lib_eio_posix/fs.ml new file mode 100644 index 000000000..0c4106634 --- /dev/null +++ b/lib_eio_posix/fs.ml @@ -0,0 +1,134 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Eio.Std + +module Fiber_context = Eio.Private.Fiber_context + +type _ Eio.Generic.ty += Dir_resolve_new : (string -> string) Eio.Generic.ty +let dir_resolve_new x = Eio.Generic.probe x Dir_resolve_new + +(* TODO: use openat, etc, to make this race-free. + For now, we make a best-efforts attempt to enforce the sandboxing using realpath and [`NOFOLLOW]. *) +class dir ~label (dir_path : string) = object (self) + inherit Eio.Fs.dir + + val mutable closed = false + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Dir_resolve_new -> Some self#resolve_new + | _ -> None + + (* Resolve a relative path to an absolute one, with no symlinks. + @raise Eio.Fs.Permission_denied if it's outside of [dir_path]. *) + method private resolve path = + if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; + if Filename.is_relative path then ( + let dir_path = Err.run Low_level.realpath dir_path in + let full = Err.run Low_level.realpath (Filename.concat dir_path path) in + let prefix_len = String.length dir_path + 1 in + if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then + full + else if full = dir_path then + full + else + raise @@ Eio.Fs.err (Permission_denied (Err.Outside_sandbox (full, dir_path))) + ) else ( + raise @@ Eio.Fs.err (Permission_denied Err.Absolute_path) + ) + + (* We want to create [path]. Check that the parent is in the sandbox. *) + method private resolve_new path = + let dir, leaf = Filename.dirname path, Filename.basename path in + if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path + else + let dir = self#resolve dir in + Filename.concat dir leaf + + method open_in ~sw path = + let fd = Low_level.openfile ~sw (self#resolve path) [(* O_NOFOLLOW; *) O_RDONLY] 0 in (* XXX *) + (Flow.of_fd fd :> ) + + method open_out ~sw ~append ~create path = + let mode, flags = + match create with + | `Never -> 0, [] + | `If_missing perm -> perm, Unix.[O_CREAT] + | `Or_truncate perm -> perm, Unix.[O_CREAT; O_TRUNC] + | `Exclusive perm -> perm, Unix.[O_CREAT; O_EXCL] + in + let flags = if append then Unix.O_APPEND :: flags else flags in + let flags = Unix.O_RDWR :: O_NONBLOCK (* :: O_NOFOLLOW *) :: flags in (* XXX *) + let real_path = + if create = `Never then self#resolve path + else self#resolve_new path + in + let fd = Err.run (Low_level.openfile ~sw real_path flags) mode in + (Flow.of_fd fd :> ) + + method open_dir ~sw path = + Switch.check sw; + let label = Filename.basename path in + let d = new dir ~label (self#resolve path) in + Switch.on_release sw (fun () -> d#close); + d + + (* libuv doesn't seem to provide a race-free way to do this. *) + method mkdir ~perm path = + let real_path = self#resolve_new path in + Err.run (Low_level.mkdir real_path) perm + + (* libuv doesn't seem to provide a race-free way to do this. *) + method unlink path = + let dir_path = Filename.dirname path in + let leaf = Filename.basename path in + let real_dir_path = self#resolve dir_path in + Err.run Low_level.unlink (Filename.concat real_dir_path leaf) + + (* libuv doesn't seem to provide a race-free way to do this. *) + method rmdir path = + let dir_path = Filename.dirname path in + let leaf = Filename.basename path in + let real_dir_path = self#resolve dir_path in + Err.run Low_level.rmdir (Filename.concat real_dir_path leaf) + + method read_dir path = + let path = self#resolve path in + Err.run Low_level.readdir path + |> Array.to_list + + method rename old_path new_dir new_path = + match dir_resolve_new new_dir with + | None -> invalid_arg "Target is not an eio_posix directory!" + | Some new_resolve_new -> + let old_path = self#resolve old_path in + let new_path = new_resolve_new new_path in + Err.run (Low_level.rename old_path) new_path + + method close = closed <- true + + method pp f = Fmt.string f (String.escaped label) +end + +(* Full access to the filesystem. *) +let fs = object + inherit dir ~label:"fs" "." + + (* No checks *) + method! private resolve path = path +end + +let cwd = new dir ~label:"cwd" "." diff --git a/lib_eio_posix/low_level.ml b/lib_eio_posix/low_level.ml new file mode 100644 index 000000000..d33395d12 --- /dev/null +++ b/lib_eio_posix/low_level.ml @@ -0,0 +1,157 @@ +open Eio.Std + +(* There are some things that should be improved here: + + - Blocking FDs (e.g. stdout) wait for the FD to become ready and then do a blocking operation. + This might not succeed, and will block the whole domain in that case. + Ideally, all blocking operations should happen in a sys-thread instead. + + - Various other operations, such as listing a directory, should also be done in a sys-thread + to avoid high latencies in the main domain. *) + +type ty = Read | Write + +module Fd = Fd + +let await_readable fd = + Fd.use_exn "await_readable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_readable t k fd + +let await_writable fd = + Fd.use_exn "await_writable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_writable t k fd + +let rec do_nonblocking ty fn fd = + try fn fd with + | Unix.Unix_error (EINTR, _, _) -> do_nonblocking ty fn fd (* Just in case *) + | Unix.Unix_error((EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) -> + Sched.enter (fun t k -> + match ty with + | Read -> Sched.await_readable t k fd + | Write -> Sched.await_writable t k fd + ); + do_nonblocking ty fn fd + +let read fd buf start len = + if Fd.is_blocking fd then await_readable fd; + Fd.use_exn "read" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.read fd buf start len) fd + +let write fd buf start len = + if Fd.is_blocking fd then await_writable fd; + Fd.use_exn "write" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.write fd buf start len) fd + +let sleep_until time = + Sched.enter @@ fun t k -> + Sched.await_timeout t k time + +let socket ~sw socket_domain socket_type protocol = + Switch.check sw; + let sock_unix = Unix.socket ~cloexec:true socket_domain socket_type protocol in + Unix.set_nonblock sock_unix; + Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix + +let rec connect fd addr = + try + Fd.use_exn "connect" fd (fun fd -> Unix.connect fd addr) + with + | Unix.Unix_error (EINTR, _, _) -> connect fd addr (* Just in case *) + | Unix.Unix_error ((EAGAIN | EWOULDBLOCK | EALREADY | EINPROGRESS), _, _) -> + await_writable fd; + match Fd.use_exn "connect" fd Unix.getsockopt_error with + | None -> connect fd addr + | Some code -> raise (Err.wrap code "connect" "") + +let accept ~sw sock = + Fd.use_exn "accept" sock @@ fun sock -> + let client, addr = + do_nonblocking Read (fun fd -> Switch.check sw; Unix.accept ~cloexec:true fd) sock + in + Unix.set_nonblock client; + Fd.of_unix ~sw ~blocking:false ~close_unix:true client, addr + +let shutdown sock cmd = + Fd.use_exn "shutdown" sock (fun fd -> Unix.shutdown fd cmd) + +let send_msg fd ~dst buf = + Fd.use_exn "send_msg" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.sendto fd buf 0 (Bytes.length buf) [] dst) fd + +let recv_msg fd buf = + Fd.use_exn "recv_msg" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.recvfrom fd buf 0 (Bytes.length buf) []) fd + +external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_posix_getrandom" + +let getrandom { Cstruct.buffer; off; len } = + let rec loop n = + if n = len then + () + else + loop (n + eio_getrandom buffer (off + n) (len - n)) + in + loop 0 + +let fstat fd = + Fd.use_exn "fstat" fd Unix.LargeFile.fstat + +let lstat = Unix.LargeFile.lstat + +let rec openfile ~sw path flags perm = + match Unix.openfile path flags perm with + | fd -> + Unix.set_nonblock fd; + Fd.of_unix ~sw ~blocking:false ~close_unix:true fd + | exception Unix.Unix_error(Unix.EINTR, _, "") -> openfile ~sw path flags perm + +let realpath = Unix.realpath +let rmdir = Unix.rmdir +let mkdir = Unix.mkdir +let unlink = Unix.unlink +let rename = Unix.rename + +let read_entries h = + let rec aux acc = + match Unix.readdir h with + | "." | ".." -> aux acc + | leaf -> aux (leaf :: acc) + | exception End_of_file -> Array.of_list acc + in + aux [] + +let readdir path = + let h = Unix.opendir path in + match read_entries h with + | r -> Unix.closedir h; r + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + Unix.closedir h; Printexc.raise_with_backtrace ex bt + +external eio_readv : Unix.file_descr -> Cstruct.t array -> int = "caml_eio_posix_readv" +external eio_writev : Unix.file_descr -> Cstruct.t array -> int = "caml_eio_posix_writev" + +external eio_preadv : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_posix_preadv" +external eio_pwritev : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_posix_pwritev" + +let readv fd bufs = + if Fd.is_blocking fd then await_readable fd; + Fd.use_exn "readv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_readv fd bufs) fd + +let writev fd bufs = + if Fd.is_blocking fd then await_writable fd; + Fd.use_exn "writev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> eio_writev fd bufs) fd + +let preadv ~file_offset fd bufs = + if Fd.is_blocking fd then await_readable fd; + Fd.use_exn "preadv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_preadv fd bufs file_offset) fd + +let pwritev ~file_offset fd bufs = + if Fd.is_blocking fd then await_writable fd; + Fd.use_exn "pwritev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> eio_pwritev fd bufs file_offset) fd diff --git a/lib_eio_posix/low_level.mli b/lib_eio_posix/low_level.mli new file mode 100644 index 000000000..56ca7c388 --- /dev/null +++ b/lib_eio_posix/low_level.mli @@ -0,0 +1,53 @@ +(** This module provides an effects-based API for calling POSIX functions. + + Normally it's better to use the cross-platform {!Eio} APIs instead, + which uses these functions automatically where appropriate. + + These functions mostly copy the POSIX APIs directly, except that: + + + They suspend the calling fiber instead of returning [EAGAIN] or similar. + + They handle [EINTR] by automatically restarting the call. + + They wrap {!Unix.file_descr} in {!Fd}, to avoid use-after-close bugs. + + They attach new FDs to switches, to avoid resource leaks. *) + +open Eio.Std + +module Fd = Fd + +val await_readable : Fd.t -> unit +val await_writable : Fd.t -> unit + +val sleep_until : Mtime.t -> unit + +val read : Fd.t -> bytes -> int -> int -> int +val write : Fd.t -> bytes -> int -> int -> int + +val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> Fd.t +val connect : Fd.t -> Unix.sockaddr -> unit +val accept : sw:Switch.t -> Fd.t -> Fd.t * Unix.sockaddr + +val shutdown : Fd.t -> Unix.shutdown_command -> unit + +val recv_msg : Fd.t -> bytes -> int * Unix.sockaddr +val send_msg : Fd.t -> dst:Unix.sockaddr -> bytes -> int + +val getrandom : Cstruct.t -> unit + +val fstat : Fd.t -> Unix.LargeFile.stats +val lstat : string -> Unix.LargeFile.stats + +val openfile : sw:Switch.t -> string -> Unix.open_flag list -> int -> Fd.t +val realpath : string -> string + +val mkdir : string -> int -> unit +val unlink : string -> unit +val rmdir : string -> unit +val rename : string -> string -> unit + +val readdir : string -> string array + +val readv : Fd.t -> Cstruct.t array -> int +val writev : Fd.t -> Cstruct.t array -> int + +val preadv : file_offset:Optint.Int63.t -> Fd.t -> Cstruct.t array -> int +val pwritev : file_offset:Optint.Int63.t -> Fd.t -> Cstruct.t array -> int diff --git a/lib_eio_posix/net.ml b/lib_eio_posix/net.ml new file mode 100644 index 000000000..37933e1f5 --- /dev/null +++ b/lib_eio_posix/net.ml @@ -0,0 +1,151 @@ +open Eio.Std + +let socket_domain_of = function + | `Unix _ -> Unix.PF_UNIX + | `UdpV4 -> Unix.PF_INET + | `UdpV6 -> Unix.PF_INET6 + | `Udp (host, _) + | `Tcp (host, _) -> + Eio.Net.Ipaddr.fold host + ~v4:(fun _ -> Unix.PF_INET) + ~v6:(fun _ -> Unix.PF_INET6) + +let listening_socket fd = object + inherit Eio.Net.listening_socket + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Private.Unix_file_descr op -> Some (Fd.to_unix op fd) + | Fd.FD -> Some fd + | _ -> None + + method close = Fd.close fd + + method accept ~sw = + let client, client_addr = Err.run (Low_level.accept ~sw) fd in + let client_addr = match client_addr with + | Unix.ADDR_UNIX path -> `Unix path + | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) + in + let flow = (Flow.of_fd client :> ) in + flow, client_addr +end + +let udp_socket sock = object + inherit Eio.Net.datagram_socket + + method close = Fd.close sock + + method send sockaddr buf = + let addr = match sockaddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.ADDR_INET (host, port) + in + let sent = Err.run (Low_level.send_msg sock ~dst:addr) (Cstruct.to_bytes buf) in + assert (sent = Cstruct.length buf) + + method recv buf = + let b = Bytes.create (Cstruct.length buf) in + let recv, addr = Err.run (Low_level.recv_msg sock) b in + Cstruct.blit_from_bytes b 0 buf 0 recv; + match addr with + | Unix.ADDR_INET (inet, port) -> + `Udp (Eio_unix.Ipaddr.of_unix inet, port), recv + | Unix.ADDR_UNIX _ -> + raise (Failure "Expected INET UDP socket address but got Unix domain socket address.") +end + +(* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) +let getaddrinfo ~service node = + let to_eio_sockaddr_t {Unix.ai_family; ai_addr; ai_socktype; ai_protocol; _ } = + match ai_family, ai_socktype, ai_addr with + | (Unix.PF_INET | PF_INET6), + (Unix.SOCK_STREAM | SOCK_DGRAM), + Unix.ADDR_INET (inet_addr,port) -> ( + match ai_protocol with + | 6 -> Some (`Tcp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | 17 -> Some (`Udp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | _ -> None) + | _ -> None + in + Err.run Eio_unix.run_in_systhread @@ fun () -> + let rec aux () = + try + Unix.getaddrinfo node service [] + |> List.filter_map to_eio_sockaddr_t + with Unix.Unix_error (EINTR, _, _) -> aux () + in + aux () + +let v = object + inherit Eio.Net.t + + method listen ~reuse_addr ~reuse_port ~backlog ~sw listen_addr = + let socket_type, addr = + match listen_addr with + | `Unix path -> + if reuse_addr then ( + match Low_level.lstat path with + | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path + | _ -> () + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () + | exception Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + ); + Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock = Low_level.socket ~sw (socket_domain_of listen_addr) socket_type 0 in + (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) + begin match listen_addr with + | `Unix path -> + if String.length path > 0 && path.[0] <> Char.chr 0 then + Switch.on_release sw (fun () -> Unix.unlink path) + | `Tcp _ -> () + end; + Low_level.Fd.use_exn "listen" sock (fun fd -> + if reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr; + Unix.listen fd backlog; + ); + listening_socket sock + + method connect ~sw connect_addr = + let socket_type, addr = + match connect_addr with + | `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in + try + Low_level.connect sock addr; + (Flow.of_fd sock :> ) + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + + method datagram_socket ~reuse_addr ~reuse_port ~sw saddr = + let sock = Low_level.socket ~sw (socket_domain_of saddr) Unix.SOCK_DGRAM 0 in + begin match saddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + let addr = Unix.ADDR_INET (host, port) in + Fd.use_exn "datagram_socket" sock (fun fd -> + if reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr + ) + | `UdpV4 | `UdpV6 -> () + end; + udp_socket sock + + method getaddrinfo = getaddrinfo + + method getnameinfo = Eio_unix.getnameinfo +end diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml new file mode 100644 index 000000000..c88730615 --- /dev/null +++ b/lib_eio_posix/sched.ml @@ -0,0 +1,356 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Suspended = Eio_utils.Suspended +module Zzz = Eio_utils.Zzz +module Lf_queue = Eio_utils.Lf_queue +module Fiber_context = Eio.Private.Fiber_context +module Ctf = Eio.Private.Ctf +module Rcfd = Eio_unix.Private.Rcfd +module Poll = Iomux.Poll + +type exit = [`Exit_scheduler] + +let system_thread = Ctf.mint_id () + +type runnable = + | IO : runnable + | Thread : 'a Suspended.t * 'a -> runnable + | Failed_thread : 'a Suspended.t * exn -> runnable + +type fd_event_waiters = { + read : unit Suspended.t Lwt_dllist.t; + write : unit Suspended.t Lwt_dllist.t; +} + +type t = { + (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) + run_q : runnable Lf_queue.t; + + poll : Poll.t; + mutable poll_maxi : int; (* The highest index ever used in [poll]. *) + + (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. + In that case, [need_wakeup = true] and you must signal using [eventfd]. *) + eventfd : Rcfd.t; + eventfd_r : Unix.file_descr; + + fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t; + + mutable active_ops : int; + + (* If [false], the main thread will check [run_q] before sleeping again + (possibly because an event has been or will be sent to [eventfd]). + It can therefore be set to [false] in either of these cases: + - By the receiving thread because it will check [run_q] before sleeping, or + - By the sending thread because it will signal the main thread later *) + need_wakeup : bool Atomic.t; + + sleep_q: Zzz.t; +} + +let wake_buffer = Bytes.of_string "!" + +(* This can be called from any systhread (including ones not running Eio), + and also from signal handlers or GC finalizers. It must not take any locks. *) +let wakeup t = + Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) + Rcfd.use t.eventfd + ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) + (fun fd -> + (* This can fail if the pipe is full, but then a wake up is pending anyway. *) + ignore (Unix.single_write fd wake_buffer 0 1 : int); + ) + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_thread t k x = + Lf_queue.push t.run_q (Thread (k, x)); + if Atomic.get t.need_wakeup then wakeup t + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_failed_thread t k ex = + Lf_queue.push t.run_q (Failed_thread (k, ex)); + if Atomic.get t.need_wakeup then wakeup t + +(* Can only be called from our own domain, so no need to check for wakeup. *) +let enqueue_at_head t k = + Lf_queue.push_head t.run_q (Thread (k, ())) + +let get_waiters t fd = + match Hashtbl.find_opt t.fd_map fd with + | Some x -> x + | None -> + let x = { read = Lwt_dllist.create (); write = Lwt_dllist.create () } in + Hashtbl.add t.fd_map fd x; + x + +let clear_event_fd t = + let buf = Bytes.create 8 in + let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in + assert (got > 0) + +(* Update [t.poll]'s entry for [fd] to match [waiters]. *) +let update t waiters fd = + let fdi = Iomux.Util.fd_of_unix fd in + let flags = + match not (Lwt_dllist.is_empty waiters.read), + not (Lwt_dllist.is_empty waiters.write) with + | false, false -> Poll.Flags.empty + | true, false -> Poll.Flags.pollin + | false, true -> Poll.Flags.pollout + | true, true -> Poll.Flags.(pollin + pollout) + in + if flags = Poll.Flags.empty then ( + Poll.invalidate_index t.poll fdi; + Hashtbl.remove t.fd_map fd + ) else ( + Poll.set_index t.poll fdi fd flags; + if fdi > t.poll_maxi then + t.poll_maxi <- fdi + ) + +let resume t node = + t.active_ops <- t.active_ops - 1; + let k : unit Suspended.t = Lwt_dllist.get node in + Fiber_context.clear_cancel_fn k.fiber; + enqueue_thread t k () + +(* Called when poll indicates that an event we requested for [fd] is ready. *) +let ready t _index fd revents = + if fd == t.eventfd_r then ( + clear_event_fd t + (* The scheduler will now look at the run queue again and notice any new items. *) + ) else ( + let waiters = Hashtbl.find t.fd_map fd in + let pending = Lwt_dllist.create () in + if Poll.Flags.(mem revents (pollout + pollhup + pollerr)) then + Lwt_dllist.transfer_l waiters.write pending; + if Poll.Flags.(mem revents (pollin + pollhup + pollerr)) then + Lwt_dllist.transfer_l waiters.read pending; + (* If pending has things, it means we modified the waiters, refresh our view *) + if not (Lwt_dllist.is_empty pending) then + update t waiters fd; + Lwt_dllist.iter_node_r (resume t) pending + ) + +let add_read t fd k = + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.read in + let node = Lwt_dllist.add_l k waiters.read in + if was_empty then update t waiters fd; + node + +let add_write t fd k = + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.write in + let node = Lwt_dllist.add_l k waiters.write in + if was_empty then update t waiters fd; + node + +(* Switch control to the next ready continuation. + If none is ready, wait until we get an event to wake one and then switch. + Returns only if there is nothing to do and no queued operations. *) +let rec next t : [`Exit_scheduler] = + (* Wakeup any paused fibers *) + match Lf_queue.pop t.run_q with + | None -> assert false (* We should always have an IO job, at least *) + | Some Thread (k, v) -> (* We already have a runnable task *) + Fiber_context.clear_cancel_fn k.fiber; + Suspended.continue k v + | Some Failed_thread (k, ex) -> + Fiber_context.clear_cancel_fn k.fiber; + Suspended.discontinue k ex + | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) + (* This is not a fair scheduler: timers always run before all other IO *) + let now = Mtime_clock.now () in + match Zzz.pop ~now t.sleep_q with + | `Due k -> + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + Suspended.continue k () (* A sleeping task is now due *) + | `Wait_until _ | `Nothing as next_due -> + let timeout = + match next_due with + | `Wait_until time -> + let time = Mtime.to_uint64_ns time in + let now = Mtime.to_uint64_ns now in + let diff_ns = Int64.sub time now in + Poll.Nanoseconds diff_ns + | `Nothing -> Poll.Infinite + in + if not (Lf_queue.is_empty t.run_q) then ( + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + next t + ) else if timeout = Infinite && t.active_ops = 0 then ( + (* Nothing further can happen at this point. *) + Lf_queue.close t.run_q; (* Just to catch bugs if something tries to enqueue later *) + `Exit_scheduler + ) else ( + Atomic.set t.need_wakeup true; + if Lf_queue.is_empty t.run_q then ( + (* At this point we're not going to check [run_q] again before sleeping. + If [need_wakeup] is still [true], this is fine because we don't promise to do that. + If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) + Ctf.(note_hiatus Wait_for_work); + let nready = + try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout + with Unix.Unix_error(Unix.EINTR, _, "") -> 0 + in + Ctf.note_resume system_thread; + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + Poll.iter_ready t.poll nready (ready t); + next t + ) else ( + (* Someone added a new job while we were setting [need_wakeup] to [true]. + They might or might not have seen that, so we can't be sure they'll send an event. *) + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + next t + ) + ) + +let with_sched fn = + let run_q = Lf_queue.create () in + Lf_queue.push run_q IO; + let sleep_q = Zzz.create () in + let eventfd_r, eventfd_w = Unix.pipe ~cloexec:true () in + Unix.set_nonblock eventfd_r; + Unix.set_nonblock eventfd_w; + let eventfd = Rcfd.make eventfd_w in + let cleanup () = + Unix.close eventfd_r; + let was_open = Rcfd.close eventfd in + assert was_open + in + let poll = Poll.create () in + let fd_map = Hashtbl.create 10 in + let t = { run_q; poll; poll_maxi = 0; fd_map; eventfd; eventfd_r; + active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in + let eventfd_ri = Iomux.Util.fd_of_unix eventfd_r in + Poll.set_index t.poll eventfd_ri eventfd_r Poll.Flags.pollin; + if eventfd_ri > t.poll_maxi then + t.poll_maxi <- eventfd_ri; + match fn t with + | x -> cleanup (); x + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + Printexc.raise_with_backtrace ex bt + +let await_readable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let node = add_read t fd k in + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let await_writable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let node = add_write t fd k in + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let get_enqueue t k = function + | Ok v -> enqueue_thread t k v + | Error ex -> enqueue_failed_thread t k ex + +let await_timeout t (k : unit Suspended.t) time = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + let node = Zzz.add t.sleep_q time k in + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Zzz.remove t.sleep_q node; + enqueue_failed_thread t k ex + ); + next t + +let with_op t fn x = + t.active_ops <- t.active_ops + 1; + match fn x with + | r -> + t.active_ops <- t.active_ops - 1; + r + | exception ex -> + t.active_ops <- t.active_ops - 1; + raise ex + +[@@@alert "-unstable"] + +type _ Effect.t += Enter : (t -> 'a Eio_utils.Suspended.t -> [`Exit_scheduler]) -> 'a Effect.t +let enter fn = Effect.perform (Enter fn) + +let run ~extra_effects t main x = + let rec fork ~new_fiber:fiber fn = + let open Effect.Deep in + Ctf.note_switch (Fiber_context.tid fiber); + match_with fn () + { retc = (fun () -> Fiber_context.destroy fiber; next t); + exnc = (fun ex -> + Fiber_context.destroy fiber; + Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) + ); + effc = fun (type a) (e : a Effect.t) -> + match e with + | Enter fn -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> fn t { Suspended.k; fiber } + ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) + | Eio.Private.Effects.Suspend f -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue = get_enqueue t k in + f fiber enqueue; + next t + ) + | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k -> + let k = { Suspended.k; fiber } in + enqueue_at_head t k; + fork ~new_fiber f + ) + | Eio_unix.Private.Await_readable fd -> Some (fun k -> + await_readable t { Suspended.k; fiber } fd + ) + | Eio_unix.Private.Await_writable fd -> Some (fun k -> + await_writable t { Suspended.k; fiber } fd + ) + | e -> extra_effects.Effect.Deep.effc e + } + in + let result = ref None in + let `Exit_scheduler = + let new_fiber = Fiber_context.make_root () in + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) + in + match !result with + | Some x -> x + | None -> failwith "BUG in scheduler: deadlock detected" diff --git a/lib_eio_posix/sched.mli b/lib_eio_posix/sched.mli new file mode 100644 index 000000000..77bb9e921 --- /dev/null +++ b/lib_eio_posix/sched.mli @@ -0,0 +1,45 @@ +(** The scheduler keeps track of all suspended fibers and resumes them as appropriate. + + Each Eio domain has one scheduler, which keeps a queue of runnable + processes plus a record of all fibers waiting for IO operations to complete. *) + +type t + +type exit +(** This is equivalent to [unit], but indicates that a function returning this will call {!next} + and so does not return until the whole event loop is finished. Such functions should normally + be called in tail position. *) + +val with_sched : (t -> 'a) -> 'a +(** [with_sched fn] sets up a scheduler and calls [fn t]. + Typically [fn] will call {!run}. + When [fn] returns, the scheduler's resources are freed. *) + +val run : + extra_effects:exit Effect.Deep.effect_handler -> + t -> ('a -> 'b) -> 'a -> 'b [@@alert "-unstable"] +(** [run ~extra_effects t f x] starts an event loop using [t] and runs [f x] as the root fiber within it. + + Unknown effects are passed to [extra_effects]. *) + +val next : t -> exit +(** [next t] asks the scheduler to transfer control to the next runnable fiber, + or wait for an event from the OS if there is none. This should normally be + called in tail position from an effect handler. *) + +val await_readable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for reading. *) + +val await_writable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for writing. *) + +val await_timeout : t -> unit Eio_utils.Suspended.t -> Mtime.t -> exit +(** [await_timeout t k time] adds [time, k] to the timer. + + When [time] is reached, [k] is resumed. Cancelling [k] removes the entry from the timer. *) + +val enter : (t -> 'a Eio_utils.Suspended.t -> exit) -> 'a +(** [enter fn] suspends the current fiber and runs [fn t k] in the scheduler's context. + + [fn] should either resume [k] immediately itself, or use {!get_enqueue} to arrange for + it to be called later. *) diff --git a/lib_eio_posix/test/dune b/lib_eio_posix/test/dune new file mode 100644 index 000000000..2b12d5166 --- /dev/null +++ b/lib_eio_posix/test/dune @@ -0,0 +1,4 @@ +(mdx + (package eio_posix) + (enabled_if (= %{os_type} "Unix")) + (deps (package eio_posix))) diff --git a/lib_eio_posix/test/poll.md b/lib_eio_posix/test/poll.md new file mode 100644 index 000000000..ce46d111c --- /dev/null +++ b/lib_eio_posix/test/poll.md @@ -0,0 +1,22 @@ +```ocaml +# #require "eio_posix";; +``` + +```ocaml +open Eio.Std +``` + +## Closing an FD removes it from epoll + +```ocaml +# Eio_posix.run @@ fun _env -> + Switch.run (fun sw -> + let r, w = Eio_unix.pipe sw in + Eio_unix.await_writable (Eio_unix.FD.peek w) + ); + Switch.run (fun sw -> + let r, w = Eio_unix.pipe sw in + Eio_unix.await_writable (Eio_unix.FD.peek w) + );; +- : unit = () +``` diff --git a/lib_eio_posix/time.ml b/lib_eio_posix/time.ml new file mode 100644 index 000000000..b11aaffa3 --- /dev/null +++ b/lib_eio_posix/time.ml @@ -0,0 +1,19 @@ +let mono_clock = object + inherit Eio.Time.Mono.t + + method now = Mtime_clock.now () + + method sleep_until = Low_level.sleep_until +end + +let clock = object + inherit Eio.Time.clock + + method now = Unix.gettimeofday () + + method sleep_until time = + (* todo: use the realtime clock directly instead of converting to monotonic time. + That is needed to handle adjustments to the system clock correctly. *) + let d = time -. Unix.gettimeofday () in + Eio.Time.Mono.sleep mono_clock d +end diff --git a/lib_main/dune b/lib_main/dune index 727bd8490..a2e412ef0 100644 --- a/lib_main/dune +++ b/lib_main/dune @@ -5,6 +5,9 @@ (select linux_backend.ml from (eio_linux -> linux_backend.enabled.ml) ( -> linux_backend.disabled.ml)) + (select posix_backend.ml from + (eio_posix -> posix_backend.enabled.ml) + ( -> posix_backend.disabled.ml)) (select luv_backend.ml from (eio_luv -> luv_backend.enabled.ml) ( -> luv_backend.disabled.ml)) diff --git a/lib_main/eio_main.ml b/lib_main/eio_main.ml index 35957a2d6..89a1ff09c 100644 --- a/lib_main/eio_main.ml +++ b/lib_main/eio_main.ml @@ -4,9 +4,12 @@ let force run fn = let run fn = match Sys.getenv_opt "EIO_BACKEND" with | Some ("io-uring" | "linux") -> force Linux_backend.run fn + | Some "posix" -> force Posix_backend.run fn | Some "luv" -> force Luv_backend.run fn | None | Some "" -> Linux_backend.run fn ~fallback:(fun _ -> - force Luv_backend.run fn + Posix_backend.run fn ~fallback:(fun _ -> + force Luv_backend.run fn + ) ) | Some x -> Fmt.failwith "Unknown Eio backend %S (from $EIO_BACKEND)" x diff --git a/lib_main/posix_backend.disabled.ml b/lib_main/posix_backend.disabled.ml new file mode 100644 index 000000000..5d7a1b9b6 --- /dev/null +++ b/lib_main/posix_backend.disabled.ml @@ -0,0 +1 @@ +let run ~fallback _ = fallback (`Msg "The POSIX backend was disabled at compile-time") diff --git a/lib_main/posix_backend.enabled.ml b/lib_main/posix_backend.enabled.ml new file mode 100644 index 000000000..d3c66a390 --- /dev/null +++ b/lib_main/posix_backend.enabled.ml @@ -0,0 +1 @@ +let run ~fallback:_ fn = Eio_posix.run (fun env -> fn (env :> Eio.Stdenv.t))