Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batched copy for regular and block device files #748

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion bench/bench_copy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ let chunk_size = 1 lsl 16
let n_chunks = 10000
let n_bytes = n_chunks * chunk_size

let rec await pid =
match Unix.waitpid [] pid with
| (_pid, status) -> status
| exception Unix.Unix_error (Unix.EINTR, _, _) -> await pid

let run prog args =
await (Unix.create_process prog (Array.of_list (prog :: args)) Unix.stdin Unix.stdout Unix.stderr)

let check_status = function
| Unix.WEXITED 0 -> ()
| _ -> assert false

let generate_file name =
run "fallocate" [ "-l"; string_of_int n_bytes; name ]
|> check_status

let run_client sock =
Fiber.both
(fun () ->
Expand Down Expand Up @@ -35,8 +51,26 @@ let time name service =
traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.);
Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")

let run _env =
let time_fs fs name service =
let ( / ) = Eio.Path.(/) in
let fname_in = "cptest.in" in
let fname_out ="cptest.out" in
if not (Sys.file_exists fname_in) then generate_file fname_in;
Eio.Path.with_open_in (fs / fname_in) @@ fun inflow ->
Eio.Path.with_open_out ~create:(`Exclusive 0o644) (fs / fname_out) @@ fun outflow ->
let t0 = Unix.gettimeofday () in
service inflow outflow;
let t1 = Unix.gettimeofday () in
let time = t1 -. t0 in
let bytes_per_second = float n_bytes /. time in
traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.);
at_exit (fun () -> try Sys.remove fname_in with _ -> ());
Sys.remove fname_out;
Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")

let run env =
[
time_fs env#fs "default_fs" (fun inflow outflow -> Eio.Flow.copy inflow outflow);
time "default" (fun sock -> Eio.Flow.copy sock sock);
time "buf_read" (fun sock ->
let r = Eio.Buf_read.of_flow sock ~initial_size:(64 * 1024) ~max_size:(64 * 1024) |> Eio.Buf_read.as_flow in
Expand Down
48 changes: 39 additions & 9 deletions lib_eio_linux/flow.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
open Eio.Std

(* When copying between files of finite size (e.g. regular files and
block devices), we want to batch up read and write requests to
submit at one go *)
let batch_copy src dst =
let read_then_write_chunk infd outfd file_offset =
let buf = Low_level.alloc_fixed_or_wait () in
let len = Uring.Region.length buf in
Low_level.read_exactly ~file_offset infd buf len;
Low_level.write ~file_offset outfd buf len;
Low_level.free_fixed buf
in
let copy_file infd outfd insize block_size =
let module Int63 = Optint.Int63 in
Switch.run @@ fun sw ->
let rec copy_block file_offset =
let remaining = Int63.(sub insize file_offset) in
if remaining <> Int63.zero then (
let len = Int63.to_int (min (Int63.of_int block_size) remaining) in
Fiber.fork ~sw (fun () -> read_then_write_chunk infd outfd file_offset);
copy_block Int63.(add file_offset (of_int len))
)
in
copy_block Int63.zero
in
let insize = (Low_level.fstat src).size in
copy_file src dst insize 4096

(* When copying between a source with an FD and a sink with an FD, we can share the chunk
and avoid copying. *)
let fast_copy src dst =
Expand All @@ -13,14 +40,17 @@ let fast_copy src dst =
done
with End_of_file -> ()
in
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in
try
while true do
let got = Low_level.read_upto src chunk chunk_size in
Low_level.write dst chunk got
done
with End_of_file -> ()
match (Low_level.fstat src).kind with
| `Block_device | `Regular_file -> batch_copy src dst
| _ ->
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in
try
while true do
let got = Low_level.read_upto src chunk chunk_size in
Low_level.write dst chunk got
done
with End_of_file -> ()

(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *)
let _fast_copy_try_splice src dst =
Expand All @@ -35,7 +65,7 @@ let _fast_copy_try_splice src dst =

(* XXX workaround for issue #319, PR #327 *)
let fast_copy_try_splice src dst = fast_copy src dst

let[@tail_mod_cons] rec list_take n = function
| [] -> []
| x :: xs ->
Expand Down
Loading