Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Response.sendfile and Body.sendfile #124

Merged
merged 5 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/carl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ let handle_response ~cli ({ Response.body; _ } as response) =
print_string ~cli formatter s
| _ ->
let open Lwt.Syntax in
let stream, or_error = Body.to_stream body in
let* stream, or_error = Body.to_stream body in
let total_len = Body.length body in
Lwt.catch
(fun () ->
Expand Down
166 changes: 125 additions & 41 deletions lib/body.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(*----------------------------------------------------------------------------
* Copyright (c) 2019-2020, António Nuno Monteiro
* Copyright (c) 2019-2022, António Nuno Monteiro
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -74,6 +74,11 @@ type contents =
| `String of string
| `Bigstring of Bigstringaf.t IOVec.t
| `Stream of Bigstringaf.t IOVec.t Lwt_stream.t
| `Sendfile of
Lwt_unix.file_descr
* (* Waiter and notifier for when the fd closes. *)
unit Lwt.t
* unit Lwt.u
]

type t =
Expand Down Expand Up @@ -120,58 +125,126 @@ let of_bigstring ?(off = 0) ?len bstr =
let length = `Fixed (Int64.of_int len) in
create ~length (`Bigstring (IOVec.make bstr ~off ~len))

let sendfile ?length path =
let**! fd =
Lwt.catch
(fun () ->
let+ fd = Lwt_unix.openfile path [ O_RDONLY ] 0 in
Ok fd)
(fun exn -> Lwt_result.fail (`Exn exn))
in
let+ length =
match length with
| None ->
let+ { Unix.st_size; _ } = Lwt_unix.fstat fd in
`Fixed (Int64.of_int st_size)
| Some length -> Lwt.return length
in
let t, u = Lwt.wait () in
Ok (create ~length (`Sendfile (fd, t, u)))

let or_error t ~stream v =
let+ () = Lwt_stream.closed stream in
match Lwt.state t.error_received with
| Lwt.Return error -> Error error
| Lwt.Fail _ | Lwt.Sleep -> Ok v

let to_stream ({ contents; _ } as t) =
let ensure_closed fd =
match Lwt_unix.state fd with
| Opened -> Lwt_unix.close fd
| Closed | Aborted _ -> Lwt.return_unit

(* TODO: accept buffer for I/O, so that caller can pool buffers? *)
let stream_of_fd ?on_close fd =
let+ { Unix.st_size = length; _ } = Lwt_unix.fstat fd in
let remaining = ref length in
let stream =
Lwt_stream.from (fun () ->
if !remaining = 0
then Lwt.return_none
else
let bytes_to_read = min 0x4000 !remaining in
(* TODO: read from config buffer size? *)
(* (min config.Config.body_buffer_size !remaining) *)
let buf = Bigstringaf.create bytes_to_read in
let* bytes_read = Lwt_bytes.read fd buf 0 bytes_to_read in
remaining := !remaining - bytes_read;
Lwt.return_some (IOVec.make buf ~off:0 ~len:bytes_read))
in
Lwt.on_success (Lwt_stream.closed stream) (fun () ->
Lwt.dont_wait
(fun () ->
let+ () = ensure_closed fd in
Option.iter (fun f -> f ()) on_close)
(fun _exn -> Option.iter (fun f -> f ()) on_close));
stream

let to_stream ({ contents; _ } as t) =
let+ stream =
match contents with
| `Empty _ -> Lwt_stream.of_list []
| `Empty _ -> Lwt.return (Lwt_stream.of_list [])
| `String s ->
let len = String.length s in
Lwt_stream.of_list
[ IOVec.make (Bigstringaf.of_string ~off:0 ~len s) ~off:0 ~len ]
| `Bigstring iovec -> Lwt_stream.of_list [ iovec ]
| `Stream stream -> stream
Lwt.return
(Lwt_stream.of_list
[ IOVec.make (Bigstringaf.of_string ~off:0 ~len s) ~off:0 ~len ])
| `Bigstring iovec -> Lwt.return (Lwt_stream.of_list [ iovec ])
| `Stream stream -> Lwt.return stream
| `Sendfile (fd, _, u) ->
stream_of_fd ~on_close:(fun () -> Lwt.wakeup_later u ()) fd
in
stream, or_error ~stream t ()

let to_string ({ contents; length; _ } as t) =
let stream_to_string { length; _ } stream =
let len =
match length with
| `Fixed n -> Int64.to_int n
| _ ->
(* TODO: use some config? *)
0x100
in
let result_buffer = Buffer.create len in
let+ () =
Lwt_stream.iter
(fun { IOVec.buffer; off; len } ->
let bytes = Bytes.create len in
Bigstringaf.blit_to_bytes buffer ~src_off:off ~dst_off:0 ~len bytes;
Buffer.add_bytes result_buffer bytes)
stream
in
Buffer.contents result_buffer

let to_string ({ contents; _ } as t) =
match contents with
| `Empty _ -> Lwt_result.return ""
| `String s -> Lwt_result.return s
| `Bigstring { IOVec.buffer; off; len } ->
Lwt_result.return (Bigstringaf.substring ~off ~len buffer)
| `Stream stream ->
let len =
match length with
| `Fixed n -> Int64.to_int n
| _ ->
(* TODO: use some config? *)
0x100
in
let result_buffer = Buffer.create len in
let* () =
Lwt_stream.iter
(fun { IOVec.buffer; off; len } ->
let bytes = Bytes.create len in
Bigstringaf.blit_to_bytes buffer ~src_off:off ~dst_off:0 ~len bytes;
Buffer.add_bytes result_buffer bytes)
stream
in
or_error t ~stream (Buffer.contents result_buffer)
let* str = stream_to_string t stream in
or_error t ~stream str
| `Sendfile (fd, _, u) ->
let* stream = stream_of_fd ~on_close:(fun () -> Lwt.wakeup_later u ()) fd in
let* str = stream_to_string t stream in
or_error t ~stream str

let to_string_stream ({ contents; _ } as t) =
let stream =
let+ stream =
match contents with
| `Empty _ -> Lwt_stream.of_list []
| `String s -> Lwt_stream.of_list [ s ]
| `Empty _ -> Lwt.return (Lwt_stream.of_list [])
| `String s -> Lwt.return (Lwt_stream.of_list [ s ])
| `Bigstring { IOVec.buffer; off; len } ->
Lwt_stream.of_list [ Bigstringaf.substring ~off ~len buffer ]
Lwt.return (Lwt_stream.of_list [ Bigstringaf.substring ~off ~len buffer ])
| `Stream stream ->
Lwt.return
(Lwt_stream.map
(fun { IOVec.buffer; off; len } ->
Bigstringaf.substring buffer ~off ~len)
stream)
| `Sendfile (fd, _, u) ->
let+ stream =
stream_of_fd ~on_close:(fun () -> Lwt.wakeup_later u ()) fd
in
Lwt_stream.map
(fun { IOVec.buffer; off; len } ->
Bigstringaf.substring buffer ~off ~len)
Expand All @@ -185,21 +258,32 @@ let drain ({ contents; _ } as t) =
| `Stream stream ->
let* () = Lwt_stream.junk_while (fun _ -> true) stream in
or_error t ~stream ()
| `Sendfile (fd, _, _) ->
let+ () = ensure_closed fd in
Ok ()

let drain_available { contents; _ } =
match contents with
| `Empty _ | `String _ | `Bigstring _ -> Lwt.return_unit
| `Stream stream -> Lwt_stream.junk_old stream
| `Sendfile (fd, _, _) -> ensure_closed fd

let is_closed t =
match t.contents with
| `Empty _ | `String _ | `Bigstring _ -> true
| `Stream stream -> Lwt_stream.is_closed stream
| `Sendfile (fd, _, _) ->
(match Lwt_unix.state fd with
| Opened -> false
| Closed | Aborted _ -> true)

let closed t =
match t.contents with
| `Empty _ | `String _ | `Bigstring _ -> Lwt_result.return ()
| `Stream stream -> or_error t ~stream ()
| `Sendfile (_fd, t, _u) ->
let+ () = t in
Ok ()

let when_closed t f = Lwt.on_success (closed t) f

Expand Down Expand Up @@ -328,61 +412,61 @@ let stream_write_body

(* Traversal *)
let fold f t init =
let stream, _ = to_stream t in
let* stream, _ = to_stream t in
let* ret = Lwt_stream.fold f stream init in
or_error t ~stream ret

let fold_string f t init =
let stream, _ = to_string_stream t in
let* stream, _ = to_string_stream t in
let* ret = Lwt_stream.fold f stream init in
or_error t ~stream ret

let fold_s f t init =
let stream, _ = to_stream t in
let* stream, _ = to_stream t in
let* ret = Lwt_stream.fold_s f stream init in
or_error t ~stream ret

let fold_string_s f t init =
let stream, _ = to_string_stream t in
let* stream, _ = to_string_stream t in
let* ret = Lwt_stream.fold_s f stream init in
or_error t ~stream ret

let iter f t =
let stream, or_error = to_stream t in
let* stream, or_error = to_stream t in
let* () = Lwt_stream.iter f stream in
or_error

let iter_string f t =
let stream, or_error = to_string_stream t in
let* stream, or_error = to_string_stream t in
let* () = Lwt_stream.iter f stream in
or_error

let iter_p f t =
let stream, or_error = to_stream t in
let* stream, or_error = to_stream t in
let* () = Lwt_stream.iter_p f stream in
or_error

let iter_string_p f t =
let stream, or_error = to_string_stream t in
let* stream, or_error = to_string_stream t in
let* () = Lwt_stream.iter_p f stream in
or_error

let iter_s f t =
let stream, or_error = to_stream t in
let* stream, or_error = to_stream t in
let* () = Lwt_stream.iter_s f stream in
or_error

let iter_string_s f t =
let stream, or_error = to_string_stream t in
let* stream, or_error = to_string_stream t in
let* () = Lwt_stream.iter_s f stream in
or_error

let iter_n ?max_concurrency f t =
let stream, or_error = to_stream t in
let* stream, or_error = to_stream t in
let* () = Lwt_stream.iter_n ?max_concurrency f stream in
or_error

let iter_string_n ?max_concurrency f t =
let stream, or_error = to_string_stream t in
let* stream, or_error = to_string_stream t in
let* () = Lwt_stream.iter_n ?max_concurrency f stream in
or_error
7 changes: 2 additions & 5 deletions lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,8 @@ module Oneshot = struct
let* response = response_result in
match response with
| Ok { Response.body; _ } ->
(match body.contents with
| `Empty _ | `String _ | `Bigstring _ -> shutdown t
| `Stream stream ->
let* () = Lwt_stream.closed stream in
shutdown t)
let* _body_closed = Body.closed body in
shutdown t
| Error _ -> Lwt.return_unit);
response_result

Expand Down
3 changes: 2 additions & 1 deletion lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
magic-mime
ssl
uri
ipaddr))
ipaddr
sendfile))
4 changes: 2 additions & 2 deletions lib/form.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module Multipart = struct
(* TODO(anmonteiro): validate max content-length from a config, etc. *)
match Headers.get_exn request.headers "content-type" with
| content_type when is_valid_content_type content_type ->
let stream, _or_error = Body.to_stream request.body in
let* stream, _or_error = Body.to_stream request.body in
let kvs, push_to_kvs = Lwt_stream.create () in
let emit name stream = push_to_kvs (Some (name, stream)) in
let++! multipart =
Expand Down Expand Up @@ -107,7 +107,7 @@ module Multipart = struct
let* result =
Lwt_stream.fold (fun t acc -> (t.name, t) :: acc) field_stream []
in
let _stream, or_error = Body.to_stream request.body in
let* _stream, or_error = Body.to_stream request.body in
let+ or_error_result = or_error in
Result.map (fun () -> result) or_error_result
end
33 changes: 27 additions & 6 deletions lib/http_impl.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(*----------------------------------------------------------------------------
* Copyright (c) 2019-2020, António Nuno Monteiro
* Copyright (c) 2019-2022, António Nuno Monteiro
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -38,8 +38,8 @@ let src = Logs.Src.create "piaf.http" ~doc:"Piaf HTTP module"

module Log = (val Logs.src_log src : Logs.LOG)

let make_error_handler notify_response_received ~kind:_ error =
Lwt.wakeup notify_response_received error
let make_error_handler notify_error ~kind:_ error =
Lwt.wakeup notify_error error

let lwterr e = Lwt.map (fun e -> Error e) e

Expand Down Expand Up @@ -116,7 +116,7 @@ let send_request
=
fun conn ~config ~body request ->
let (Connection.Conn
{ impl = (module Http); handle; connection_error_received; _ })
{ impl = (module Http); handle; connection_error_received; runtime; _ })
=
conn
in
Expand All @@ -128,10 +128,15 @@ let send_request
let error_handler = make_error_handler notify_error in
Log.info (fun m ->
m "@[<v 0>Sending request:@]@]@;<0 2>@[<v 0>%a@]@." Request.pp_hum request);
let flush_headers_immediately =
match body.contents with
| `Sendfile _ -> true
| _ -> config.flush_headers_immediately
in
let request_body =
Http.Client.request
handle
~flush_headers_immediately:config.flush_headers_immediately
~flush_headers_immediately
~error_handler
~response_handler
request
Expand All @@ -148,7 +153,23 @@ let send_request
| `Stream stream ->
Lwt.on_success (Lwt_stream.closed stream) (fun () ->
flush_and_close (module Http.Body) request_body);
Lwt.wrap3 Body.stream_write_body (module Http.Body) request_body stream);
Lwt.wrap3 Body.stream_write_body (module Http.Body) request_body stream
| `Sendfile (src_fd, _, _) ->
(match runtime with
| HTTP runtime ->
Bodyw.close request_body;
let on_exn exn = Lwt.wakeup notify_error (`Exn exn) in
Posix.sendfile
(module Http.Body)
~on_exn
~src_fd
~dst_fd:(Gluten_lwt_unix.Client.socket runtime)
request_body;
Lwt.return_unit
| HTTPS _ ->
(* can't `sendfile` on an encrypted connection.
* TODO(anmonteiro): Return error message saying that. *)
assert false));
handle_response response_received error_received connection_error_received

let can't_upgrade msg =
Expand Down
Loading