Skip to content

Commit

Permalink
Optimise Flow.copy with Buf_read.as_flow
Browse files Browse the repository at this point in the history
By default, Flow.copy creates a 4KB buffer and copies the data through
that. However, if the source of the copy is a buffered reader then it is
much more efficient to use its buffer directly.

This updates the flow you get from `Buf_read.as_flow` to offer this
optimisation, and also updates the eio_posix backend's flow to use this
optimisation when available (eio_linux already supported this).

Detected in a benchmark by Leandro Ostera.
  • Loading branch information
talex5 committed Jan 2, 2024
1 parent fbe8a71 commit 19c43d7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
4 changes: 3 additions & 1 deletion bench/bench_copy.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
(* A client opens a connection to an echo service and sends a load of data via it. *)

open Eio.Std

let chunk_size = 1 lsl 16
Expand Down Expand Up @@ -31,7 +33,7 @@ let time name service =
let time = t1 -. t0 in
let bytes_per_second = float n_bytes /. time in
traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.);
(Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy"))
Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")

let run _env =
[
Expand Down
8 changes: 7 additions & 1 deletion lib_eio/buf_read.ml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,13 @@ module F = struct
consume t len;
len

let read_methods = []
let rsb t fn =
ensure t 1;
let data = peek t in
let sent = fn [data] in
consume t sent

let read_methods = [Flow.Read_source_buffer rsb]
end

let as_flow =
Expand Down
17 changes: 16 additions & 1 deletion lib_eio_posix/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,22 @@ module Impl = struct
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)

let copy t ~src = Eio.Flow.Pi.simple_copy ~single_write t ~src
(* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
try
while true do rsb (single_write dst) done
with End_of_file -> ()

let copy t ~src =
let Eio.Resource.T (src_t, ops) = src in
let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
let rec aux = function
| Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src_t) t
| _ :: xs -> aux xs
| [] -> Eio.Flow.Pi.simple_copy ~single_write t ~src
in
aux Src.read_methods

let single_read t buf =
match Low_level.readv t [| buf |] with
Expand Down

0 comments on commit 19c43d7

Please sign in to comment.