From e9027f6ff90ef54d5731bc80f448c828b6f7bfbd Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Wed, 4 Mar 2020 16:06:15 +0100 Subject: [PATCH 1/7] Add Lwt_unix.pread and pwrite --- src/unix/dune | 8 +++ src/unix/lwt_unix.cppo.ml | 30 +++++++++ src/unix/lwt_unix.cppo.mli | 24 +++++++ src/unix/unix_c/unix_pread.c | 22 +++++++ src/unix/unix_c/unix_pread_job.c | 80 +++++++++++++++++++++++ src/unix/unix_c/unix_pwrite.c | 23 +++++++ src/unix/unix_c/unix_pwrite_job.c | 53 +++++++++++++++ src/unix/windows_c/windows_pread.c | 50 +++++++++++++++ src/unix/windows_c/windows_pread_job.c | 85 +++++++++++++++++++++++++ src/unix/windows_c/windows_pwrite.c | 50 +++++++++++++++ src/unix/windows_c/windows_pwrite_job.c | 73 +++++++++++++++++++++ src/unix/windows_c/windows_read.c | 2 +- 12 files changed, 499 insertions(+), 1 deletion(-) create mode 100644 src/unix/unix_c/unix_pread.c create mode 100644 src/unix/unix_c/unix_pread_job.c create mode 100644 src/unix/unix_c/unix_pwrite.c create mode 100644 src/unix/unix_c/unix_pwrite_job.c create mode 100644 src/unix/windows_c/windows_pread.c create mode 100644 src/unix/windows_c/windows_pread_job.c create mode 100644 src/unix/windows_c/windows_pwrite.c create mode 100644 src/unix/windows_c/windows_pwrite_job.c diff --git a/src/unix/dune b/src/unix/dune index 15308961b6..3c1d6fe55f 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -45,11 +45,15 @@ windows_get_page_size unix_mincore unix_read + unix_pread windows_read + windows_pread unix_bytes_read windows_bytes_read unix_write + unix_pwrite windows_write + windows_pwrite unix_bytes_write windows_bytes_write unix_readv_writev_utils @@ -84,11 +88,15 @@ unix_wait_mincore_job unix_open_job unix_read_job + unix_pread_job windows_read_job + windows_pread_job unix_bytes_read_job windows_bytes_read_job unix_write_job windows_write_job + unix_pwrite_job + windows_pwrite_job unix_bytes_write_job windows_bytes_write_job unix_stat_job_utils diff --git a/src/unix/lwt_unix.cppo.ml b/src/unix/lwt_unix.cppo.ml index a4b8e66b97..f35418dc54 100644 --- a/src/unix/lwt_unix.cppo.ml +++ b/src/unix/lwt_unix.cppo.ml @@ -631,6 +631,8 @@ let wait_read ch = external stub_read : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_read" external read_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_read_job" +external stub_pread : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = "lwt_unix_pread" +external pread_job : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = "lwt_unix_pread_job" let read ch buf pos len = if pos < 0 || len < 0 || pos > Bytes.length buf - len then @@ -643,6 +645,17 @@ let read ch buf pos len = | false -> wrap_syscall Read ch (fun () -> stub_read ch.fd buf pos len) +let pread ch buf ~file_offset pos len = + if pos < 0 || len < 0 || pos > Bytes.length buf - len then + invalid_arg "Lwt_unix.pread" + else + Lazy.force ch.blocking >>= function + | true -> + wait_read ch >>= fun () -> + run_job (pread_job ch.fd buf ~file_offset pos len) + | false -> + wrap_syscall Read ch (fun () -> stub_pread ch.fd buf ~file_offset pos len) + external stub_read_bigarray : Unix.file_descr -> bigarray -> int -> int -> int = "lwt_unix_bytes_read" external read_bigarray_job : @@ -672,6 +685,8 @@ let wait_write ch = external stub_write : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_write" external write_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_write_job" +external stub_pwrite : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = "lwt_unix_pwrite" +external pwrite_job : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = "lwt_unix_pwrite_job" let write ch buf pos len = if pos < 0 || len < 0 || pos > Bytes.length buf - len then @@ -684,10 +699,25 @@ let write ch buf pos len = | false -> wrap_syscall Write ch (fun () -> stub_write ch.fd buf pos len) +let pwrite ch buf ~file_offset pos len = + if pos < 0 || len < 0 || pos > Bytes.length buf - len then + invalid_arg "Lwt_unix.write" + else + Lazy.force ch.blocking >>= function + | true -> + wait_write ch >>= fun () -> + run_job (pwrite_job ch.fd buf ~file_offset pos len) + | false -> + wrap_syscall Write ch (fun () -> stub_pwrite ch.fd buf ~file_offset pos len) + let write_string ch buf pos len = let buf = Bytes.unsafe_of_string buf in write ch buf pos len +let pwrite_string ch buf ~file_offset pos len = + let buf = Bytes.unsafe_of_string buf in + pwrite ch buf ~file_offset pos len + external stub_write_bigarray : Unix.file_descr -> bigarray -> int -> int -> int = "lwt_unix_bytes_write" external write_bigarray_job : diff --git a/src/unix/lwt_unix.cppo.mli b/src/unix/lwt_unix.cppo.mli index 16e76a9b83..2d31e6d1e1 100644 --- a/src/unix/lwt_unix.cppo.mli +++ b/src/unix/lwt_unix.cppo.mli @@ -298,6 +298,16 @@ val read : file_descr -> bytes -> int -> int -> int Lwt.t except [Unix.Unix_error Unix.EAGAIN], [Unix.Unix_error Unix.EWOULDBLOCK] or [Unix.Unix_error Unix.EINTR]. *) +val pread : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t +(** [pread fd buf ~file_offset ofs len] on file descriptors allowing seek, + reads up to [len] bytes from [fd] at offset [file_offset] from the + beginning of the file, and writes them to [buf], starting at offset [ofs]. + + The current position of the file descriptor does not change. + + The thread can fail with any exception that can be raised by [read] or + [lseek]. *) + val write : file_descr -> bytes -> int -> int -> int Lwt.t (** [write fd buf ofs len] writes up to [len] bytes to [fd] from [buf], starting at buffer offset [ofs]. The function immediately evaluates to an Lwt thread, @@ -315,9 +325,23 @@ val write : file_descr -> bytes -> int -> int -> int Lwt.t [Unix.single_write], except [Unix.Unix_error Unix.EAGAIN], [Unix.Unix_error Unix.EWOULDBLOCK] or [Unix.Unix_error Unix.EINTR]. *) +val pwrite : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t +(** [pwrite fd buf ~file_offset ofs len] on file descriptors allowing seek, + writes up to [len] bytes to [fd] from [buf], starting at buffer offset + [ofs]. The data is written at offset [file_offset] from the beginning + of [fd]. + + The current position of the file descriptor does not change. + + The thread can fail with any exception that can be raised by [write] or + [lseek]. *) + val write_string : file_descr -> string -> int -> int -> int Lwt.t (** See {!write}. *) +val pwrite_string : file_descr -> string -> file_offset:int -> int -> int -> int Lwt.t + (** See {!pwrite}. *) + (** Sequences of buffer slices for {!writev}. *) module IO_vectors : sig diff --git a/src/unix/unix_c/unix_pread.c b/src/unix/unix_c/unix_pread.c new file mode 100644 index 0000000000..765cb5dc6a --- /dev/null +++ b/src/unix/unix_c/unix_pread.c @@ -0,0 +1,22 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if !defined(LWT_ON_WINDOWS) + +#include +#include + +CAMLprim value lwt_unix_pread(value val_fd, value val_buf, value val_file_ofs, + value val_ofs, value val_len) +{ + long ret; + ret = pread(Int_val(val_fd), &Byte(String_val(val_buf), Long_val(val_ofs)), + Long_val(val_len), Long_val(val_file_ofs)); + if (ret == -1) uerror("pread", Nothing); + return Val_long(ret); +} +#endif diff --git a/src/unix/unix_c/unix_pread_job.c b/src/unix/unix_c/unix_pread_job.c new file mode 100644 index 0000000000..5b15201c8f --- /dev/null +++ b/src/unix/unix_c/unix_pread_job.c @@ -0,0 +1,80 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if !defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include +#include +#include +#include + +#include "lwt_unix.h" + +#if OCAML_VERSION < 40600 +#define Bytes_val(x) String_val(x) +#endif + +struct job_pread { + struct lwt_unix_job job; + /* The file descriptor. */ + int fd; + /* The amount of data to read. */ + long length; + /* The offset in the file */ + off_t file_offset; + /* The OCaml string. */ + value string; + /* The offset in the string. */ + long offset; + /* The result of the pread syscall. */ + long result; + /* The value of errno. */ + int error_code; + /* The temporary buffer. */ + char buffer[]; +}; + +static void worker_pread(struct job_pread *job) +{ + job->result = pread(job->fd, job->buffer, job->length, job->file_offset); + job->error_code = errno; +} + +static value result_pread(struct job_pread *job) +{ + long result = job->result; + if (result < 0) { + int error_code = job->error_code; + caml_remove_generational_global_root(&(job->string)); + lwt_unix_free_job(&job->job); + unix_error(error_code, "pread", Nothing); + } else { + memcpy(Bytes_val(job->string) + job->offset, job->buffer, result); + caml_remove_generational_global_root(&(job->string)); + lwt_unix_free_job(&job->job); + return Val_long(result); + } +} + +CAMLprim value lwt_unix_pread_job(value val_fd, value val_buffer, + value val_file_offset, value val_offset, + value val_length) +{ + long length = Long_val(val_length); + LWT_UNIX_INIT_JOB(job, pread, length); + job->fd = Int_val(val_fd); + job->length = length; + job->file_offset = Long_val(val_file_offset); + job->string = val_buffer; + job->offset = Long_val(val_offset); + caml_register_generational_global_root(&(job->string)); + return lwt_unix_alloc_job(&(job->job)); +} +#endif diff --git a/src/unix/unix_c/unix_pwrite.c b/src/unix/unix_c/unix_pwrite.c new file mode 100644 index 0000000000..03d19efeb4 --- /dev/null +++ b/src/unix/unix_c/unix_pwrite.c @@ -0,0 +1,23 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if !defined(LWT_ON_WINDOWS) + +#include +#include +#include + +CAMLprim value lwt_unix_pwrite(value val_fd, value val_buf, value val_file_ofs, + value val_ofs, value val_len) +{ + long ret; + ret = pwrite(Int_val(val_fd), &Byte(String_val(val_buf), Long_val(val_ofs)), + Long_val(val_len), Long_val(val_file_ofs)); + if (ret == -1) uerror("pwrite", Nothing); + return Val_long(ret); +} +#endif diff --git a/src/unix/unix_c/unix_pwrite_job.c b/src/unix/unix_c/unix_pwrite_job.c new file mode 100644 index 0000000000..e62f9ec949 --- /dev/null +++ b/src/unix/unix_c/unix_pwrite_job.c @@ -0,0 +1,53 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if !defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include + +#include "lwt_unix.h" + +struct job_pwrite { + struct lwt_unix_job job; + int fd; + long length; + off_t file_offset; + long result; + int error_code; + char buffer[]; +}; + +static void worker_pwrite(struct job_pwrite *job) +{ + job->result = pwrite(job->fd, job->buffer, job->length, job->file_offset); + job->error_code = errno; +} + +static value result_pwrite(struct job_pwrite *job) +{ + long result = job->result; + LWT_UNIX_CHECK_JOB(job, result < 0, "pwrite"); + lwt_unix_free_job(&job->job); + return Val_long(result); +} + +CAMLprim value lwt_unix_pwrite_job(value val_fd, value val_string, + value val_file_offset, value val_offset, + value val_length) +{ + long length = Long_val(val_length); + LWT_UNIX_INIT_JOB(job, pwrite, length); + job->fd = Int_val(val_fd); + job->length = length; + job->file_offset = Long_val(val_file_offset); + memcpy(job->buffer, String_val(val_string) + Long_val(val_offset), length); + return lwt_unix_alloc_job(&(job->job)); +} +#endif diff --git a/src/unix/windows_c/windows_pread.c b/src/unix/windows_c/windows_pread.c new file mode 100644 index 0000000000..9511fe6069 --- /dev/null +++ b/src/unix/windows_c/windows_pread.c @@ -0,0 +1,50 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include + +CAMLprim value lwt_unix_pread(value fd, value buf, value vfile_offset, + value vofs, value vlen) +{ + intnat ofs, len, file_offset, written; + DWORD numbytes, numwritten; + DWORD err = 0; + + Begin_root(buf); + ofs = Long_val(vofs); + len = Long_val(vlen); + file_offset = Long_val(vfile_offset); + written = 0; + if (len > 0) { + numbytes = len; + if (Descr_kind_val(fd) == KIND_SOCKET) { + caml_invalid_argument("Lwt_unix.pread"); + } else { + HANDLE h = Handle_val(fd); + OVERLAPPED overlapped; + memset( &overlapped, 0, sizeof(overlapped)); + overlapped.OffsetHigh = (DWORD)(file_offset >> 32); + overlapped.Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + if (!ReadFile(h, &Byte(buf, ofs), numbytes, &numwritten, + &overlapped)) + err = GetLastError(); + } + if (err) { + win32_maperr(err); + uerror("pread", Nothing); + } + written = numwritten; + } + End_roots(); + return Val_long(written); +} +#endif diff --git a/src/unix/windows_c/windows_pread_job.c b/src/unix/windows_c/windows_pread_job.c new file mode 100644 index 0000000000..2ffc53f7d3 --- /dev/null +++ b/src/unix/windows_c/windows_pread_job.c @@ -0,0 +1,85 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include +#include + +#include "lwt_unix.h" + +#if OCAML_VERSION < 40600 +#define Bytes_val(x) String_val(x) +#endif + +struct job_pread { + struct lwt_unix_job job; + HANDLE handle; + DWORD length; + DWORD Offset; + DWORD OffsetHigh; + DWORD result; + DWORD error_code; + value string; + DWORD offset; + char buffer[]; +}; + +static void worker_pread(struct job_pread *job) +{ + OVERLAPPED overlapped; + memset( &overlapped, 0, sizeof(overlapped)); + overlapped.OffsetHigh = job->OffsetHigh; + overlapped.Offset = job->Offset; + if (!ReadFile(job->handle, job->buffer, job->length, &(job->result), + &overlapped)) + job->error_code = GetLastError(); +} + +static value result_pread(struct job_pread *job) +{ + value result; + DWORD error = job->error_code; + if (error) { + caml_remove_generational_global_root(&job->string); + lwt_unix_free_job(&job->job); + win32_maperr(error); + uerror("pread", Nothing); + } + memcpy(Bytes_val(job->string) + job->offset, job->buffer, job->result); + result = Val_long(job->result); + caml_remove_generational_global_root(&job->string); + lwt_unix_free_job(&job->job); + return result; +} + +CAMLprim value lwt_unix_pread_job(value val_fd, value val_string, + value val_file_offset, value val_offset, + value val_length) +{ + struct filedescr *fd = (struct filedescr *)Data_custom_val(val_fd); + long length = Long_val(val_length); + DWORDLONG file_offset = Long_val(val_file_offset); + if (fd->kind != KIND_HANDLE) { + caml_invalid_argument("Lwt_unix.pread"); + } else { + LWT_UNIX_INIT_JOB(job, pread, length); + job->handle = fd->fd.handle; + job->length = length; + job->OffsetHigh = (DWORD)(file_offset >> 32); + job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + job->error_code = 0; + job->string = val_string; + job->offset = Long_val(val_offset); + caml_register_generational_global_root(&(job->string)); + return lwt_unix_alloc_job(&(job->job)); + } +} +#endif diff --git a/src/unix/windows_c/windows_pwrite.c b/src/unix/windows_c/windows_pwrite.c new file mode 100644 index 0000000000..29ce392d2b --- /dev/null +++ b/src/unix/windows_c/windows_pwrite.c @@ -0,0 +1,50 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include + +CAMLprim value lwt_unix_pwrite(value fd, value buf, value vfile_offset, + value vofs, value vlen) +{ + intnat ofs, len, file_offset, written; + DWORD numbytes, numwritten; + DWORD err = 0; + + Begin_root(buf); + ofs = Long_val(vofs); + len = Long_val(vlen); + file_offset = Long_val(vfile_offset); + written = 0; + if (len > 0) { + numbytes = len; + if (Descr_kind_val(fd) == KIND_SOCKET) { + caml_invalid_argument("Lwt_unix.pwrite"); + } else { + HANDLE h = Handle_val(fd); + OVERLAPPED overlapped; + memset( &overlapped, 0, sizeof(overlapped)); + overlapped.OffsetHigh = (DWORD)(file_offset >> 32); + overlapped.Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + if (!WriteFile(h, &Byte(buf, ofs), numbytes, &numwritten, + &overlapped)) + err = GetLastError(); + } + if (err) { + win32_maperr(err); + uerror("pwrite", Nothing); + } + written = numwritten; + } + End_roots(); + return Val_long(written); +} +#endif diff --git a/src/unix/windows_c/windows_pwrite_job.c b/src/unix/windows_c/windows_pwrite_job.c new file mode 100644 index 0000000000..4ca9b60552 --- /dev/null +++ b/src/unix/windows_c/windows_pwrite_job.c @@ -0,0 +1,73 @@ +/* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. */ + + + +#include "lwt_config.h" + +#if defined(LWT_ON_WINDOWS) + +#include +#include +#include +#include + +#include "lwt_unix.h" + +struct job_pwrite { + struct lwt_unix_job job; + HANDLE handle; + DWORD length; + DWORD Offset; + DWORD OffsetHigh; + DWORD result; + DWORD error_code; + char buffer[]; +}; + +static void worker_pwrite(struct job_pwrite *job) +{ + OVERLAPPED overlapped; + memset( &overlapped, 0, sizeof(overlapped)); + overlapped.OffsetHigh = job->OffsetHigh; + overlapped.Offset = job->Offset; + if (!WriteFile(job->handle, job->buffer, job->length, &(job->result), + &overlapped)) + job->error_code = GetLastError(); +} + +static value result_pwrite(struct job_pwrite *job) +{ + value result; + DWORD error = job->error_code; + if (error) { + lwt_unix_free_job(&job->job); + win32_maperr(error); + uerror("pwrite", Nothing); + } + result = Val_long(job->result); + lwt_unix_free_job(&job->job); + return result; +} + +CAMLprim value lwt_unix_pwrite_job(value val_fd, value val_string, + value val_file_offset, value val_offset, + value val_length) +{ + struct filedescr *fd = (struct filedescr *)Data_custom_val(val_fd); + long length = Long_val(val_length); + DWORDLONG file_offset = Long_val(val_file_offset); + if (fd->kind != KIND_HANDLE) { + caml_invalid_argument("Lwt_unix.pwrite"); + } else { + LWT_UNIX_INIT_JOB(job, pwrite, length); + job->handle = fd->fd.handle; + memcpy(job->buffer, String_val(val_string) + Long_val(val_offset), length); + job->length = length; + job->OffsetHigh = (DWORD)(file_offset >> 32); + job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + job->error_code = 0; + return lwt_unix_alloc_job(&(job->job)); + } +} +#endif diff --git a/src/unix/windows_c/windows_read.c b/src/unix/windows_c/windows_read.c index fe3b297c57..1beeafa746 100644 --- a/src/unix/windows_c/windows_read.c +++ b/src/unix/windows_c/windows_read.c @@ -36,7 +36,7 @@ CAMLprim value lwt_unix_read(value fd, value buf, value vofs, value vlen) } if (err) { win32_maperr(err); - uerror("write", Nothing); + uerror("read", Nothing); } written = numwritten; } From 0c16ea8533230466b526f9437ef96f4adbf9e46e Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Thu, 5 Mar 2020 00:45:41 +0100 Subject: [PATCH 2/7] Test pread and pwrite --- test/unix/test_lwt_unix.cppo.ml | 65 ++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/test/unix/test_lwt_unix.cppo.ml b/test/unix/test_lwt_unix.cppo.ml index 73a424d39c..9637bbd6ef 100644 --- a/test/unix/test_lwt_unix.cppo.ml +++ b/test/unix/test_lwt_unix.cppo.ml @@ -1104,6 +1104,68 @@ let lwt_user_tests = [ end ] +let file_suffix = + let last_file_suffix = ref 0 in + fun () -> + incr last_file_suffix; + !last_file_suffix + +let test_filename name = + Printf.sprintf "%s_%i" name (file_suffix ()) + +let pread_tests = + let test_file = test_filename "test_pread_pwrite" in + let file_contents = "01234567890123456789" in + [ + test ~sequential:true "basic pread" + (fun () -> + Lwt_unix.openfile test_file [O_RDWR; O_TRUNC; O_CREAT] 0o666 >>= fun fd -> + Lwt_unix.write_string fd file_contents 0 (String.length file_contents) >>= fun n -> + assert(n = String.length file_contents); + (* This should always be true in practice, show it if this is the reason for failing *) + let buf = Bytes.make 3 '\x00' in + Lwt_unix.pread fd buf ~file_offset:3 0 3 >>= fun n -> + assert(n = 3); + let read1 = Bytes.to_string buf in + Lwt_unix.pread fd buf ~file_offset:15 0 3 >>= fun n -> + assert(n = 3); + let read2 = Bytes.to_string buf in + Lwt_unix.close fd >>= fun () -> + Lwt.return (read1 = "345" && read2 = "567")); + + test ~sequential:true "pread does not seek" + (fun () -> + Lwt_unix.openfile test_file [O_RDWR] 0o666 >>= fun fd -> + let buf = Bytes.make 3 '\x00' in + Lwt_unix.pread fd buf ~file_offset:3 0 3 >>= fun _n -> + Lwt_unix.read fd buf 0 3 >>= fun n -> + assert(n = 3); + let read = Bytes.to_string buf in + Lwt_unix.close fd >>= fun () -> + Lwt.return (read = "012")); + + test ~sequential:true "basic pwrite" + (fun () -> + Lwt_unix.openfile test_file [O_RDWR] 0o666 >>= fun fd -> + let t1 = Lwt_unix.pwrite_string fd "abcd" ~file_offset:5 0 4 in + let t2 = Lwt_unix.pwrite_string fd "efg" ~file_offset:15 0 3 in + t2 >>= fun l2 -> + t1 >>= fun l1 -> + assert(l1 = 4); + assert(l2 = 3); + let buf = Bytes.make (String.length file_contents) '\x00' in + Lwt_unix.read fd buf 0 (String.length file_contents) >>= fun n -> + assert(n = (String.length file_contents)); + Lwt_unix.close fd >>= fun () -> + let read = Bytes.to_string buf in + Lwt.return (read = "01234abcd901234efg89")); + + test ~sequential:true "remove file" + (fun () -> + Unix.unlink test_file; + Lwt.return_true); +] + let suite = suite "lwt_unix" (wait_tests @ @@ -1117,5 +1179,6 @@ let suite = bind_tests @ dir_tests @ lwt_preemptive_tests @ - lwt_user_tests + lwt_user_tests @ + pread_tests ) From 6e14d46d754ea00def928148196b74807a459c2b Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Thu, 5 Mar 2020 01:03:54 +0100 Subject: [PATCH 3/7] The FilePointer is updated by ReadFile As stated in https://stackoverflow.com/questions/30278873/is-the-overlapped-structure-updated-when-using-readfile the position in the file is updated contrary to what Microsoft documentation states https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile#synchronization-and-file-position --- src/unix/lwt_unix.cppo.mli | 6 ++++-- test/unix/test_lwt_unix.cppo.ml | 12 +----------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/unix/lwt_unix.cppo.mli b/src/unix/lwt_unix.cppo.mli index 2d31e6d1e1..a7be80482e 100644 --- a/src/unix/lwt_unix.cppo.mli +++ b/src/unix/lwt_unix.cppo.mli @@ -303,7 +303,8 @@ val pread : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t reads up to [len] bytes from [fd] at offset [file_offset] from the beginning of the file, and writes them to [buf], starting at offset [ofs]. - The current position of the file descriptor does not change. + The current position of the file descriptor is undefined when [pread] + returns. The thread can fail with any exception that can be raised by [read] or [lseek]. *) @@ -331,7 +332,8 @@ val pwrite : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t [ofs]. The data is written at offset [file_offset] from the beginning of [fd]. - The current position of the file descriptor does not change. + The current position of the file descriptor is undefined when [pwrite] + returns. The thread can fail with any exception that can be raised by [write] or [lseek]. *) diff --git a/test/unix/test_lwt_unix.cppo.ml b/test/unix/test_lwt_unix.cppo.ml index 9637bbd6ef..026338ab80 100644 --- a/test/unix/test_lwt_unix.cppo.ml +++ b/test/unix/test_lwt_unix.cppo.ml @@ -1133,17 +1133,6 @@ let pread_tests = Lwt_unix.close fd >>= fun () -> Lwt.return (read1 = "345" && read2 = "567")); - test ~sequential:true "pread does not seek" - (fun () -> - Lwt_unix.openfile test_file [O_RDWR] 0o666 >>= fun fd -> - let buf = Bytes.make 3 '\x00' in - Lwt_unix.pread fd buf ~file_offset:3 0 3 >>= fun _n -> - Lwt_unix.read fd buf 0 3 >>= fun n -> - assert(n = 3); - let read = Bytes.to_string buf in - Lwt_unix.close fd >>= fun () -> - Lwt.return (read = "012")); - test ~sequential:true "basic pwrite" (fun () -> Lwt_unix.openfile test_file [O_RDWR] 0o666 >>= fun fd -> @@ -1153,6 +1142,7 @@ let pread_tests = t1 >>= fun l1 -> assert(l1 = 4); assert(l2 = 3); + Lwt_unix.lseek fd 0 Lwt_unix.SEEK_SET >>= fun _pos -> let buf = Bytes.make (String.length file_contents) '\x00' in Lwt_unix.read fd buf 0 (String.length file_contents) >>= fun n -> assert(n = (String.length file_contents)); From 18947a3019f7cd4b2182cb4e65c790ef7589ccdc Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Fri, 6 Mar 2020 18:52:26 +0100 Subject: [PATCH 4/7] Specify pread/pwrite effect on current position --- src/unix/lwt_unix.cppo.mli | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/unix/lwt_unix.cppo.mli b/src/unix/lwt_unix.cppo.mli index a7be80482e..767751660b 100644 --- a/src/unix/lwt_unix.cppo.mli +++ b/src/unix/lwt_unix.cppo.mli @@ -303,8 +303,8 @@ val pread : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t reads up to [len] bytes from [fd] at offset [file_offset] from the beginning of the file, and writes them to [buf], starting at offset [ofs]. - The current position of the file descriptor is undefined when [pread] - returns. + On Unix systems, the file descriptor position is unaffected. On Windows + it is changed to be just after the last read position. The thread can fail with any exception that can be raised by [read] or [lseek]. *) @@ -332,8 +332,8 @@ val pwrite : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t [ofs]. The data is written at offset [file_offset] from the beginning of [fd]. - The current position of the file descriptor is undefined when [pwrite] - returns. + On Unix systems, the file descriptor position is unaffected. On Windows + it is changed to be just after the last written position. The thread can fail with any exception that can be raised by [write] or [lseek]. *) From 2e6cde23a932832ae18ae6b6ce1132b310e31d60 Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Fri, 6 Mar 2020 18:53:26 +0100 Subject: [PATCH 5/7] Fix error string for pwrite --- src/unix/lwt_unix.cppo.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unix/lwt_unix.cppo.ml b/src/unix/lwt_unix.cppo.ml index f35418dc54..e2c2437c1c 100644 --- a/src/unix/lwt_unix.cppo.ml +++ b/src/unix/lwt_unix.cppo.ml @@ -701,7 +701,7 @@ let write ch buf pos len = let pwrite ch buf ~file_offset pos len = if pos < 0 || len < 0 || pos > Bytes.length buf - len then - invalid_arg "Lwt_unix.write" + invalid_arg "Lwt_unix.pwrite" else Lazy.force ch.blocking >>= function | true -> From 100827c45d98051b3c54824f155b9beea53a3a73 Mon Sep 17 00:00:00 2001 From: Pierre Chambart Date: Fri, 6 Mar 2020 19:05:46 +0100 Subject: [PATCH 6/7] Test both blocking and non-blocking pwrite/pread --- test/unix/test_lwt_unix.cppo.ml | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/test/unix/test_lwt_unix.cppo.ml b/test/unix/test_lwt_unix.cppo.ml index 026338ab80..f2377f6629 100644 --- a/test/unix/test_lwt_unix.cppo.ml +++ b/test/unix/test_lwt_unix.cppo.ml @@ -1113,13 +1113,20 @@ let file_suffix = let test_filename name = Printf.sprintf "%s_%i" name (file_suffix ()) -let pread_tests = +let pread_tests ~blocking = let test_file = test_filename "test_pread_pwrite" in let file_contents = "01234567890123456789" in + let blocking_string = + if blocking then + " blocking" + else + " nonblocking" + in [ - test ~sequential:true "basic pread" + test ~sequential:true ("basic pread" ^ blocking_string) (fun () -> Lwt_unix.openfile test_file [O_RDWR; O_TRUNC; O_CREAT] 0o666 >>= fun fd -> + if not blocking then Lwt_unix.set_blocking ~set_flags:false fd false; Lwt_unix.write_string fd file_contents 0 (String.length file_contents) >>= fun n -> assert(n = String.length file_contents); (* This should always be true in practice, show it if this is the reason for failing *) @@ -1133,9 +1140,10 @@ let pread_tests = Lwt_unix.close fd >>= fun () -> Lwt.return (read1 = "345" && read2 = "567")); - test ~sequential:true "basic pwrite" + test ~sequential:true ("basic pwrite" ^ blocking_string) (fun () -> Lwt_unix.openfile test_file [O_RDWR] 0o666 >>= fun fd -> + if not blocking then Lwt_unix.set_blocking ~set_flags:false fd false; let t1 = Lwt_unix.pwrite_string fd "abcd" ~file_offset:5 0 4 in let t2 = Lwt_unix.pwrite_string fd "efg" ~file_offset:15 0 3 in t2 >>= fun l2 -> @@ -1150,7 +1158,7 @@ let pread_tests = let read = Bytes.to_string buf in Lwt.return (read = "01234abcd901234efg89")); - test ~sequential:true "remove file" + test ~sequential:true ("remove file" ^ blocking_string) (fun () -> Unix.unlink test_file; Lwt.return_true); @@ -1170,5 +1178,6 @@ let suite = dir_tests @ lwt_preemptive_tests @ lwt_user_tests @ - pread_tests + pread_tests ~blocking:true @ + pread_tests ~blocking:false ) From cfe21df2791e4fd87b442cb0c44aa5ccb819e11b Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Sat, 7 Mar 2020 09:48:59 +0300 Subject: [PATCH 7/7] Tweaks [skip ci] --- src/unix/lwt_unix.cppo.ml | 16 ++++++++++++---- src/unix/lwt_unix.cppo.mli | 3 ++- src/unix/windows_c/windows_pread_job.c | 24 ++++++++++++------------ src/unix/windows_c/windows_pwrite_job.c | 21 +++++++++++---------- test/unix/test_lwt_unix.cppo.ml | 9 ++++++--- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/src/unix/lwt_unix.cppo.ml b/src/unix/lwt_unix.cppo.ml index e2c2437c1c..2923227218 100644 --- a/src/unix/lwt_unix.cppo.ml +++ b/src/unix/lwt_unix.cppo.ml @@ -631,8 +631,12 @@ let wait_read ch = external stub_read : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_read" external read_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_read_job" -external stub_pread : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = "lwt_unix_pread" -external pread_job : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = "lwt_unix_pread_job" +external stub_pread : + Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = + "lwt_unix_pread" +external pread_job : + Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = + "lwt_unix_pread_job" let read ch buf pos len = if pos < 0 || len < 0 || pos > Bytes.length buf - len then @@ -685,8 +689,12 @@ let wait_write ch = external stub_write : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_write" external write_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_write_job" -external stub_pwrite : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = "lwt_unix_pwrite" -external pwrite_job : Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = "lwt_unix_pwrite_job" +external stub_pwrite : + Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int = + "lwt_unix_pwrite" +external pwrite_job : + Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job = + "lwt_unix_pwrite_job" let write ch buf pos len = if pos < 0 || len < 0 || pos > Bytes.length buf - len then diff --git a/src/unix/lwt_unix.cppo.mli b/src/unix/lwt_unix.cppo.mli index 767751660b..c2f6b85c4c 100644 --- a/src/unix/lwt_unix.cppo.mli +++ b/src/unix/lwt_unix.cppo.mli @@ -341,7 +341,8 @@ val pwrite : file_descr -> bytes -> file_offset:int -> int -> int -> int Lwt.t val write_string : file_descr -> string -> int -> int -> int Lwt.t (** See {!write}. *) -val pwrite_string : file_descr -> string -> file_offset:int -> int -> int -> int Lwt.t +val pwrite_string : + file_descr -> string -> file_offset:int -> int -> int -> int Lwt.t (** See {!pwrite}. *) (** Sequences of buffer slices for {!writev}. *) diff --git a/src/unix/windows_c/windows_pread_job.c b/src/unix/windows_c/windows_pread_job.c index 2ffc53f7d3..cb729c78c0 100644 --- a/src/unix/windows_c/windows_pread_job.c +++ b/src/unix/windows_c/windows_pread_job.c @@ -40,7 +40,7 @@ static void worker_pread(struct job_pread *job) overlapped.Offset = job->Offset; if (!ReadFile(job->handle, job->buffer, job->length, &(job->result), &overlapped)) - job->error_code = GetLastError(); + job->error_code = GetLastError(); } static value result_pread(struct job_pread *job) @@ -68,18 +68,18 @@ CAMLprim value lwt_unix_pread_job(value val_fd, value val_string, long length = Long_val(val_length); DWORDLONG file_offset = Long_val(val_file_offset); if (fd->kind != KIND_HANDLE) { - caml_invalid_argument("Lwt_unix.pread"); + caml_invalid_argument("Lwt_unix.pread"); } else { - LWT_UNIX_INIT_JOB(job, pread, length); - job->handle = fd->fd.handle; - job->length = length; - job->OffsetHigh = (DWORD)(file_offset >> 32); - job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); - job->error_code = 0; - job->string = val_string; - job->offset = Long_val(val_offset); - caml_register_generational_global_root(&(job->string)); - return lwt_unix_alloc_job(&(job->job)); + LWT_UNIX_INIT_JOB(job, pread, length); + job->handle = fd->fd.handle; + job->length = length; + job->OffsetHigh = (DWORD)(file_offset >> 32); + job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + job->error_code = 0; + job->string = val_string; + job->offset = Long_val(val_offset); + caml_register_generational_global_root(&(job->string)); + return lwt_unix_alloc_job(&(job->job)); } } #endif diff --git a/src/unix/windows_c/windows_pwrite_job.c b/src/unix/windows_c/windows_pwrite_job.c index 4ca9b60552..d56247a906 100644 --- a/src/unix/windows_c/windows_pwrite_job.c +++ b/src/unix/windows_c/windows_pwrite_job.c @@ -33,7 +33,7 @@ static void worker_pwrite(struct job_pwrite *job) overlapped.Offset = job->Offset; if (!WriteFile(job->handle, job->buffer, job->length, &(job->result), &overlapped)) - job->error_code = GetLastError(); + job->error_code = GetLastError(); } static value result_pwrite(struct job_pwrite *job) @@ -58,16 +58,17 @@ CAMLprim value lwt_unix_pwrite_job(value val_fd, value val_string, long length = Long_val(val_length); DWORDLONG file_offset = Long_val(val_file_offset); if (fd->kind != KIND_HANDLE) { - caml_invalid_argument("Lwt_unix.pwrite"); + caml_invalid_argument("Lwt_unix.pwrite"); } else { - LWT_UNIX_INIT_JOB(job, pwrite, length); - job->handle = fd->fd.handle; - memcpy(job->buffer, String_val(val_string) + Long_val(val_offset), length); - job->length = length; - job->OffsetHigh = (DWORD)(file_offset >> 32); - job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); - job->error_code = 0; - return lwt_unix_alloc_job(&(job->job)); + LWT_UNIX_INIT_JOB(job, pwrite, length); + job->handle = fd->fd.handle; + memcpy( + job->buffer, String_val(val_string) + Long_val(val_offset), length); + job->length = length; + job->OffsetHigh = (DWORD)(file_offset >> 32); + job->Offset = (DWORD)(file_offset & 0xFFFFFFFFLL); + job->error_code = 0; + return lwt_unix_alloc_job(&(job->job)); } } #endif diff --git a/test/unix/test_lwt_unix.cppo.ml b/test/unix/test_lwt_unix.cppo.ml index f2377f6629..9cec114a71 100644 --- a/test/unix/test_lwt_unix.cppo.ml +++ b/test/unix/test_lwt_unix.cppo.ml @@ -1125,11 +1125,14 @@ let pread_tests ~blocking = [ test ~sequential:true ("basic pread" ^ blocking_string) (fun () -> - Lwt_unix.openfile test_file [O_RDWR; O_TRUNC; O_CREAT] 0o666 >>= fun fd -> + Lwt_unix.openfile test_file [O_RDWR; O_TRUNC; O_CREAT] 0o666 + >>= fun fd -> if not blocking then Lwt_unix.set_blocking ~set_flags:false fd false; - Lwt_unix.write_string fd file_contents 0 (String.length file_contents) >>= fun n -> + Lwt_unix.write_string fd file_contents 0 (String.length file_contents) + >>= fun n -> assert(n = String.length file_contents); - (* This should always be true in practice, show it if this is the reason for failing *) + (* This should always be true in practice, show it if this is the reason + for failing *) let buf = Bytes.make 3 '\x00' in Lwt_unix.pread fd buf ~file_offset:3 0 3 >>= fun n -> assert(n = 3);