Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eio_linux: expose more functions in the Low_level module #705

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,18 @@ end = struct
let d = v ~label ~path:(native_internal t path) (Low_level.FD fd) in
Eio.Resource.T (d, Dir_handler.v)

let mkdir t ~perm path = Low_level.mkdir_beneath ~perm t.fd path
let mkdir t ~perm path = Low_level.mkdir ~perm t.fd path

let read_dir t path =
Switch.run ~name:"read_dir" @@ fun sw ->
let fd = Low_level.open_dir ~sw t.fd (if path = "" then "." else path) in
let path = if path = "" then "." else path in
let fd =
Low_level.openat ~sw t.fd path
~seekable:false
~access:`R
~flags:Uring.Open_flags.(cloexec + directory)
~perm:0
in
Low_level.read_dir fd

let read_link t path = Low_level.read_link t.fd path
Expand All @@ -562,7 +569,7 @@ end = struct
if !Sched.statx_works then (
let module X = Uring.Statx in
let x = X.create () in
Low_level.statx_confined ~follow ~mask:X.Mask.basic_stats t.fd path x;
Low_level.statx ~follow ~mask:X.Mask.basic_stats t.fd path x;
{ Eio.File.Stat.
dev = X.dev x;
ino = X.ino x;
Expand Down
205 changes: 1 addition & 204 deletions lib_eio_linux/eio_linux.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Eio.Std

type fd := Eio_unix.Fd.t

(** {1 Main Loop} *)

type stdenv = Eio_unix.Stdenv.base
Expand All @@ -51,203 +47,4 @@ val run :
The argument is a message describing the problem (for logging).
The default simply raises an exception. *)

(** {1 Low-level API} *)

(** Low-level API for using uring directly. *)
module Low_level : sig
val noop : unit -> unit
(** [noop ()] performs a uring noop. This is only useful for benchmarking. *)

(** {1 Time functions} *)

val sleep_until : Mtime.t -> unit
(** [sleep_until time] blocks until the current time is [time]. *)

(** {1 Fixed-buffer memory allocation functions}

The size of the fixed buffer is set when calling {!run}, which attempts to allocate a fixed buffer.
However, that may fail due to resource limits. *)

val alloc_fixed : unit -> Uring.Region.chunk option
(** Allocate a chunk of memory from the fixed buffer.

Warning: The memory is NOT zeroed out.

Passing such memory to Linux can be faster than using normal memory, in certain cases.
There is a limited amount of such memory, and this will return [None] if none is available at present. *)

val alloc_fixed_or_wait : unit -> Uring.Region.chunk
(** Like {!alloc_fixed}, but if there are no chunks available then it waits until one is. *)

val free_fixed : Uring.Region.chunk -> unit

val with_chunk : fallback:(unit -> 'a) -> (Uring.Region.chunk -> 'a) -> 'a
(** [with_chunk ~fallback fn] runs [fn chunk] with a freshly allocated chunk and then frees it.

If no chunks are available, it runs [fallback ()] instead. *)

(** {1 File manipulation functions} *)

val openat2 :
sw:Switch.t ->
?seekable:bool ->
access:[`R|`W|`RW] ->
flags:Uring.Open_flags.t ->
perm:Unix.file_perm ->
resolve:Uring.Resolve.t ->
?dir:fd -> string -> fd
(** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path].

See {!Uring.openat2} for details. *)

val read_upto : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> int
(** [read_upto fd chunk len] reads at most [len] bytes from [fd],
returning as soon as some data is available.

@param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if all data has already been read. *)

val read_exactly : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> unit
(** [read_exactly fd chunk len] reads exactly [len] bytes from [fd],
performing multiple read operations if necessary.

@param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if the stream ends before [len] bytes have been read. *)

val readv : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> int
(** [readv] is like {!read_upto} but can read into any cstruct(s),
not just chunks of the pre-shared buffer.

If multiple buffers are given, they are filled in order. *)

val write : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> unit
(** [write fd buf len] writes exactly [len] bytes from [buf] to [fd].

It blocks until the OS confirms the write is done,
and resubmits automatically if the OS doesn't write all of it at once. *)

val writev : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> unit
(** [writev] is like {!write} but can write from any cstruct(s),
not just chunks of the pre-shared buffer.

If multiple buffers are given, they are sent in order.
It will make multiple OS calls if the OS doesn't write all of it at once. *)

val writev_single : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> int
(** [writev_single] is like [writev] but only performs a single write operation.
It returns the number of bytes written, which may be smaller than the requested amount. *)

val splice : fd -> dst:fd -> len:int -> int
(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst].

@return The number of bytes copied.
@raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)

val connect : fd -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *)

val await_readable : fd -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)

val await_writable : fd -> unit
(** [await_writable fd] blocks until [fd] is writable (or has an error). *)

val fstat : fd -> Eio.File.Stat.t
(** Like {!Unix.LargeFile.fstat}. *)

val statx : ?fd:fd -> mask:Uring.Statx.Mask.t -> string -> Uring.Statx.t -> Uring.Statx.Flags.t -> unit
(** [statx t ?fd ~mask path buf flags] stats [path], which is resolved relative to [fd]
(or the current directory if [fd] is not given).

The results are written to [buf]. *)

val read_dir : fd -> string list
(** [read_dir dir] reads all directory entries from [dir].
The entries are not returned in any particular order
(not even necessarily the order in which Linux returns them). *)

val lseek : fd -> Optint.Int63.t -> [`Set | `Cur | `End] -> Optint.Int63.t
(** Set and/or get the current file position.

Like {!Unix.lseek}. *)

val fsync : fd -> unit
(** Flush file buffers to disk.

Like {!Unix.fsync}. *)

val ftruncate : fd -> Optint.Int63.t -> unit
(** Set the length of a file.

Like {!Unix.ftruncate}. *)

(** {1 Sockets} *)

val accept : sw:Switch.t -> fd -> (fd * Unix.sockaddr)
(** [accept ~sw t] blocks until a new connection is received on listening socket [t].

It returns the new connection and the address of the connecting peer.
The new connection has the close-on-exec flag set automatically.
The new connection is attached to [sw] and will be closed when that finishes, if
not already closed manually by then. *)

val shutdown : fd -> Unix.shutdown_command -> unit
(** Like {!Unix.shutdown}. *)

val send_msg : fd -> ?fds:fd list -> ?dst:Unix.sockaddr -> Cstruct.t list -> int
(** [send_msg socket bufs] is like [writev socket bufs], but also allows setting the destination address
(for unconnected sockets) and attaching FDs (for Unix-domain sockets). *)

val recv_msg : fd -> Cstruct.t list -> Uring.Sockaddr.t * int
(** [recv_msg socket bufs] is like [readv socket bufs] but also returns the address of the sender. *)

val recv_msg_with_fds : sw:Switch.t -> max_fds:int -> fd -> Cstruct.t list -> Uring.Sockaddr.t * int * fd list
(** [recv_msg_with_fds] is like [recv_msg] but also allows receiving up to [max_fds] file descriptors
(sent using SCM_RIGHTS over a Unix domain socket). *)

(** {1 Randomness} *)

val getrandom : Cstruct.t -> unit
(**[getrandom buf] fills [buf] with random bytes.

It uses Linux's [getrandom] call, which is like reading from /dev/urandom
except that it will block (the whole domain) if used at early boot
when the random system hasn't been initialised yet. *)

(** {1 DNS functions} *)

val getaddrinfo : service:string -> string -> Eio.Net.Sockaddr.t list
(** [getaddrinfo host] returns a list of IP addresses for [host]. [host] is either a domain name or
an ipaddress. *)

(** {1 Processes} *)

module Process : sig
type t
(** A child process. *)

module Fork_action = Eio_unix.Private.Fork_action
(** Setup actions to perform in the child process. *)

val spawn : sw:Switch.t -> Fork_action.t list -> t
(** [spawn ~sw actions] forks a child process, which executes [actions].
The last action should be {!Fork_action.execve}.

You will typically want to do [Promise.await (exit_status child)] after this.

@param sw The child will be sent {!Sys.sigkill} if [sw] finishes. *)

val signal : t -> int -> unit
(** [signal t x] sends signal [x] to [t].

This is similar to doing [Unix.kill t.pid x],
except that it ensures no signal is sent after [t] has been reaped. *)

val pid : t -> int

val exit_status : t -> Unix.process_status Promise.t
(** [exit_status t] is a promise for the process's exit status. *)
end

end
module Low_level = Low_level
21 changes: 8 additions & 13 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ let with_parent_dir op dir path fn =
Fd.use_exn op parent @@ fun parent ->
fn parent leaf

let statx ?fd ~mask path buf flags =
let statx_raw ?fd ~mask path buf flags =
let res =
match fd with
| None -> Sched.enter "statx" (enqueue_statx (None, path, buf, flags, mask))
Expand All @@ -411,24 +411,25 @@ let statx ?fd ~mask path buf flags =
in
if res <> 0 then raise @@ Err.wrap_fs (Uring.error_of_errno res) "statx" path

let statx_confined ~mask ~follow fd path buf =
let statx ~mask ~follow fd path buf =
let module X = Uring.Statx in
let flags = if follow then X.Flags.empty else X.Flags.symlink_nofollow in
let flags = if follow then X.Flags.empty_path else X.Flags.(empty_path + symlink_nofollow) in
match fd with
| Fs -> statx ~mask path buf flags
| Fs -> statx_raw ~mask path buf flags
| FD fd when path = "" -> statx_raw ~fd ~mask "" buf flags
| Cwd | FD _ when not follow ->
with_parent_dir_fd fd path @@ fun parent leaf ->
statx ~mask ~fd:parent leaf buf flags
statx_raw ~mask ~fd:parent leaf buf flags
| Cwd | FD _ ->
Switch.run ~name:"statx" @@ fun sw ->
let fd = openat ~sw ~seekable:false fd (if path = "" then "." else path)
~access:`R
~flags:Uring.Open_flags.(cloexec + path)
~perm:0
in
statx ~fd ~mask "" buf Uring.Statx.Flags.(flags + empty_path)
statx_raw ~fd ~mask "" buf flags

let mkdir_beneath ~perm dir path =
let mkdir ~perm dir path =
(* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *)
with_parent_dir "mkdir" dir path @@ fun parent leaf ->
try eio_mkdirat parent leaf perm
Expand Down Expand Up @@ -470,12 +471,6 @@ let accept ~sw fd =
client, client_addr
)

let open_dir ~sw dir path =
openat ~sw ~seekable:false dir path
~access:`R
~flags:Uring.Open_flags.(cloexec + directory)
~perm:0

let read_dir fd =
Fd.use_exn "read_dir" fd @@ fun fd ->
let rec read_all acc fd =
Expand Down
Loading
Loading