diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 61d06a0b5..2142ea6bf 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,7 +32,31 @@ jobs: - run: opam --cli=2.1 pin -yn --with-version=dev . - run: opam install ${{ matrix.local-packages }} --deps-only --with-test - run: opam install ${{ matrix.local-packages }} --with-test + windows: + runs-on: windows-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set-up OCaml + uses: ocaml/setup-ocaml@v2 + with: + opam-pin: false + opam-depext: false + ocaml-compiler: ocaml.5.0.0,ocaml-option-mingw + opam-repositories: | + dra27: https://github.com/dra27/opam-repository.git#windows-5.0 + duniverse: git+https://github.com/dune-universe/opam-overlays + normal: https://github.com/ocaml/opam-repository.git + default: https://github.com/ocaml-opam/opam-repository-mingw.git#opam2 + # --with-version=dev is not available, and --with-test also tries running tests for packages (like MDX) which fail... + - run: | + opam pin -yn eio.dev . + opam pin -yn eio_windows.dev . + opam install ocamlfind.1.9.5 kcas alcotest mdx crowbar -y + opam install eio eio_windows --deps-only + - run: opam exec -- dune runtest docker: runs-on: ubuntu-latest steps: diff --git a/doc/dune b/doc/dune index e1a3c0d78..8ea33088a 100644 --- a/doc/dune +++ b/doc/dune @@ -1,4 +1,5 @@ (mdx (package eio_main) (deps (package eio_main) (env_var "EIO_BACKEND")) + (enabled_if (<> %{os_type} "Win32")) (files multicore.md)) diff --git a/dune b/dune index 60c8c18b0..e3e6643d1 100644 --- a/dune +++ b/dune @@ -2,4 +2,5 @@ (package eio_main) (deps (package eio_main) (env_var "EIO_BACKEND")) (preludes doc/prelude.ml) + (enabled_if (<> %{os_type} "Win32")) (files README.md)) diff --git a/dune-project b/dune-project index 5f8734aee..ecb88cb3a 100644 --- a/dune-project +++ b/dune-project @@ -56,10 +56,13 @@ (package (name eio_windows) (synopsis "Eio implementation for Windows") - (description "An Eio implementation using I/O Completion Ports") + (description "An Eio implementation using OCaml's Unix.select") (allow_empty) ; Work-around for dune bug #6938 (depends - (eio (= :version)))) + (eio (= :version)) + (cstruct-unix (= dev)) + (kcas (and (>= 0.3.0) :with-test)) + (alcotest (and (>= 1.4.0) :with-test)))) (package (name eio_main) (synopsis "Effect-based direct-style IO mainloop for OCaml") diff --git a/eio_windows.opam b/eio_windows.opam old mode 100644 new mode 100755 index bf718229f..b007be8d2 --- a/eio_windows.opam +++ b/eio_windows.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" synopsis: "Eio implementation for Windows" -description: "An Eio implementation using I/O Completion Ports" +description: "An Eio implementation using OCaml's Unix.select" maintainer: ["anil@recoil.org"] authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" @@ -11,6 +11,9 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues" depends: [ "dune" {>= "3.7"} "eio" {= version} + "cstruct-unix" {= "dev"} + "kcas" {>= "0.3.0" & with-test} + "alcotest" {>= "1.4.0" & with-test} "odoc" {with-doc} ] build: [ @@ -28,3 +31,10 @@ build: [ ] ] dev-repo: "git+https://github.com/ocaml-multicore/eio.git" +pin-depends: [ + # Removes base bytes for crowbar + [ "ocplib-endian.dev" "git+https://github.com/Leonidas-from-XIV/ocplib-endian#fda4d5525063c8444020be369c63de23d39c246e" ] + # Needed for the cstruct read and writes without copying + [ "cstruct.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] + [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] +] \ No newline at end of file diff --git a/eio_windows.opam.template b/eio_windows.opam.template new file mode 100755 index 000000000..7f38c1a6a --- /dev/null +++ b/eio_windows.opam.template @@ -0,0 +1,7 @@ +pin-depends: [ + # Removes base bytes for crowbar + [ "ocplib-endian.dev" "git+https://github.com/Leonidas-from-XIV/ocplib-endian#fda4d5525063c8444020be369c63de23d39c246e" ] + # Needed for the cstruct read and writes without copying + [ "cstruct.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] + [ "cstruct-unix.dev" "git+https://github.com/djs55/ocaml-cstruct#471ca03b49b3a372945fcf13c89e0447a8bd3932" ] +] \ No newline at end of file diff --git a/lib_eio/tests/dune b/lib_eio/tests/dune index 48fdb1987..217338572 100644 --- a/lib_eio/tests/dune +++ b/lib_eio/tests/dune @@ -1,5 +1,6 @@ (mdx (package eio) + (enabled_if (<> %{os_type} "Win32")) (deps (package eio) (file ./dscheck/fake_sched.ml) diff --git a/lib_eio/unix/fork_action.c b/lib_eio/unix/fork_action.c index 2c429862d..7f6ef0c33 100644 --- a/lib_eio/unix/fork_action.c +++ b/lib_eio/unix/fork_action.c @@ -5,9 +5,11 @@ #include #include +#include #include "fork_action.h" +#ifndef _WIN32 void eio_unix_run_fork_actions(int errors, value v_actions) { int old_flags = fcntl(errors, F_GETFL, 0); fcntl(errors, F_SETFL, old_flags & ~O_NONBLOCK); @@ -19,6 +21,7 @@ void eio_unix_run_fork_actions(int errors, value v_actions) { } _exit(1); } +#endif static void try_write_all(int fd, char *buf) { int len = strlen(buf); @@ -67,6 +70,9 @@ CAMLprim value eio_unix_fork_execve(value v_unit) { } static void action_fchdir(int errors, value v_config) { + #ifdef _WIN32 + eio_unix_fork_error(errors, "action_fchdir", "Unsupported operation on windows"); + #else value v_fd = Field(v_config, 1); int r; r = fchdir(Int_val(v_fd)); @@ -74,6 +80,7 @@ static void action_fchdir(int errors, value v_config) { eio_unix_fork_error(errors, "fchdir", strerror(errno)); _exit(1); } + #endif } CAMLprim value eio_unix_fork_fchdir(value v_unit) { @@ -95,6 +102,9 @@ CAMLprim value eio_unix_fork_chdir(value v_unit) { } static void set_blocking(int errors, int fd, int blocking) { + #ifdef _WIN32 + eio_unix_fork_error(errors, "set_blocking", "Unsupported operation on windows"); + #else int r = fcntl(fd, F_GETFL, 0); if (r != -1) { int flags = blocking @@ -108,9 +118,13 @@ static void set_blocking(int errors, int fd, int blocking) { eio_unix_fork_error(errors, "fcntl", strerror(errno)); _exit(1); } + #endif } static void set_cloexec(int errors, int fd, int cloexec) { + #ifdef _WIN32 + eio_unix_fork_error(errors, "set_cloexec", "Unsupported operation on windows"); + #else int r = fcntl(fd, F_GETFD, 0); if (r != -1) { int flags = cloexec @@ -124,6 +138,7 @@ static void set_cloexec(int errors, int fd, int cloexec) { eio_unix_fork_error(errors, "fcntl", strerror(errno)); _exit(1); } + #endif } static void action_dups(int errors, value v_config) { diff --git a/lib_eio/unix/stubs.c b/lib_eio/unix/stubs.c index 968f3c20c..bc0b400bb 100644 --- a/lib_eio/unix/stubs.c +++ b/lib_eio/unix/stubs.c @@ -5,10 +5,15 @@ #include CAMLprim value eio_unix_is_blocking(value v_fd) { + #ifdef _WIN32 + // We should not call this function from Windows + uerror("Unsupported blocking check on Windows", Nothing); + #else int fd = Int_val(v_fd); int r = fcntl(fd, F_GETFL, 0); if (r == -1) uerror("fcntl", Nothing); return Val_bool((r & O_NONBLOCK) == 0); + #endif } diff --git a/lib_eio_windows/domain_mgr.ml b/lib_eio_windows/domain_mgr.ml new file mode 100755 index 000000000..f6c2df733 --- /dev/null +++ b/lib_eio_windows/domain_mgr.ml @@ -0,0 +1,86 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Eio.Std + +[@@@alert "-unstable"] + +module Fd = Eio_unix.Fd + +(* Run an event loop in the current domain, using [fn x] as the root fiber. *) +let run_event_loop fn x = + Sched.with_sched @@ fun sched -> + let open Effect.Deep in + let extra_effects : _ effect_handler = { + effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> + match e with + | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k (Time.mono_clock : Eio.Time.Mono.t)) + | Eio_unix.Private.Socket_of_fd (sw, close_unix, unix_fd) -> Some (fun k -> + let fd = Fd.of_unix ~sw ~blocking:false ~close_unix unix_fd in + (* TODO: On Windows, if the FD from Unix.pipe () is passed this will fail *) + (try Unix.set_nonblock unix_fd with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> ()); + continue k (Flow.of_fd fd :> Eio_unix.socket) + ) + | Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k -> + match + let unix_a, unix_b = Unix.socketpair ~cloexec:true domain ty protocol in + let a = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_a in + let b = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_b in + Unix.set_nonblock unix_a; + Unix.set_nonblock unix_b; + (Flow.of_fd a :> Eio_unix.socket), (Flow.of_fd b :> Eio_unix.socket) + with + | r -> continue k r + | exception Unix.Unix_error (code, name, arg) -> + discontinue k (Err.wrap code name arg) + ) + | Eio_unix.Private.Pipe sw -> Some (fun k -> + match + let r, w = Low_level.pipe ~sw in + let source = (Flow.of_fd r :> Eio_unix.source) in + let sink = (Flow.of_fd w :> Eio_unix.sink) in + (source, sink) + with + | r -> continue k r + | exception Unix.Unix_error (code, name, arg) -> + discontinue k (Err.wrap code name arg) + ) + | _ -> None + } + in + Sched.run ~extra_effects sched fn x + +let v = object + inherit Eio.Domain_manager.t + + method run_raw fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun _ctx enqueue -> + domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) + + method run fn = + let domain = ref None in + Eio.Private.Suspend.enter (fun ctx enqueue -> + let cancelled, set_cancelled = Promise.create () in + Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled); + domain := Some (Domain.spawn (fun () -> + Fun.protect (run_event_loop (fun () -> fn ~cancelled)) + ~finally:(fun () -> enqueue (Ok ())))) + ); + Domain.join (Option.get !domain) +end diff --git a/lib_eio_windows/dune b/lib_eio_windows/dune index 9e2203057..ab9a1e166 100644 --- a/lib_eio_windows/dune +++ b/lib_eio_windows/dune @@ -1,5 +1,14 @@ (library (name eio_windows) (public_name eio_windows) + (library_flags :standard -ccopt -lbcrypt) (enabled_if (= %{os_type} "Win32")) - (libraries eio eio.utils fmt)) + (foreign_stubs + (language c) + (include_dirs ../lib_eio/unix/include) + (names eio_windows_stubs)) + (libraries eio eio.unix eio.utils fmt cstruct-unix)) + +(rule + (targets config.ml) + (action (run ./include/discover.exe))) \ No newline at end of file diff --git a/lib_eio_windows/eio_windows.ml b/lib_eio_windows/eio_windows.ml old mode 100644 new mode 100755 index 2cee04113..c69d00008 --- a/lib_eio_windows/eio_windows.ml +++ b/lib_eio_windows/eio_windows.ml @@ -1,3 +1,49 @@ -(* Can base this on the eio_posix directory structure. - See HACKING.md for instructions on creating a new backend. *) -let run _main = failwith "TODO: Windows support." +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Low_level = Low_level + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> + +let run main = + let stdin = (Flow.of_fd Eio_unix.Fd.stdin :> ) in + let stdout = (Flow.of_fd Eio_unix.Fd.stdout :> ) in + let stderr = (Flow.of_fd Eio_unix.Fd.stderr :> ) in + Domain_mgr.run_event_loop main @@ object (_ : stdenv) + method stdin = stdin + method stdout = stdout + method stderr = stderr + method debug = Eio.Private.Debug.v + method clock = Time.clock + method mono_clock = Time.mono_clock + method net = Net.v + method domain_mgr = Domain_mgr.v + method cwd = failwith "file-system operations not supported on Windows yet" + method fs = failwith "file-system operations not supported on Windows yet" + method secure_random = Flow.secure_random + end diff --git a/lib_eio_windows/eio_windows.mli b/lib_eio_windows/eio_windows.mli new file mode 100755 index 000000000..300c97b61 --- /dev/null +++ b/lib_eio_windows/eio_windows.mli @@ -0,0 +1,24 @@ +(** Fallback Eio backend for Windows using OCaml's [Unix.select]. *) + +type stdenv = < + stdin : ; + stdout : ; + stderr : ; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; + fs : Eio.Fs.dir Eio.Path.t; + cwd : Eio.Fs.dir Eio.Path.t; + secure_random : Eio.Flow.source; + debug : Eio.Debug.t; +> +(** An extended version of {!Eio.Stdenv.t} with some extra features available on Windows. *) + +val run : (stdenv -> 'a) -> 'a +(** [run main] runs an event loop and calls [main stdenv] inside it. + + For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *) + +module Low_level = Low_level +(** Low-level API. *) diff --git a/lib_eio_windows/eio_windows_stubs.c b/lib_eio_windows/eio_windows_stubs.c new file mode 100755 index 000000000..6120788b9 --- /dev/null +++ b/lib_eio_windows/eio_windows_stubs.c @@ -0,0 +1,90 @@ +#define _FILE_OFFSET_BITS 64 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifdef ARCH_SIXTYFOUR +#define Int63_val(v) Long_val(v) +#else +#define Int63_val(v) (Int64_val(v)) >> 1 +#endif + +static void caml_stat_free_preserving_errno(void *ptr) +{ + int saved = errno; + caml_stat_free(ptr); + errno = saved; +} + +CAMLprim value caml_eio_windows_getrandom(value v_ba, value v_off, value v_len) +{ + CAMLparam1(v_ba); + ssize_t ret; + ssize_t off = (ssize_t)Long_val(v_off); + ssize_t len = (ssize_t)Long_val(v_len); + do + { + void *buf = (uint8_t *)Caml_ba_data_val(v_ba) + off; + caml_enter_blocking_section(); + ret = BCryptGenRandom(NULL, buf, len, BCRYPT_USE_SYSTEM_PREFERRED_RNG); + caml_leave_blocking_section(); + } while (errno == EINTR); + if (ret != STATUS_SUCCESS) + uerror("getrandom", Nothing); + CAMLreturn(Val_long(len)); +} + +CAMLprim value caml_eio_windows_readv(value v_fd, value v_bufs) +{ + uerror("readv is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_preadv(value v_fd, value v_bufs, value v_offset) +{ + uerror("preadv is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_pwritev(value v_fd, value v_bufs, value v_offset) +{ + uerror("pwritev is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_openat(value v_dirfd, value v_pathname, value v_flags, value v_mode) +{ + uerror("openat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_mkdirat(value v_fd, value v_path, value v_perm) +{ + uerror("mkdirat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_unlinkat(value v_fd, value v_path, value v_dir) +{ + uerror("unlinkat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_renameat(value v_old_fd, value v_old_path, value v_new_fd, value v_new_path) +{ + uerror("renameat is not supported on windows yet", Nothing); +} + +CAMLprim value caml_eio_windows_spawn(value v_errors, value v_actions) +{ + uerror("processes are not supported on windows yet", Nothing); +} diff --git a/lib_eio_windows/err.ml b/lib_eio_windows/err.ml new file mode 100755 index 000000000..f813a06c1 --- /dev/null +++ b/lib_eio_windows/err.ml @@ -0,0 +1,29 @@ +type Eio.Exn.Backend.t += + | Outside_sandbox of string * string + | Absolute_path + | Invalid_leaf of string + +let unclassified_error e = Eio.Exn.create (Eio.Exn.X e) + +let () = + Eio.Exn.Backend.register_pp (fun f -> function + | Outside_sandbox (path, dir) -> Fmt.pf f "Outside_sandbox (%S, %S)" path dir; true + | Absolute_path -> Fmt.pf f "Absolute_path"; true + | Invalid_leaf x -> Fmt.pf f "Invalid_leaf %S" x; true + | _ -> false + ) + +let wrap code name arg = + let e = Eio_unix.Unix_error (code, name, arg) in + match code with + | EEXIST -> Eio.Fs.err (Already_exists e) + | ENOENT -> Eio.Fs.err (Not_found e) + | EXDEV | EACCES | EPERM -> Eio.Fs.err (Permission_denied e) + | ECONNREFUSED -> Eio.Net.err (Connection_failure (Refused e)) + | ECONNRESET | EPIPE -> Eio.Net.err (Connection_reset e) + | _ -> unclassified_error e + +let run fn x = + try fn x + with Unix.Unix_error (code, name, arg) -> + raise (wrap code name arg) diff --git a/lib_eio_windows/flow.ml b/lib_eio_windows/flow.ml new file mode 100755 index 000000000..e541b18e2 --- /dev/null +++ b/lib_eio_windows/flow.ml @@ -0,0 +1,86 @@ +module Fd = Eio_unix.Fd + +let fstat fd = + try + let ust = Low_level.fstat fd in + let st_kind : Eio.File.Stat.kind = + match ust.st_kind with + | Unix.S_REG -> `Regular_file + | Unix.S_DIR -> `Directory + | Unix.S_CHR -> `Character_special + | Unix.S_BLK -> `Block_device + | Unix.S_LNK -> `Symbolic_link + | Unix.S_FIFO -> `Fifo + | Unix.S_SOCK -> `Socket + in + Eio.File.Stat.{ + dev = ust.st_dev |> Int64.of_int; + ino = ust.st_ino |> Int64.of_int; + kind = st_kind; + perm = ust.st_perm; + nlink = ust.st_nlink |> Int64.of_int; + uid = ust.st_uid |> Int64.of_int; + gid = ust.st_gid |> Int64.of_int; + rdev = ust.st_rdev |> Int64.of_int; + size = ust.st_size |> Optint.Int63.of_int64; + atime = ust.st_atime; + mtime = ust.st_mtime; + ctime = ust.st_ctime; + } + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + +let write_bufs fd bufs = + try + Low_level.writev fd bufs + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let copy src dst = + let buf = Cstruct.create 4096 in + try + while true do + let got = Eio.Flow.single_read src buf in + write_bufs dst [Cstruct.sub buf 0 got] + done + with End_of_file -> () + +let read fd buf = + match Low_level.read_cstruct fd buf with + | 0 -> raise End_of_file + | got -> got + | exception (Unix.Unix_error (code, name, arg)) -> raise (Err.wrap code name arg) + +let shutdown fd cmd = + try + Low_level.shutdown fd @@ match cmd with + | `Receive -> Unix.SHUTDOWN_RECEIVE + | `Send -> Unix.SHUTDOWN_SEND + | `All -> Unix.SHUTDOWN_ALL + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let of_fd fd = object (_ : ) + method fd = fd + + method read_methods = [] + method copy src = copy src fd + + method pread ~file_offset bufs = Low_level.preadv ~file_offset fd (Array.of_list bufs) + method pwrite ~file_offset bufs = Low_level.pwritev ~file_offset fd (Array.of_list bufs) + + method stat = fstat fd + method read_into buf = read fd buf + method write bufs = write_bufs fd bufs + method shutdown cmd = shutdown fd cmd + method close = Fd.close fd + + method probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Resource.FD -> Some fd + | _ -> None +end + +let secure_random = object + inherit Eio.Flow.source + + method read_into buf = + Low_level.getrandom buf; + Cstruct.length buf +end diff --git a/lib_eio_windows/include/discover.ml b/lib_eio_windows/include/discover.ml new file mode 100755 index 000000000..136d1200a --- /dev/null +++ b/lib_eio_windows/include/discover.ml @@ -0,0 +1,27 @@ +module C = Configurator.V1 + +let () = + C.main ~name:"discover" (fun c -> + let defs = + C.C_define.import c ~c_flags:["-D_LARGEFILE64_SOURCE"] + ~includes:["sys/types.h"; "sys/stat.h"; "fcntl.h"] + C.C_define.Type.[ + "_O_RDONLY", Int; + "_O_RDWR", Int; + "_O_WRONLY", Int; + "_O_APPEND", Int; + "_O_CREAT", Int; + "_O_NOINHERIT", Int; + "_O_TRUNC", Int; + "_O_EXCL", Int; + ] + |> List.map (function + | name, C.C_define.Value.Int v -> + let name_length = String.length name in + let name = String.sub name 1 (name_length - 1) in + Printf.sprintf "let %s = 0x%x" (String.lowercase_ascii name) v + | _ -> assert false + ) + in + C.Flags.write_lines "config.ml" defs + ) diff --git a/lib_eio_windows/include/dune b/lib_eio_windows/include/dune new file mode 100755 index 000000000..db98d61d5 --- /dev/null +++ b/lib_eio_windows/include/dune @@ -0,0 +1,4 @@ +(executable + (name discover) + (modules discover) + (libraries dune-configurator)) diff --git a/lib_eio_windows/low_level.ml b/lib_eio_windows/low_level.ml new file mode 100755 index 000000000..e52bbb20f --- /dev/null +++ b/lib_eio_windows/low_level.ml @@ -0,0 +1,213 @@ +open Eio.Std + +(* There are some things that should be improved here: + + - Blocking FDs (e.g. stdout) wait for the FD to become ready and then do a blocking operation. + This might not succeed, and will block the whole domain in that case. + Ideally, all blocking operations should happen in a sys-thread instead. + + - Various other operations, such as listing a directory, should also be done in a sys-thread + to avoid high latencies in the main domain. *) + +type ty = Read | Write + +(* todo: keeping a pool of workers is probably faster *) +let in_worker_thread = Eio_unix.run_in_systhread + +module Fd = Eio_unix.Fd + +let await_readable fd = + Fd.use_exn "await_readable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_readable t k fd + +let await_writable fd = + Fd.use_exn "await_writable" fd @@ fun fd -> + Sched.enter @@ fun t k -> + Sched.await_writable t k fd + +let rec do_nonblocking ty fn fd = + Fiber.yield (); + try fn fd with + | Unix.Unix_error (EINTR, _, _) -> + do_nonblocking ty fn fd (* Just in case *) + | Unix.Unix_error((EAGAIN | EWOULDBLOCK), _, _) -> + Sched.enter (fun t k -> + match ty with + | Read -> Sched.await_readable t k fd + | Write -> Sched.await_writable t k fd + ); + do_nonblocking ty fn fd + +let read fd buf start len = + Fd.use_exn "read" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.read fd buf start len) fd + +let read_cstruct fd buf = + Fd.use_exn "read_cstruct" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix_cstruct.read fd buf) fd + +let write fd buf start len = + Fd.use_exn "write" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.write fd buf start len) fd + +let sleep_until time = + Sched.enter @@ fun t k -> + Sched.await_timeout t k time + +let socket ~sw socket_domain socket_type protocol = + Switch.check sw; + let sock_unix = Unix.socket ~cloexec:true socket_domain socket_type protocol in + Unix.set_nonblock sock_unix; + Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix + +let connect fd addr = + try + Fd.use_exn "connect" fd (fun fd -> Unix.connect fd addr) + with + | Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) -> + await_writable fd; + match Fd.use_exn "connect" fd Unix.getsockopt_error with + | None -> () + | Some code -> raise (Err.wrap code "connect-in-progress" "") + +let accept ~sw sock = + Fd.use_exn "accept" sock @@ fun sock -> + let client, addr = + do_nonblocking Read (fun fd -> Switch.check sw; Unix.accept ~cloexec:true fd) sock + in + Unix.set_nonblock client; + Fd.of_unix ~sw ~blocking:false ~close_unix:true client, addr + +let shutdown sock cmd = + Fd.use_exn "shutdown" sock (fun fd -> Unix.shutdown fd cmd) + +let send_msg fd ~dst buf = + Fd.use_exn "send_msg" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix.sendto fd buf 0 (Bytes.length buf) [] dst) fd + +let recv_msg fd buf = + Fd.use_exn "recv_msg" fd @@ fun fd -> + do_nonblocking Read (fun fd -> Unix.recvfrom fd buf 0 (Bytes.length buf) []) fd + +external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_windows_getrandom" + +let getrandom { Cstruct.buffer; off; len } = + let rec loop n = + if n = len then + () + else + loop (n + eio_getrandom buffer (off + n) (len - n)) + in + in_worker_thread @@ fun () -> + loop 0 + +let fstat fd = + Fd.use_exn "fstat" fd Unix.LargeFile.fstat + +let lstat path = + in_worker_thread @@ fun () -> + Unix.LargeFile.lstat path + +let realpath path = + in_worker_thread @@ fun () -> + Unix.realpath path + +let read_entries h = + let rec aux acc = + match Unix.readdir h with + | "." | ".." -> aux acc + | leaf -> aux (leaf :: acc) + | exception End_of_file -> Array.of_list acc + in + aux [] + +let readdir path = + in_worker_thread @@ fun () -> + let h = Unix.opendir path in + match read_entries h with + | r -> Unix.closedir h; r + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + Unix.closedir h; Printexc.raise_with_backtrace ex bt + +external eio_readv : Unix.file_descr -> Cstruct.t array -> int = "caml_eio_windows_readv" + +external eio_preadv : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_windows_preadv" +external eio_pwritev : Unix.file_descr -> Cstruct.t array -> Optint.Int63.t -> int = "caml_eio_windows_pwritev" + +let readv fd bufs = + Fd.use_exn "readv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_readv fd bufs) fd + +let writev fd bufs = + Fd.use_exn "writev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> Unix_cstruct.writev fd bufs) fd + +let preadv ~file_offset fd bufs = + Fd.use_exn "preadv" fd @@ fun fd -> + do_nonblocking Read (fun fd -> eio_preadv fd bufs file_offset) fd + +let pwritev ~file_offset fd bufs = + Fd.use_exn "pwritev" fd @@ fun fd -> + do_nonblocking Write (fun fd -> eio_pwritev fd bufs file_offset) fd + +module Open_flags = struct + type t = int + + let rdonly = Config.o_rdonly + let rdwr = Config.o_rdwr + let wronly = Config.o_wronly + let append = Config.o_append + let cloexec = Config.o_noinherit + let creat = Config.o_creat + let excl = Config.o_excl + let trunc = Config.o_trunc + + let empty = 0 + let ( + ) = ( lor ) +end + +let rec with_dirfd op dirfd fn = + match dirfd with + | None -> fn (Obj.magic (failwith "TODO AT_FDCWD") : Unix.file_descr) + | Some dirfd -> Fd.use_exn op dirfd fn + | exception Unix.Unix_error(Unix.EINTR, _, "") -> with_dirfd op dirfd fn + +external eio_openat : Unix.file_descr -> string -> Open_flags.t -> int -> Unix.file_descr = "caml_eio_windows_openat" + +let openat ?dirfd ~sw ~mode path flags = + with_dirfd "openat" dirfd @@ fun dirfd -> + Switch.check sw; + in_worker_thread (fun () -> eio_openat dirfd path Open_flags.(flags + cloexec (* + nonblock *)) mode) + |> Fd.of_unix ~sw ~blocking:false ~close_unix:true + +external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_windows_mkdirat" + +let mkdir ?dirfd ~mode path = + with_dirfd "mkdirat" dirfd @@ fun dirfd -> + in_worker_thread @@ fun () -> + eio_mkdirat dirfd path mode + +external eio_unlinkat : Unix.file_descr -> string -> bool -> unit = "caml_eio_windows_unlinkat" + +let unlink ?dirfd ~dir path = + with_dirfd "unlink" dirfd @@ fun dirfd -> + in_worker_thread @@ fun () -> + eio_unlinkat dirfd path dir + +external eio_renameat : Unix.file_descr -> string -> Unix.file_descr -> string -> unit = "caml_eio_windows_renameat" + +let rename ?old_dir old_path ?new_dir new_path = + with_dirfd "rename-old" old_dir @@ fun old_dir -> + with_dirfd "rename-new" new_dir @@ fun new_dir -> + in_worker_thread @@ fun () -> + eio_renameat old_dir old_path new_dir new_path + +let pipe ~sw = + let unix_r, unix_w = Unix.pipe ~cloexec:true () in + let r = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_r in + let w = Fd.of_unix ~sw ~blocking:false ~close_unix:true unix_w in + Unix.set_nonblock unix_r; + Unix.set_nonblock unix_w; + r, w diff --git a/lib_eio_windows/low_level.mli b/lib_eio_windows/low_level.mli new file mode 100755 index 000000000..8765bc28d --- /dev/null +++ b/lib_eio_windows/low_level.mli @@ -0,0 +1,72 @@ +(** This module provides an effects-based API for calling POSIX functions. + + Normally it's better to use the cross-platform {!Eio} APIs instead, + which uses these functions automatically where appropriate. + + These functions mostly copy the POSIX APIs directly, except that: + + + They suspend the calling fiber instead of returning [EAGAIN] or similar. + + They handle [EINTR] by automatically restarting the call. + + They wrap {!Unix.file_descr} in {!Fd}, to avoid use-after-close bugs. + + They attach new FDs to switches, to avoid resource leaks. *) + +open Eio.Std + +type fd := Eio_unix.Fd.t + +val await_readable : fd -> unit +val await_writable : fd -> unit + +val sleep_until : Mtime.t -> unit + +val read : fd -> bytes -> int -> int -> int +val read_cstruct : fd -> Cstruct.t -> int +val write : fd -> bytes -> int -> int -> int + +val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd +val connect : fd -> Unix.sockaddr -> unit +val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr + +val shutdown : fd -> Unix.shutdown_command -> unit + +val recv_msg : fd -> bytes -> int * Unix.sockaddr +val send_msg : fd -> dst:Unix.sockaddr -> bytes -> int + +val getrandom : Cstruct.t -> unit + +val fstat : fd -> Unix.LargeFile.stats +val lstat : string -> Unix.LargeFile.stats + +val realpath : string -> string + +val mkdir : ?dirfd:fd -> mode:int -> string -> unit +val unlink : ?dirfd:fd -> dir:bool -> string -> unit +val rename : ?old_dir:fd -> string -> ?new_dir:fd -> string -> unit + +val readdir : string -> string array + +val readv : fd -> Cstruct.t array -> int +val writev : fd -> Cstruct.t list -> unit + +val preadv : file_offset:Optint.Int63.t -> fd -> Cstruct.t array -> int +val pwritev : file_offset:Optint.Int63.t -> fd -> Cstruct.t array -> int + +val pipe : sw:Switch.t -> fd * fd + +module Open_flags : sig + type t + + val rdonly : t + val rdwr : t + val wronly : t + val append : t + val creat : t + val excl : t + val trunc : t + + val empty : t + val ( + ) : t -> t -> t +end + +val openat : ?dirfd:fd -> sw:Switch.t -> mode:int -> string -> Open_flags.t -> fd +(** Note: the returned FD is always non-blocking and close-on-exec. *) diff --git a/lib_eio_windows/net.ml b/lib_eio_windows/net.ml new file mode 100755 index 000000000..4f771ef4f --- /dev/null +++ b/lib_eio_windows/net.ml @@ -0,0 +1,160 @@ +open Eio.Std + +module Fd = Eio_unix.Fd + +let socket_domain_of = function + | `Unix _ -> Unix.PF_UNIX + | `UdpV4 -> Unix.PF_INET + | `UdpV6 -> Unix.PF_INET6 + | `Udp (host, _) + | `Tcp (host, _) -> + Eio.Net.Ipaddr.fold host + ~v4:(fun _ -> Unix.PF_INET) + ~v6:(fun _ -> Unix.PF_INET6) + +let listening_socket ~hook fd = object + inherit Eio.Net.listening_socket + + method close = + Switch.remove_hook hook; + Fd.close fd + + method accept ~sw = + let client, client_addr = Err.run (Low_level.accept ~sw) fd in + let client_addr = match client_addr with + | Unix.ADDR_UNIX path -> `Unix path + | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) + in + let flow = (Flow.of_fd client :> ) in + flow, client_addr + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Resource.FD -> Some fd + | _ -> None +end + +(* todo: would be nice to avoid copying between bytes and cstructs here *) +let udp_socket sock = object + inherit Eio.Net.datagram_socket + + method close = Fd.close sock + + method send sockaddr buf = + let addr = match sockaddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.ADDR_INET (host, port) + in + let sent = Err.run (Low_level.send_msg sock ~dst:addr) (Cstruct.to_bytes buf) in + assert (sent = Cstruct.length buf) + + method recv buf = + let b = Bytes.create (Cstruct.length buf) in + let recv, addr = Err.run (Low_level.recv_msg sock) b in + Cstruct.blit_from_bytes b 0 buf 0 recv; + match addr with + | Unix.ADDR_INET (inet, port) -> + `Udp (Eio_unix.Ipaddr.of_unix inet, port), recv + | Unix.ADDR_UNIX _ -> + raise (Failure "Expected INET UDP socket address but got Unix domain socket address.") +end + +(* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) +let getaddrinfo ~service node = + let to_eio_sockaddr_t {Unix.ai_family; ai_addr; ai_socktype; ai_protocol; _ } = + match ai_family, ai_socktype, ai_addr with + | (Unix.PF_INET | PF_INET6), + (Unix.SOCK_STREAM | SOCK_DGRAM), + Unix.ADDR_INET (inet_addr,port) -> ( + match ai_protocol with + | 6 -> Some (`Tcp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | 17 -> Some (`Udp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | _ -> None) + | _ -> None + in + Err.run Eio_unix.run_in_systhread @@ fun () -> + let rec aux () = + try + Unix.getaddrinfo node service [] + |> List.filter_map to_eio_sockaddr_t + with Unix.Unix_error (EINTR, _, _) -> aux () + in + aux () + +let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.stream) = + let socket_type, addr, is_unix_socket = + match listen_addr with + | `Unix path -> + if reuse_addr then ( + match Low_level.lstat path with + | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path + | _ -> () + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () + | exception Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + ); + Unix.SOCK_STREAM, Unix.ADDR_UNIX path, true + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port), false + in + let sock = Low_level.socket ~sw (socket_domain_of listen_addr) socket_type 0 in + (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) + let hook = + match listen_addr with + | `Unix path when String.length path > 0 && path.[0] <> Char.chr 0 -> + Switch.on_release_cancellable sw (fun () -> Unix.unlink path) + | `Unix _ | `Tcp _ -> + Switch.null_hook + in + Fd.use_exn "listen" sock (fun fd -> + (* REUSEADDR cannot be set on a Windows UNIX domain socket, + otherwise the Unix.bind will fail! *) + if not is_unix_socket && reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr; + Unix.listen fd backlog + ); + listening_socket ~hook sock + +let connect ~sw connect_addr = + let socket_type, addr = + match connect_addr with + | `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in + try + Low_level.connect sock addr; + (Flow.of_fd sock :> ) + with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) + +let datagram_socket ~reuse_addr ~reuse_port ~sw saddr = + let sock = Low_level.socket ~sw (socket_domain_of saddr) Unix.SOCK_DGRAM 0 in + begin match saddr with + | `Udp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + let addr = Unix.ADDR_INET (host, port) in + Fd.use_exn "datagram_socket" sock (fun fd -> + if reuse_addr then + Unix.setsockopt fd Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt fd Unix.SO_REUSEPORT true; + Unix.bind fd addr + ) + | `UdpV4 | `UdpV6 -> () + end; + udp_socket sock + +let v = object + inherit Eio.Net.t + + method listen = listen + method connect = connect + method datagram_socket = datagram_socket + method getaddrinfo = getaddrinfo + method getnameinfo = Eio_unix.getnameinfo +end diff --git a/lib_eio_windows/sched.ml b/lib_eio_windows/sched.ml new file mode 100755 index 000000000..0cc587c9e --- /dev/null +++ b/lib_eio_windows/sched.ml @@ -0,0 +1,368 @@ +(* + * Copyright (C) 2023 Thomas Leonard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Suspended = Eio_utils.Suspended +module Zzz = Eio_utils.Zzz +module Lf_queue = Eio_utils.Lf_queue +module Fiber_context = Eio.Private.Fiber_context +module Ctf = Eio.Private.Ctf +module Rcfd = Eio_unix.Private.Rcfd + +type exit = [`Exit_scheduler] + +let system_thread = Ctf.mint_id () + +(* The type of items in the run queue. *) +type runnable = + | IO : runnable (* Reminder to check for IO *) + | Thread : 'a Suspended.t * 'a -> runnable (* Resume a fiber with a result value *) + | Failed_thread : 'a Suspended.t * exn -> runnable (* Resume a fiber with an exception *) + +(* For each FD we track which fibers are waiting for it to become readable/writeable. *) +type fd_event_waiters = { + read : unit Suspended.t Lwt_dllist.t; + write : unit Suspended.t Lwt_dllist.t; +} + +module FdCompare = struct + type t = Unix.file_descr + let compare = Stdlib.compare +end + +module FdSet = Set.Make (FdCompare) + +(* A structure for storing the file descriptors for select. *) +type poll = { + mutable to_read : FdSet.t; + mutable to_write : FdSet.t; +} + +type t = { + (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) + run_q : runnable Lf_queue.t; + + poll : poll; + fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t; + + (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. + In that case, [need_wakeup = true] and you must signal using [eventfd]. *) + eventfd : Rcfd.t; (* For sending events. *) + eventfd_r : Unix.file_descr; (* For reading events. *) + + mutable active_ops : int; (* Exit when this is zero and [run_q] and [sleep_q] are empty. *) + + (* If [false], the main thread will check [run_q] before sleeping again + (possibly because an event has been or will be sent to [eventfd]). + It can therefore be set to [false] in either of these cases: + - By the receiving thread because it will check [run_q] before sleeping, or + - By the sending thread because it will signal the main thread later *) + need_wakeup : bool Atomic.t; + + sleep_q: Zzz.t; (* Fibers waiting for timers. *) +} + +(* The message to send to [eventfd] (any character would do). *) +let wake_buffer = Bytes.of_string "!" + +(* This can be called from any systhread (including ones not running Eio), + and also from signal handlers or GC finalizers. It must not take any locks. *) +let wakeup t = + Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) + Rcfd.use t.eventfd + ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) + (fun fd -> + (* This can fail if the pipe is full, but then a wake up is pending anyway. *) + ignore (Unix.single_write fd wake_buffer 0 1 : int); + ) + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_thread t k x = + Lf_queue.push t.run_q (Thread (k, x)); + if Atomic.get t.need_wakeup then wakeup t + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_failed_thread t k ex = + Lf_queue.push t.run_q (Failed_thread (k, ex)); + if Atomic.get t.need_wakeup then wakeup t + +(* Can only be called from our own domain, so no need to check for wakeup. *) +let enqueue_at_head t k = + Lf_queue.push_head t.run_q (Thread (k, ())) + +let get_waiters t fd = + match Hashtbl.find_opt t.fd_map fd with + | Some x -> x + | None -> + let x = { read = Lwt_dllist.create (); write = Lwt_dllist.create () } in + Hashtbl.add t.fd_map fd x; + x + +(* The OS told us that the event pipe is ready. Remove events. *) +let clear_event_fd t = + let buf = Bytes.create 8 in (* Read up to 8 events at a time *) + let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in + assert (got > 0) + +(* Update [t.poll]'s entry for [fd] to match [waiters]. *) +let update t waiters fd = + let flags = + match not (Lwt_dllist.is_empty waiters.read), + not (Lwt_dllist.is_empty waiters.write) with + | false, false -> `Empty + | true, false -> `R + | false, true -> `W + | true, true -> `RW + in + match flags with + | `Empty -> ( + t.poll.to_read <- FdSet.remove fd t.poll.to_read; + t.poll.to_write <- FdSet.remove fd t.poll.to_write; + Hashtbl.remove t.fd_map fd + ) + | `R -> t.poll.to_read <- FdSet.add fd t.poll.to_read + | `W -> t.poll.to_write <- FdSet.add fd t.poll.to_write + | `RW -> + t.poll.to_read <- FdSet.add fd t.poll.to_read; + t.poll.to_write <- FdSet.add fd t.poll.to_write + +let resume t node = + t.active_ops <- t.active_ops - 1; + let k : unit Suspended.t = Lwt_dllist.get node in + Fiber_context.clear_cancel_fn k.fiber; + enqueue_thread t k () + +(* Called when poll indicates that an event we requested for [fd] is ready. *) +let ready t revents fd = + if fd == t.eventfd_r then ( + clear_event_fd t + (* The scheduler will now look at the run queue again and notice any new items. *) + ) else ( + let waiters = Hashtbl.find t.fd_map fd in + let pending = Lwt_dllist.create () in + if List.mem `W revents then + Lwt_dllist.transfer_l waiters.write pending; + if List.mem `R revents then + Lwt_dllist.transfer_l waiters.read pending; + (* If pending has things, it means we modified the waiters, refresh our view *) + if not (Lwt_dllist.is_empty pending) then + update t waiters fd; + Lwt_dllist.iter_node_r (resume t) pending + ) + +(* Switch control to the next ready continuation. + If none is ready, wait until we get an event to wake one and then switch. + Returns only if there is nothing to do and no active operations. *) +let rec next t : [`Exit_scheduler] = + (* Wakeup any paused fibers *) + match Lf_queue.pop t.run_q with + | None -> assert false (* We should always have an IO job, at least *) + | Some Thread (k, v) -> (* We already have a runnable task *) + Fiber_context.clear_cancel_fn k.fiber; + Suspended.continue k v + | Some Failed_thread (k, ex) -> + Fiber_context.clear_cancel_fn k.fiber; + Suspended.discontinue k ex + | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) + (* This is not a fair scheduler: timers always run before all other IO *) + let now = Mtime_clock.now () in + match Zzz.pop ~now t.sleep_q with + | `Due k -> + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + Suspended.continue k () (* A sleeping task is now due *) + | `Wait_until _ | `Nothing as next_due -> + let timeout = + match next_due with + | `Wait_until time -> + let time = Mtime.to_uint64_ns time in + let now = Mtime.to_uint64_ns now in + let diff_ns = Int64.sub time now in + (* Convert to seconds for Unix.select *) + let diff = Int64.(to_float diff_ns) /. 1_000_000_000. in + diff + | `Nothing -> (-1.) + in + if timeout < 0. && t.active_ops = 0 then ( + (* Nothing further can happen at this point. *) + Lf_queue.close t.run_q; (* Just to catch bugs if something tries to enqueue later *) + `Exit_scheduler + ) else ( + Atomic.set t.need_wakeup true; + if Lf_queue.is_empty t.run_q then ( + (* At this point we're not going to check [run_q] again before sleeping. + If [need_wakeup] is still [true], this is fine because we don't promise to do that. + If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) + Ctf.(note_hiatus Wait_for_work); + let cons fd acc = fd :: acc in + let read = FdSet.fold cons t.poll.to_read [] in + let write = FdSet.fold cons t.poll.to_write [] in + match Unix.select read write [] timeout with + | exception Unix.(Unix_error (EINTR, _, _)) -> next t + | readable, writeable, _ -> + Ctf.note_resume system_thread; + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + List.iter (ready t [ `W ]) writeable; + List.iter (ready t [ `R ]) readable; + next t + ) else ( + (* Someone added a new job while we were setting [need_wakeup] to [true]. + They might or might not have seen that, so we can't be sure they'll send an event. *) + Atomic.set t.need_wakeup false; + Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *) + next t + ) + ) + +let with_sched fn = + let run_q = Lf_queue.create () in + Lf_queue.push run_q IO; + let sleep_q = Zzz.create () in + (* Pipes on Windows cannot be nonblocking through the OCaml API. *) + let eventfd_r, eventfd_w = Unix.socketpair ~cloexec:true Unix.PF_UNIX Unix.SOCK_STREAM 0 in + Unix.set_nonblock eventfd_r; + Unix.set_nonblock eventfd_w; + let eventfd = Rcfd.make eventfd_w in + let cleanup () = + Unix.close eventfd_r; + let was_open = Rcfd.close eventfd in + assert was_open + in + let poll = { to_read = FdSet.empty; to_write = FdSet.empty } in + let fd_map = Hashtbl.create 10 in + let t = { run_q; poll; fd_map; eventfd; eventfd_r; + active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in + t.poll.to_read <- FdSet.add eventfd_r t.poll.to_read; + match fn t with + | x -> cleanup (); x + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + Printexc.raise_with_backtrace ex bt + +let await_readable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.read in + let node = Lwt_dllist.add_l k waiters.read in + if was_empty then update t waiters fd; + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let await_writable t (k : unit Suspended.t) fd = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + t.active_ops <- t.active_ops + 1; + let waiters = get_waiters t fd in + let was_empty = Lwt_dllist.is_empty waiters.write in + let node = Lwt_dllist.add_l k waiters.write in + if was_empty then update t waiters fd; + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Lwt_dllist.remove node; + t.active_ops <- t.active_ops - 1; + enqueue_failed_thread t k ex + ); + next t + +let get_enqueue t k = function + | Ok v -> enqueue_thread t k v + | Error ex -> enqueue_failed_thread t k ex + +let await_timeout t (k : unit Suspended.t) time = + match Fiber_context.get_error k.fiber with + | Some e -> Suspended.discontinue k e + | None -> + let node = Zzz.add t.sleep_q time k in + Fiber_context.set_cancel_fn k.fiber (fun ex -> + Zzz.remove t.sleep_q node; + enqueue_failed_thread t k ex + ); + next t + +let with_op t fn x = + t.active_ops <- t.active_ops + 1; + match fn x with + | r -> + t.active_ops <- t.active_ops - 1; + r + | exception ex -> + t.active_ops <- t.active_ops - 1; + raise ex + +[@@@alert "-unstable"] + +type _ Effect.t += Enter : (t -> 'a Eio_utils.Suspended.t -> [`Exit_scheduler]) -> 'a Effect.t +let enter fn = Effect.perform (Enter fn) + +let run ~extra_effects t main x = + let rec fork ~new_fiber:fiber fn = + let open Effect.Deep in + Ctf.note_switch (Fiber_context.tid fiber); + match_with fn () + { retc = (fun () -> Fiber_context.destroy fiber; next t); + exnc = (fun ex -> + Fiber_context.destroy fiber; + Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) + ); + effc = fun (type a) (e : a Effect.t) -> + match e with + | Enter fn -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> fn t { Suspended.k; fiber } + ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) + | Eio.Private.Effects.Suspend f -> Some (fun k -> + let k = { Suspended.k; fiber } in + let enqueue = get_enqueue t k in + f fiber enqueue; + next t + ) + | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k -> + let k = { Suspended.k; fiber } in + enqueue_at_head t k; + fork ~new_fiber f + ) + | Eio_unix.Private.Await_readable fd -> Some (fun k -> + await_readable t { Suspended.k; fiber } fd + ) + | Eio_unix.Private.Await_writable fd -> Some (fun k -> + await_writable t { Suspended.k; fiber } fd + ) + | e -> extra_effects.Effect.Deep.effc e + } + in + let result = ref None in + let `Exit_scheduler = + let new_fiber = Fiber_context.make_root () in + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) + ) + in + match !result with + | Some x -> x + | None -> failwith "BUG in scheduler: deadlock detected" diff --git a/lib_eio_windows/sched.mli b/lib_eio_windows/sched.mli new file mode 100755 index 000000000..01fc1cbcb --- /dev/null +++ b/lib_eio_windows/sched.mli @@ -0,0 +1,44 @@ +(** The scheduler keeps track of all suspended fibers and resumes them as appropriate. + + Each Eio domain has one scheduler, which keeps a queue of runnable + processes plus a record of all fibers waiting for IO operations to complete. *) + +type t + +type exit +(** This is equivalent to [unit], but indicates that a function returning this will call {!next} + and so does not return until the whole event loop is finished. Such functions should normally + be called in tail position. *) + +val with_sched : (t -> 'a) -> 'a +(** [with_sched fn] sets up a scheduler and calls [fn t]. + Typically [fn] will call {!run}. + When [fn] returns, the scheduler's resources are freed. *) + +val run : + extra_effects:exit Effect.Deep.effect_handler -> + t -> ('a -> 'b) -> 'a -> 'b [@@alert "-unstable"] +(** [run ~extra_effects t f x] starts an event loop using [t] and runs [f x] as the root fiber within it. + + Unknown effects are passed to [extra_effects]. *) + +val next : t -> exit +(** [next t] asks the scheduler to transfer control to the next runnable fiber, + or wait for an event from the OS if there is none. This should normally be + called in tail position from an effect handler. *) + +val await_readable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for reading. *) + +val await_writable : t -> unit Eio_utils.Suspended.t -> Unix.file_descr -> exit +(** [await_readable t k fd] arranges for [k] to be resumed when [fd] is ready for writing. *) + +val await_timeout : t -> unit Eio_utils.Suspended.t -> Mtime.t -> exit +(** [await_timeout t k time] adds [time, k] to the timer. + + When [time] is reached, [k] is resumed. Cancelling [k] removes the entry from the timer. *) + +val enter : (t -> 'a Eio_utils.Suspended.t -> exit) -> 'a +(** [enter fn] suspends the current fiber and runs [fn t k] in the scheduler's context. + + [fn] should either resume [k] immediately itself, or call one of the [await_*] functions above. *) diff --git a/lib_eio_windows/test/dune b/lib_eio_windows/test/dune new file mode 100755 index 000000000..0b97f06ca --- /dev/null +++ b/lib_eio_windows/test/dune @@ -0,0 +1,13 @@ +(* -*- tuareg -*- *) + +let win32 = List.mem ("os_type", "Win32") Jbuild_plugin.V1.ocamlc_config + +let () = Jbuild_plugin.V1.send @@ if not win32 then "" else {| + +(test + (name test) + (package eio_windows) + (enabled_if (= %{os_type} "Win32")) + (libraries alcotest kcas eio.mock eio_windows)) + +|} \ No newline at end of file diff --git a/lib_eio_windows/test/test.ml b/lib_eio_windows/test/test.ml new file mode 100755 index 000000000..510c40964 --- /dev/null +++ b/lib_eio_windows/test/test.ml @@ -0,0 +1,59 @@ +module Timeout = struct + let test clock () = + let t0 = Unix.gettimeofday () in + Eio.Time.sleep clock 0.01; + let t1 = Unix.gettimeofday () in + let diff = t1 -. t0 in + if diff >= 0.01 then () else Alcotest.failf "Expected bigger difference than %f" diff + + + let tests env = [ + "timeout", `Quick, test env#clock + ] +end + +module Random = struct + let test_random env () = + let src = Eio.Stdenv.secure_random env in + let b1 = Cstruct.create 8 in + let b2 = Cstruct.create 8 in + Eio.Flow.read_exact src b1; + Eio.Flow.read_exact src b2; + Alcotest.(check bool) "different random" (not (Cstruct.equal b1 b2)) true + + let tests env = [ + "different", `Quick, test_random env + ] +end + +module Dla = struct + + let test_dla () = + let open Kcas in + let x = Loc.make 0 in + let y = Loc.make 0 in + let foreign_domain = Domain.spawn @@ fun () -> + let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in + Loc.set y 22; + x + in + Loc.set x 20; + let y' = Loc.get_as (fun y -> Retry.unless (y <> 0); y) y in + Alcotest.(check int) "correct y" y' 22; + let ans = y' + Domain.join foreign_domain in + Alcotest.(check int) "answer" ans 42 + + let tests = [ + "dla", `Quick, test_dla + ] +end + + +let () = + Eio_windows.run @@ fun env -> + Alcotest.run "eio_windows" [ + "net", Test_net.tests env; + "timeout", Timeout.tests env; + "random", Random.tests env; + "dla", Dla.tests + ] \ No newline at end of file diff --git a/lib_eio_windows/test/test_net.ml b/lib_eio_windows/test/test_net.ml new file mode 100755 index 000000000..0877c9f0a --- /dev/null +++ b/lib_eio_windows/test/test_net.ml @@ -0,0 +1,118 @@ +open Eio.Std + +let read_all flow = + let b = Buffer.create 100 in + Eio.Flow.copy flow (Eio.Flow.buffer_sink b); + Buffer.contents b + +let run_client ~sw ~net ~addr = + traceln "Connecting to server..."; + let flow = Eio.Net.connect ~sw net addr in + Eio.traceln "connected"; + Eio.Flow.copy_string "Hello from client" flow; + Eio.Flow.shutdown flow `Send; + let msg = read_all flow in + msg + +let run_server ~sw msg socket = + Eio.Net.accept_fork socket ~sw (fun flow _addr -> + traceln "Server accepted connection from client"; + Fun.protect (fun () -> + let msg = read_all flow in + traceln "Server received: %S" msg + ) ~finally:(fun () -> Eio.Flow.copy_string msg flow) + ) + ~on_error:(function + | ex -> traceln "Error handling connection: %s" (Printexc.to_string ex) + ) + +let test_client_server env addr () = + Eio.Switch.run @@ fun sw -> + let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in + let msg = "From the server" in + Fiber.both + (fun () -> run_server ~sw msg server) + (fun () -> + let client_msg = run_client ~sw ~net:env#net ~addr in + Alcotest.(check string) "same message" msg client_msg + ) + +let run_dgram addr ~net sw msg = + let e1 = `Udp (addr, 8081) in + let e2 = `Udp (addr, 8082) in + let listening_socket = Eio.Net.datagram_socket ~sw net e2 in + Fiber.both + (fun () -> + let buf = Cstruct.create 20 in + traceln "Waiting to receive data on %a" Eio.Net.Sockaddr.pp e2; + let addr, recv = Eio.Net.recv listening_socket buf in + traceln "Received message from %a" + Eio.Net.Sockaddr.pp addr; + Alcotest.(check string) "same udp msg" msg (Cstruct.(to_string (sub buf 0 recv))) + ) + (fun () -> + let e = Eio.Net.datagram_socket ~sw net e1 in + traceln "Sending data from %a to %a" Eio.Net.Sockaddr.pp e1 Eio.Net.Sockaddr.pp e2; + Eio.Net.send e e2 (Cstruct.of_string msg)) + +let test_udp env addr () = + Eio.Switch.run @@ fun sw -> + run_dgram addr ~net:env#net sw "UDP on Windows" + +let test_fd env addr () = + Eio.Switch.run @@ fun sw -> + let addr = `Tcp (addr, 8081) in + let server = Eio.Net.listen env#net ~sw ~reuse_addr:true ~backlog:5 addr in + Alcotest.(check bool) "Listening socket has Unix FD" (Eio_unix.Resource.fd_opt server <> None) true; + let have_client, have_server = + Fiber.pair + (fun () -> + let flow = Eio.Net.connect ~sw env#net addr in + (Eio_unix.Resource.fd_opt flow <> None) + ) + (fun () -> + let flow, _addr = Eio.Net.accept ~sw server in + (Eio_unix.Resource.fd_opt flow <> None) + ) + in + Alcotest.(check bool) "Client-side socket has Unix FD" have_client true; + Alcotest.(check bool) "Server-side socket has Unix FD" have_server true + +let test_wrap_socket pipe_or_socketpair () = + Switch.run @@ fun sw -> + let r, w = + match pipe_or_socketpair with + | `Pipe -> Unix.pipe () + | `Socketpair -> Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 + in + let source = (Eio_unix.import_socket_stream ~sw ~close_unix:true r :> Eio.Flow.source) in + let sink = (Eio_unix.import_socket_stream ~sw ~close_unix:true w :> Eio.Flow.sink) in + let msg = "Hello" in + Fiber.both + (fun () -> Eio.Flow.copy_string (msg ^ "\n") sink) + (fun () -> + let b = Eio.Buf_read.of_flow source ~max_size:1000 in + Alcotest.(check string) "same message" (Eio.Buf_read.line b) msg + ) + +let test_eio_socketpair () = + Switch.run @@ fun sw -> + let a, b = Eio_unix.socketpair ~sw () in + ignore (Eio_unix.Resource.fd a : Eio_unix.Fd.t); + ignore (Eio_unix.Resource.fd b : Eio_unix.Fd.t); + Eio.Flow.copy_string "foo" a; + Eio.Flow.close a; + let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in + Alcotest.(check string) "same messagw" "foo" msg + +let tests env = [ + "tcp-ip4", `Quick, test_client_server env (`Tcp (Eio.Net.Ipaddr.V4.loopback, 8081)); + "tcp-ip6", `Quick, test_client_server env (`Tcp (Eio.Net.Ipaddr.V6.loopback, 8081)); + "unix", `Quick, test_client_server env (`Unix "eio-test.sock"); + "udp-ip4", `Quick, test_udp env Eio.Net.Ipaddr.V4.loopback; + "udp-ip6", `Quick, test_udp env Eio.Net.Ipaddr.V6.loopback; + "fds", `Quick, test_fd env Eio.Net.Ipaddr.V4.loopback; + "wrap-pipe", `Quick, test_wrap_socket `Pipe; + "wrap-socketpair", `Quick, test_wrap_socket `Socketpair; + "eio-socketpair", `Quick, test_eio_socketpair +] \ No newline at end of file diff --git a/lib_eio_windows/time.ml b/lib_eio_windows/time.ml new file mode 100755 index 000000000..b11aaffa3 --- /dev/null +++ b/lib_eio_windows/time.ml @@ -0,0 +1,19 @@ +let mono_clock = object + inherit Eio.Time.Mono.t + + method now = Mtime_clock.now () + + method sleep_until = Low_level.sleep_until +end + +let clock = object + inherit Eio.Time.clock + + method now = Unix.gettimeofday () + + method sleep_until time = + (* todo: use the realtime clock directly instead of converting to monotonic time. + That is needed to handle adjustments to the system clock correctly. *) + let d = time -. Unix.gettimeofday () in + Eio.Time.Mono.sleep mono_clock d +end diff --git a/tests/dune b/tests/dune index a4836c50f..31aec547a 100644 --- a/tests/dune +++ b/tests/dune @@ -1,5 +1,6 @@ (mdx (package eio_main) + (enabled_if (<> %{os_type} "Win32")) (deps (env_var "EIO_BACKEND") (package eio_main)))