Skip to content

Commit

Permalink
Add Flow.load
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin committed Sep 30, 2023
1 parent 4347a22 commit a18d2e3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 85 deletions.
13 changes: 9 additions & 4 deletions lib_eio/buf_read.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type t = {
mutable buf : Cstruct.buffer;
mutable pos : int;
mutable len : int;
mutable flow : Flow.source_ty r option; (* None if we've seen eof *)
mutable flow : Flow_ty.source_ty r option; (* None if we've seen eof *)
mutable consumed : int; (* Total bytes consumed so far *)
max_size : int;
}
Expand Down Expand Up @@ -47,7 +47,7 @@ open Syntax
let capacity t = Bigarray.Array1.dim t.buf

let of_flow ?initial_size ~max_size flow =
let flow = (flow :> Flow.source_ty r) in
let flow = (flow :> Flow_ty.source_ty r) in
if max_size <= 0 then Fmt.invalid_arg "Max size %d should be positive!" max_size;
let initial_size = Option.value initial_size ~default:(min 4096 max_size) in
let buf = Bigarray.(Array1.create char c_layout initial_size) in
Expand Down Expand Up @@ -116,10 +116,15 @@ let ensure_slow_path t n =
)
in
try
let single_read (Resource.T (t, ops)) buf =
let open Flow_ty.Pi in
let module X = (val (Resource.get ops Source)) in
X.single_read t buf
in
while t.len < n do
let free_space = Cstruct.of_bigarray t.buf ~off:(t.pos + t.len) in
assert (t.len + Cstruct.length free_space >= n);
let got = Flow.single_read flow free_space in
let got = single_read flow free_space in
t.len <- t.len + got
done;
assert (buffered_bytes t >= n)
Expand All @@ -144,7 +149,7 @@ module F = struct
end

let as_flow =
let ops = Flow.Pi.source (module F) in
let ops = Flow_ty.Pi.source (module F) in
fun t -> Resource.T (t, ops)

let get t i =
Expand Down
8 changes: 4 additions & 4 deletions lib_eio/buf_read.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type 'a parser = t -> 'a
@raise End_of_file The flow ended without enough data to parse an ['a].
@raise Buffer_limit_exceeded Parsing the value would exceed the configured size limit. *)

val parse : ?initial_size:int -> max_size:int -> 'a parser -> _ Flow.source -> ('a, [> `Msg of string]) result
val parse : ?initial_size:int -> max_size:int -> 'a parser -> _ Flow_ty.source -> ('a, [> `Msg of string]) result
(** [parse p flow ~max_size] uses [p] to parse everything in [flow].
It is a convenience function that does
Expand All @@ -34,7 +34,7 @@ val parse : ?initial_size:int -> max_size:int -> 'a parser -> _ Flow.source -> (
@param initial_size see {!of_flow}. *)

val parse_exn : ?initial_size:int -> max_size:int -> 'a parser -> _ Flow.source -> 'a
val parse_exn : ?initial_size:int -> max_size:int -> 'a parser -> _ Flow_ty.source -> 'a
(** [parse_exn] wraps {!parse}, but raises [Failure msg] if that returns [Error (`Msg msg)].
Catching exceptions with [parse] and then raising them might seem pointless,
Expand All @@ -48,7 +48,7 @@ val parse_string : 'a parser -> string -> ('a, [> `Msg of string]) result
val parse_string_exn : 'a parser -> string -> 'a
(** [parse_string_exn] is like {!parse_string}, but handles errors like {!parse_exn}. *)

val of_flow : ?initial_size:int -> max_size:int -> _ Flow.source -> t
val of_flow : ?initial_size:int -> max_size:int -> _ Flow_ty.source -> t
(** [of_flow ~max_size flow] is a buffered reader backed by [flow].
@param initial_size The initial amount of memory to allocate for the buffer.
Expand All @@ -70,7 +70,7 @@ val of_buffer : Cstruct.buffer -> t
val of_string : string -> t
(** [of_string s] is a reader that reads from [s]. *)

val as_flow : t -> Flow.source_ty r
val as_flow : t -> Flow_ty.source_ty r
(** [as_flow t] is a buffered flow.
Reading from it will return data from the buffer,
Expand Down
80 changes: 4 additions & 76 deletions lib_eio/flow.ml
Original file line number Diff line number Diff line change
@@ -1,81 +1,6 @@
open Std

type shutdown_command = [ `Receive | `Send | `All ]

type 't read_method = ..
type 't read_method += Read_source_buffer of ('t -> (Cstruct.t list -> int) -> unit)

type source_ty = [`R | `Flow]
type 'a source = ([> source_ty] as 'a) r

type sink_ty = [`W | `Flow]
type 'a sink = ([> sink_ty] as 'a) r

type shutdown_ty = [`Shutdown]
type 'a shutdown = ([> shutdown_ty] as 'a) r

module Pi = struct
module type SOURCE = sig
type t
val read_methods : t read_method list
val single_read : t -> Cstruct.t -> int
end

module type SINK = sig
type t
val single_write : t -> Cstruct.t list -> int
val copy : t -> src:_ source -> unit
end

module type SHUTDOWN = sig
type t
val shutdown : t -> shutdown_command -> unit
end

type (_, _, _) Resource.pi +=
| Source : ('t, (module SOURCE with type t = 't), [> source_ty]) Resource.pi
| Sink : ('t, (module SINK with type t = 't), [> sink_ty]) Resource.pi
| Shutdown : ('t, (module SHUTDOWN with type t = 't), [> shutdown_ty]) Resource.pi

let source (type t) (module X : SOURCE with type t = t) =
Resource.handler [H (Source, (module X))]

let sink (type t) (module X : SINK with type t = t) =
Resource.handler [H (Sink, (module X))]

let shutdown (type t) (module X : SHUTDOWN with type t = t) =
Resource.handler [ H (Shutdown, (module X))]

module type TWO_WAY = sig
include SHUTDOWN
include SOURCE with type t := t
include SINK with type t := t
end

let two_way (type t) (module X : TWO_WAY with type t = t) =
Resource.handler [
H (Shutdown, (module X));
H (Source, (module X));
H (Sink, (module X));
]

let simple_copy ~single_write t ~src:(Resource.T (src, src_ops)) =
let rec write_all buf =
if not (Cstruct.is_empty buf) then (
let sent = single_write t [buf] in
write_all (Cstruct.shift buf sent)
)
in
let module Src = (val (Resource.get src_ops Source)) in
try
let buf = Cstruct.create 4096 in
while true do
let got = Src.single_read src buf in
write_all (Cstruct.sub buf 0 got)
done
with End_of_file -> ()
end

include Flow_ty
open Pi

let close = Resource.close
Expand Down Expand Up @@ -180,6 +105,9 @@ let buffer_sink =
let ops = Pi.sink (module Buffer_sink) in
fun b -> Resource.T (b, ops)

let load flow =
Buf_read.(parse_exn take_all) flow ~max_size:max_int

type two_way_ty = [source_ty | sink_ty | shutdown_ty]
type 'a two_way = ([> two_way_ty] as 'a) r

Expand Down
3 changes: 3 additions & 0 deletions lib_eio/flow.mli
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ val read_exact : _ source -> Cstruct.t -> unit
val string_source : string -> source_ty r
(** [string_source s] is a source that gives the bytes of [s]. *)

val load : _ source -> string
(** [load src] is a convenience wrapper to read an entire flow efficiently. *)

val cstruct_source : Cstruct.t list -> source_ty r
(** [cstruct_source cs] is a source that gives the bytes of [cs]. *)

Expand Down
77 changes: 77 additions & 0 deletions lib_eio/flow_ty.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
open Std

type shutdown_command = [ `Receive | `Send | `All ]

type 't read_method = ..
type 't read_method += Read_source_buffer of ('t -> (Cstruct.t list -> int) -> unit)

type source_ty = [`R | `Flow]
type 'a source = ([> source_ty] as 'a) r

type sink_ty = [`W | `Flow]
type 'a sink = ([> sink_ty] as 'a) r

type shutdown_ty = [`Shutdown]
type 'a shutdown = ([> shutdown_ty] as 'a) r

module Pi = struct
module type SOURCE = sig
type t
val read_methods : t read_method list
val single_read : t -> Cstruct.t -> int
end

module type SINK = sig
type t
val single_write : t -> Cstruct.t list -> int
val copy : t -> src:_ source -> unit
end

module type SHUTDOWN = sig
type t
val shutdown : t -> shutdown_command -> unit
end

type (_, _, _) Resource.pi +=
| Source : ('t, (module SOURCE with type t = 't), [> source_ty]) Resource.pi
| Sink : ('t, (module SINK with type t = 't), [> sink_ty]) Resource.pi
| Shutdown : ('t, (module SHUTDOWN with type t = 't), [> shutdown_ty]) Resource.pi

let source (type t) (module X : SOURCE with type t = t) =
Resource.handler [H (Source, (module X))]

let sink (type t) (module X : SINK with type t = t) =
Resource.handler [H (Sink, (module X))]

let shutdown (type t) (module X : SHUTDOWN with type t = t) =
Resource.handler [ H (Shutdown, (module X))]

module type TWO_WAY = sig
include SHUTDOWN
include SOURCE with type t := t
include SINK with type t := t
end

let two_way (type t) (module X : TWO_WAY with type t = t) =
Resource.handler [
H (Shutdown, (module X));
H (Source, (module X));
H (Sink, (module X));
]

let simple_copy ~single_write t ~src:(Resource.T (src, src_ops)) =
let rec write_all buf =
if not (Cstruct.is_empty buf) then (
let sent = single_write t [buf] in
write_all (Cstruct.shift buf sent)
)
in
let module Src = (val (Resource.get src_ops Source)) in
try
let buf = Cstruct.create 4096 in
while true do
let got = Src.single_read src buf in
write_all (Cstruct.sub buf 0 got)
done
with End_of_file -> ()
end
3 changes: 2 additions & 1 deletion tests/buf_reader.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ Exception: End_of_file.

```ocaml
# let bflow = R.of_flow mock_flow ~max_size:100 |> R.as_flow;;
val bflow : Eio.Flow.source_ty Eio.Std.r = Eio__.Resource.T (<poly>, <abstr>)
val bflow : Eio__Flow_ty.source_ty Eio.Std.r =
Eio__.Resource.T (<poly>, <abstr>)
# next := ["foo"; "bar"]; read bflow 2;;
+mock_flow returning 3 bytes
+Read "fo"
Expand Down
16 changes: 16 additions & 0 deletions tests/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ let mock_source =
Exception: End_of_file.
```

## load

```ocaml
# run @@ fun () ->
let each = String.init 256 Char.chr in
let data = List.init 40 (fun _ -> Cstruct.of_string each) in
let got = Eio.Flow.load (mock_source data) in
traceln "Input length: %d\nOutput length: %d\nEqual: %b"
(Cstruct.lenv data) (String.length got) (String.equal got (Cstruct.copyv data));
;;
+Input length: 10240
+Output length: 10240
+Equal: true
- : unit = ()
```

## copy

```ocaml
Expand Down

0 comments on commit a18d2e3

Please sign in to comment.