From 849683a3e78efe877e41937fc4de89de4c1e5f81 Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Mon, 3 Dec 2018 16:17:43 -0800 Subject: [PATCH 1/7] Unblock multiple HTTP requests --- http.ts | 76 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/http.ts b/http.ts index 4a4f0ccd9540..e7e986a3926d 100644 --- a/http.ts +++ b/http.ts @@ -6,35 +6,46 @@ import { assert } from "./util"; export async function* serve(addr: string) { const listener = listen("tcp", addr); - while (true) { - const c = await listener.accept(); - yield* serveConn(c); - } - listener.close(); -} + // For racing promises + const pool: Promise[] = []; + const untrackFromPool = (p: Promise) => { + const requestIndex = pool.indexOf(p); + if (requestIndex >= 0) { + pool.splice(requestIndex, 1); + } + }; + // Push conn promise also to pool to avoid starving + let trackedConnPromise = listener.accept().then(conn => [ + conn, + untrackFromPool.bind(null, trackedConnPromise), + ]); + pool.push(trackedConnPromise); -export async function* serveConn(c: Conn) { - let bufr = new BufReader(c); - let bufw = new BufWriter(c); - try { - while (true) { - const [req, err] = await readRequest(bufr); - if (err == "EOF") { - break; - } - if (err == "ShortWrite") { - console.log("ShortWrite error"); - break; - } - if (err) { - throw err; - } - req.w = bufw; + while (true) { + const [reqOrConn, untrack] = await Promise.race(pool); + untrack(); // remove from pool + if (!Array.isArray(reqOrConn)) { + // result is not array, means from connPromise + // Push readRequest promise for conn + const trackedReqPromise = readRequest(reqOrConn) + .then((maybeReq) => [ + maybeReq, + untrackFromPool.bind(null, trackedReqPromise) + ]); + pool.push(trackedReqPromise); + // Prepared to accept another connection (avoid starving) + trackedConnPromise = listener.accept().then(conn => [ + conn, + untrackFromPool.bind(null, trackedConnPromise), + ]); + pool.push(trackedConnPromise); + } else { + // TODO: handle _err + const [req, _err] = reqOrConn as [ServerRequest, BufState]; yield req; } - } finally { - c.close(); } + listener.close(); } interface Response { @@ -60,6 +71,7 @@ class ServerRequest { proto: string; headers: Headers; w: BufWriter; + _conn: Conn; async respond(r: Response): Promise { const protoMajor = 1; @@ -75,7 +87,7 @@ class ServerRequest { setContentLength(r); if (r.headers) { - for (let [key, value] of r.headers) { + for (const [key, value] of r.headers) { out += `${key}: ${value}\r\n`; } } @@ -90,12 +102,18 @@ class ServerRequest { } await this.w.flush(); + // TODO: handle keep alive + this._conn.close(); } } -async function readRequest(b: BufReader): Promise<[ServerRequest, BufState]> { - const tp = new TextProtoReader(b); +async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> { + const bufr = new BufReader(c); + const bufw = new BufWriter(c); const req = new ServerRequest(); + req.w = bufw; + req._conn = c; + const tp = new TextProtoReader(bufr); let s: string; let err: BufState; @@ -109,5 +127,7 @@ async function readRequest(b: BufReader): Promise<[ServerRequest, BufState]> { [req.headers, err] = await tp.readMIMEHeader(); + // TODO: handle body + return [req, err]; } From 020880e8411f8ffbc69c163a8b572018a38d9600 Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Mon, 3 Dec 2018 20:33:33 -0800 Subject: [PATCH 2/7] Alternative implementation w/o Promise.race --- http.ts | 89 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/http.ts b/http.ts index e7e986a3926d..3600ccdc33a3 100644 --- a/http.ts +++ b/http.ts @@ -4,45 +4,64 @@ import { TextProtoReader } from "./textproto.ts"; import { STATUS_TEXT } from "./http_status"; import { assert } from "./util"; +interface Deferred { + promise: Promise<{}>; + resolve: () => void; + reject: () => void; +} + +function deferred(): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, resolve, reject, + }; +} + export async function* serve(addr: string) { const listener = listen("tcp", addr); - // For racing promises - const pool: Promise[] = []; - const untrackFromPool = (p: Promise) => { - const requestIndex = pool.indexOf(p); - if (requestIndex >= 0) { - pool.splice(requestIndex, 1); - } - }; - // Push conn promise also to pool to avoid starving - let trackedConnPromise = listener.accept().then(conn => [ - conn, - untrackFromPool.bind(null, trackedConnPromise), - ]); - pool.push(trackedConnPromise); + let nextTaskId = 0; + const resultMap: Map = new Map(); + let serveDeferred = deferred(); + let queue: number[] = []; // in case multiple promises are ready + + const scheduleAccept = () => { + listener.accept().then((conn) => { + queue.push(nextTaskId); + resultMap.set(nextTaskId, conn); + scheduleAccept(); // self schedule next accept + serveDeferred.resolve(); // hint loop to run + }) + nextTaskId++; + } + + scheduleAccept(); // start accept while (true) { - const [reqOrConn, untrack] = await Promise.race(pool); - untrack(); // remove from pool - if (!Array.isArray(reqOrConn)) { - // result is not array, means from connPromise - // Push readRequest promise for conn - const trackedReqPromise = readRequest(reqOrConn) - .then((maybeReq) => [ - maybeReq, - untrackFromPool.bind(null, trackedReqPromise) - ]); - pool.push(trackedReqPromise); - // Prepared to accept another connection (avoid starving) - trackedConnPromise = listener.accept().then(conn => [ - conn, - untrackFromPool.bind(null, trackedConnPromise), - ]); - pool.push(trackedConnPromise); - } else { - // TODO: handle _err - const [req, _err] = reqOrConn as [ServerRequest, BufState]; - yield req; + await serveDeferred.promise; + serveDeferred = deferred(); // use a new deferred + let queueToProcess = queue; + queue = []; + for (const resultId of queueToProcess) { + const result = resultMap.get(resultId); + resultMap.delete(resultId); + if (Array.isArray(result)) { + const [req, _err] = result as [ServerRequest, BufState]; + if (!_err) { + yield req; + } + } else { + const conn = result as Conn; + readRequest(conn).then((maybeReq) => { + queue.push(nextTaskId); + resultMap.set(nextTaskId, maybeReq); + serveDeferred.resolve(); + }); + nextTaskId++; + } } } listener.close(); From 4457b045c040ac91eed0809cf0de570d2c2cd5f3 Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Tue, 4 Dec 2018 08:12:03 -0800 Subject: [PATCH 3/7] Remove unneeded ids --- http.ts | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/http.ts b/http.ts index 3600ccdc33a3..fe6e8684660c 100644 --- a/http.ts +++ b/http.ts @@ -23,19 +23,15 @@ function deferred(): Deferred { export async function* serve(addr: string) { const listener = listen("tcp", addr); - let nextTaskId = 0; - const resultMap: Map = new Map(); let serveDeferred = deferred(); - let queue: number[] = []; // in case multiple promises are ready + let queue: any[] = []; // in case multiple promises are ready const scheduleAccept = () => { listener.accept().then((conn) => { - queue.push(nextTaskId); - resultMap.set(nextTaskId, conn); + queue.push(conn); scheduleAccept(); // self schedule next accept serveDeferred.resolve(); // hint loop to run - }) - nextTaskId++; + }); } scheduleAccept(); // start accept @@ -45,22 +41,18 @@ export async function* serve(addr: string) { serveDeferred = deferred(); // use a new deferred let queueToProcess = queue; queue = []; - for (const resultId of queueToProcess) { - const result = resultMap.get(resultId); - resultMap.delete(resultId); + for (const result of queueToProcess) { if (Array.isArray(result)) { const [req, _err] = result as [ServerRequest, BufState]; - if (!_err) { + if (!_err) { // TODO: check _err yield req; } } else { const conn = result as Conn; readRequest(conn).then((maybeReq) => { - queue.push(nextTaskId); - resultMap.set(nextTaskId, maybeReq); + queue.push(maybeReq); serveDeferred.resolve(); }); - nextTaskId++; } } } From 158ae0aedea9b5b071aa017f52ad190e764cde1b Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Tue, 4 Dec 2018 19:10:37 -0800 Subject: [PATCH 4/7] Don't recreate functions --- http.ts | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/http.ts b/http.ts index fe6e8684660c..d2170db8515c 100644 --- a/http.ts +++ b/http.ts @@ -26,15 +26,25 @@ export async function* serve(addr: string) { let serveDeferred = deferred(); let queue: any[] = []; // in case multiple promises are ready - const scheduleAccept = () => { - listener.accept().then((conn) => { + const handleMaybeReq = (maybeReq: any) => { + queue.push(maybeReq); + serveDeferred.resolve(); + } + + // routine that keeps calling accept + const acceptRoutine = () => { + const handleConn = (conn: Conn) => { queue.push(conn); - scheduleAccept(); // self schedule next accept + scheduleAccept(); // schedule next accept serveDeferred.resolve(); // hint loop to run - }); + } + const scheduleAccept = () => { + listener.accept().then(handleConn); + } + scheduleAccept(); } - scheduleAccept(); // start accept + acceptRoutine(); while (true) { await serveDeferred.promise; @@ -49,10 +59,7 @@ export async function* serve(addr: string) { } } else { const conn = result as Conn; - readRequest(conn).then((maybeReq) => { - queue.push(maybeReq); - serveDeferred.resolve(); - }); + readRequest(conn).then(handleMaybeReq); } } } From eeb06ce7d51837580420f5af6d56437fc2df05fa Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Wed, 5 Dec 2018 12:16:23 -0800 Subject: [PATCH 5/7] Don't close socket --- http.ts | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/http.ts b/http.ts index d2170db8515c..f5c8866fa15b 100644 --- a/http.ts +++ b/http.ts @@ -24,19 +24,29 @@ function deferred(): Deferred { export async function* serve(addr: string) { const listener = listen("tcp", addr); let serveDeferred = deferred(); - let queue: any[] = []; // in case multiple promises are ready - - const handleMaybeReq = (maybeReq: any) => { - queue.push(maybeReq); - serveDeferred.resolve(); + let reqQueue: ServerRequest[] = []; // in case multiple promises are ready + + // Continuously read more requests from conn until EOF + // Mutually calling with handleReq + const readRequestsFromConn = async (conn: Conn) => { + const [req, _err] = await readRequest(conn); + if (_err) { + conn.close(); // assume EOF, for now + return; + } + handleReq(conn, req); + } + const handleReq = (conn: Conn, req: ServerRequest) => { + reqQueue.push(req); // push req to queue + readRequestsFromConn(conn); // try read more (reusing connection) + serveDeferred.resolve(); // signal while loop to process it } - // routine that keeps calling accept + // Routine that keeps calling accept const acceptRoutine = () => { const handleConn = (conn: Conn) => { - queue.push(conn); + readRequestsFromConn(conn); // don't block scheduleAccept(); // schedule next accept - serveDeferred.resolve(); // hint loop to run } const scheduleAccept = () => { listener.accept().then(handleConn); @@ -46,21 +56,14 @@ export async function* serve(addr: string) { acceptRoutine(); + // Loop hack to allow yield (yield won't work in callbacks) while (true) { await serveDeferred.promise; serveDeferred = deferred(); // use a new deferred - let queueToProcess = queue; - queue = []; + let queueToProcess = reqQueue; + reqQueue = []; for (const result of queueToProcess) { - if (Array.isArray(result)) { - const [req, _err] = result as [ServerRequest, BufState]; - if (!_err) { // TODO: check _err - yield req; - } - } else { - const conn = result as Conn; - readRequest(conn).then(handleMaybeReq); - } + yield result; } } listener.close(); @@ -89,7 +92,6 @@ class ServerRequest { proto: string; headers: Headers; w: BufWriter; - _conn: Conn; async respond(r: Response): Promise { const protoMajor = 1; @@ -120,8 +122,6 @@ class ServerRequest { } await this.w.flush(); - // TODO: handle keep alive - this._conn.close(); } } @@ -130,7 +130,6 @@ async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> { const bufw = new BufWriter(c); const req = new ServerRequest(); req.w = bufw; - req._conn = c; const tp = new TextProtoReader(bufr); let s: string; From 70a96431364708b462bbef3d5040c549e26c1115 Mon Sep 17 00:00:00 2001 From: "Kevin (Kun) \"Kassimo\" Qian" Date: Thu, 6 Dec 2018 12:31:32 -0800 Subject: [PATCH 6/7] Expose serveConn --- http.ts | 57 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/http.ts b/http.ts index f5c8866fa15b..26fd1008d9c0 100644 --- a/http.ts +++ b/http.ts @@ -21,31 +21,42 @@ function deferred(): Deferred { }; } +interface ServeEnv { + reqQueue: ServerRequest[]; + serveDeferred: Deferred; +} + +// Continuously read more requests from conn until EOF +// Mutually calling with maybeHandleReq +// TODO: make them async function after this change is done +// https://github.com/tc39/ecma262/pull/1250 +// See https://v8.dev/blog/fast-async +export function serveConn(env: ServeEnv, conn: Conn) { + readRequest(conn).then(maybeHandleReq.bind(null, env, conn)); +} +function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { + const [req, _err] = maybeReq; + if (_err) { + conn.close(); // assume EOF for now... + return; + } + env.reqQueue.push(req); // push req to queue + env.serveDeferred.resolve(); // signal while loop to process it + // TODO: protection against client req flooding + serveConn(env, conn); // try read more (reusing connection) +} + export async function* serve(addr: string) { const listener = listen("tcp", addr); - let serveDeferred = deferred(); - let reqQueue: ServerRequest[] = []; // in case multiple promises are ready - - // Continuously read more requests from conn until EOF - // Mutually calling with handleReq - const readRequestsFromConn = async (conn: Conn) => { - const [req, _err] = await readRequest(conn); - if (_err) { - conn.close(); // assume EOF, for now - return; - } - handleReq(conn, req); - } - const handleReq = (conn: Conn, req: ServerRequest) => { - reqQueue.push(req); // push req to queue - readRequestsFromConn(conn); // try read more (reusing connection) - serveDeferred.resolve(); // signal while loop to process it - } + const env: ServeEnv = { + reqQueue: [], // in case multiple promises are ready + serveDeferred: deferred(), + }; // Routine that keeps calling accept const acceptRoutine = () => { const handleConn = (conn: Conn) => { - readRequestsFromConn(conn); // don't block + serveConn(env, conn); // don't block scheduleAccept(); // schedule next accept } const scheduleAccept = () => { @@ -58,10 +69,10 @@ export async function* serve(addr: string) { // Loop hack to allow yield (yield won't work in callbacks) while (true) { - await serveDeferred.promise; - serveDeferred = deferred(); // use a new deferred - let queueToProcess = reqQueue; - reqQueue = []; + await env.serveDeferred.promise; + env.serveDeferred = deferred(); // use a new deferred + let queueToProcess = env.reqQueue; + env.reqQueue = []; for (const result of queueToProcess) { yield result; } From 42f36c902bc56e86878cfcf98e8611b683259c00 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 6 Dec 2018 16:04:11 -0500 Subject: [PATCH 7/7] prettier --- http.ts | 12 +++++++----- http_test.ts | 3 +-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/http.ts b/http.ts index 26fd1008d9c0..6e36704fdc95 100644 --- a/http.ts +++ b/http.ts @@ -17,7 +17,9 @@ function deferred(): Deferred { reject = rej; }); return { - promise, resolve, reject, + promise, + resolve, + reject }; } @@ -50,7 +52,7 @@ export async function* serve(addr: string) { const listener = listen("tcp", addr); const env: ServeEnv = { reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred(), + serveDeferred: deferred() }; // Routine that keeps calling accept @@ -58,12 +60,12 @@ export async function* serve(addr: string) { const handleConn = (conn: Conn) => { serveConn(env, conn); // don't block scheduleAccept(); // schedule next accept - } + }; const scheduleAccept = () => { listener.accept().then(handleConn); - } + }; scheduleAccept(); - } + }; acceptRoutine(); diff --git a/http_test.ts b/http_test.ts index e6cb87f01e96..b0007a89270a 100644 --- a/http_test.ts +++ b/http_test.ts @@ -5,8 +5,7 @@ const addr = "0.0.0.0:8000"; const s = serve(addr); console.log(`listening on http://${addr}/`); -const body = (new TextEncoder()).encode("Hello World\n"); - +const body = new TextEncoder().encode("Hello World\n"); async function main() { for await (const req of s) {