From bc1743fd544caae5f7e333c892a93f2b5b44ccdf Mon Sep 17 00:00:00 2001
From: Luca Casonato
Date: Fri, 4 Nov 2022 17:41:00 +0100
Subject: [PATCH] fix(ext/http): flush chunk when streaming resource

When streaming a resource in ext/http, with compression enabled, we
didn't flush individual chunks. This became very problematic when we
enabled `req.body` from `fetch` for FastStream recently.

This commit now correctly flushes each resource chunk after compression.
---
 cli/tests/unit/http_test.ts | 77 +++++++++++++++++++++++++++++++++++++
 ext/http/lib.rs             |  6 ++-
 2 files changed, 82 insertions(+), 1 deletion(-)

diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index e7c99352fc88ae..1d2addb2dce2e2 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -2515,6 +2515,83 @@ Deno.test(
   },
 );
 
+Deno.test({
+  name: "http server compresses and flushes each chunk of a streamed resource",
+  permissions: { net: true, run: true },
+  async fn() {
+    const hostname = "localhost";
+    const port = 4501;
+    const port2 = 4502;
+
+    const encoder = new TextEncoder();
+    const listener = Deno.listen({ hostname, port });
+    const listener2 = Deno.listen({ hostname, port: port2 });
+
+    let httpConn: Deno.HttpConn;
+    async function server() {
+      const tcpConn = await listener.accept();
+      httpConn = Deno.serveHttp(tcpConn);
+      const e = await httpConn.nextRequest();
+      assert(e);
+      const { request, respondWith } = e;
+      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+      const resp = await fetch(`http://${hostname}:${port2}/`);
+      await respondWith(resp);
+      listener.close();
+    }
+
+    const ts = new TransformStream();
+    const writer = ts.writable.getWriter();
+    writer.write(encoder.encode("hello"));
+
+    let httpConn2: Deno.HttpConn;
+    async function server2() {
+      const tcpConn = await listener2.accept();
+      httpConn2 = Deno.serveHttp(tcpConn);
+      const e = await httpConn2.nextRequest();
+      assert(e);
+      await e.respondWith(
+        new Response(ts.readable, {
+          headers: { "Content-Type": "text/plain" },
+        }),
+      );
+      listener2.close();
+    }
+
+    async function client() {
+      const url = `http://${hostname}:${port}/`;
+      const args = [
+        "--request",
+        "GET",
+        "--url",
+        url,
+        "--header",
+        "Accept-Encoding: gzip, deflate, br",
+        "--no-buffer",
+      ];
+      const proc = Deno.spawnChild("curl", { args, stderr: "null" });
+      const stdout = proc.stdout
+        .pipeThrough(new DecompressionStream("gzip"))
+        .pipeThrough(new TextDecoderStream());
+      let body = "";
+      for await (const chunk of stdout) {
+        body += chunk;
+        if (body === "hello") {
+          writer.write(encoder.encode(" world"));
+          writer.close();
+        }
+      }
+      assertEquals(body, "hello world");
+      const status = await proc.status;
+      assert(status.success);
+    }
+
+    await Promise.all([server(), server2(), client()]);
+    httpConn!.close();
+    httpConn2!.close();
+  },
+});
+
 function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
   // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
   const tp = new TextProtoReader(r);
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index e71d9fae3bead5..af117d3f92fa2e 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -769,7 +769,11 @@ async fn op_http_write_resource(
 
   match &mut *wr {
     HttpResponseWriter::Body(body) => {
-      if let Err(err) = body.write_all(&view).await {
+      let mut result = body.write_all(&view).await;
+      if result.is_ok() {
+        result = body.flush().await;
+      }
+      if let Err(err) = result {
         assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
         // Don't return "broken pipe", that's an implementation detail.
         // Pull up the failure associated with the transport connection instead.
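
Not part of the patch itself: a minimal, self-contained sketch of the write-then-flush pattern the ext/http/lib.rs change relies on, for reproducing the buffering behavior outside Deno. It assumes the `tokio` and `async-compression` crates and uses an in-memory duplex pipe in place of the real HTTP connection; the encoder and pipe here are illustrative stand-ins, not ext/http's actual writer types.

// Assumed Cargo.toml dependencies (sketch only):
//   tokio = { version = "1", features = ["full"] }
//   async-compression = { version = "0.3", features = ["tokio", "gzip"] }
use async_compression::tokio::write::GzipEncoder;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
  // In-memory pipe standing in for the client connection.
  let (tx, mut rx) = duplex(64 * 1024);
  let mut body = GzipEncoder::new(tx);

  // Write one chunk of a streamed response body.
  body.write_all(b"hello").await?;
  // Without this flush, the encoder keeps the chunk in its internal
  // buffer and the read side sees nothing until the stream is shut
  // down -- the behavior the patch fixes for streamed resources.
  body.flush().await?;

  // The first compressed bytes are now observable immediately.
  let mut buf = [0u8; 1024];
  let n = rx.read(&mut buf).await?;
  assert!(n > 0);
  println!("received {n} compressed bytes before end of stream");
  Ok(())
}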