Skip to content

Commit

Permalink
Clean up HTTP async iterator code (denoland/std#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
ry authored May 20, 2019
1 parent 227d92e commit a295bb0
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 182 deletions.
5 changes: 2 additions & 3 deletions http/http_bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ import { serve } from "./server.ts";

const addr = Deno.args[1] || "127.0.0.1:4500";
const server = serve(addr);

const body = new TextEncoder().encode("Hello World");

async function main(): Promise<void> {
console.log(`http://${addr}/`);
for await (const request of server) {
request.respond({ status: 200, body });
for await (const req of server) {
req.respond({ body });
}
}

Expand Down
231 changes: 81 additions & 150 deletions http/server.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/asserts.ts";

interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
}

function createHttpConn(c: Conn): HttpConn {
const httpConn = Object.assign(c, {
lastPipelineId: 0,
pendingDeferredMap: new Map()
});

const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
return httpConn;
}
import { assert, fail } from "../testing/asserts.ts";
import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts";

function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
Expand All @@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter {
return new BufWriter(w);
}
}

export function setContentLength(r: Response): void {
if (!r.headers) {
r.headers = new Headers();
Expand All @@ -74,6 +34,7 @@ export function setContentLength(r: Response): void {
}
}
}

async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const writer = bufWriter(w);
const encoder = new TextEncoder();
Expand All @@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}

export async function writeResponse(w: Writer, r: Response): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
Expand Down Expand Up @@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
}
await writer.flush();
}

async function readAllIterator(
it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
Expand All @@ -154,14 +117,14 @@ async function readAllIterator(
}

export class ServerRequest {
pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
conn: HttpConn;
conn: Conn;
r: BufReader;
w: BufWriter;
done: Deferred<void> = deferred();

public async *bodyStream(): AsyncIterableIterator<Uint8Array> {
if (this.headers.has("content-length")) {
Expand Down Expand Up @@ -244,134 +207,102 @@ export class ServerRequest {
}

async respond(r: Response): Promise<void> {
// Check and wait if the previous request is done responding.
const lastPipelineId = this.pipelineId - 1;
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
// Signal the next pending request that it can start writing.
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
this.pipelineId
);
assert(!!currPipelineDeferred);
currPipelineDeferred.resolve();
// Signal that this request has been processed and the next pipelined
// request on the same connection can be accepted.
this.done.resolve();
}
}

interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
}

/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: HttpConn,
bufr?: BufReader
conn: Conn,
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
bufr = new BufReader(c);
}
const bufw = new BufWriter(c);
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;
const tp = new TextProtoReader(bufr!);

let s: string;
req.conn = conn;
req.r = bufr;
req.w = new BufWriter(conn);
const tp = new TextProtoReader(bufr);
let err: BufState;

// First line: GET /index.html HTTP/1.0
[s, err] = await tp.readLine();
let firstLine: string;
[firstLine, err] = await tp.readLine();
if (err) {
return [null, err];
}
[req.method, req.url, req.proto] = s.split(" ", 3);

[req.method, req.url, req.proto] = firstLine.split(" ", 3);
[req.headers, err] = await tp.readMIMEHeader();

return [req, err];
}

function maybeHandleReq(
env: ServeEnv,
conn: Conn,
maybeReq: [ServerRequest, BufState]
): void {
const [req, _err] = maybeReq;
if (_err) {
conn.close(); // assume EOF for now...
return;
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
}
export class Server implements AsyncIterable<ServerRequest> {
private closing = false;

function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
constructor(public listener: Listener) {}

export async function* serve(
addr: string
): AsyncIterableIterator<ServerRequest> {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};
close(): void {
this.closing = true;
this.listener.close();
}

// Routine that keeps calling accept
let handleConn = (_conn: Conn): void => {};
let scheduleAccept = (): void => {};
const acceptRoutine = (): void => {
scheduleAccept = (): void => {
listener.accept().then(handleConn);
};
handleConn = (conn: Conn): void => {
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};
// Yields all HTTP requests on a single TCP connection.
private async *iterateHttpRequests(
conn: Conn
): AsyncIterableIterator<ServerRequest> {
const bufr = new BufReader(conn);
let bufStateErr: BufState;
let req: ServerRequest;

while (!this.closing) {
[req, bufStateErr] = await readRequest(conn, bufr);
if (bufStateErr) break;
yield req;
// Wait for the request to be processed before we accept a new request on
// this connection.
await req.done;
}

scheduleAccept();
};
if (bufStateErr === "EOF") {
// The connection was gracefully closed.
} else if (bufStateErr instanceof Error) {
// TODO(ry): send something back like a HTTP 500 status.
} else if (this.closing) {
// There are more requests incoming but the server is closing.
// TODO(ry): send a back a HTTP 503 Service Unavailable status.
} else {
fail(`unexpected BufState: ${bufStateErr}`);
}

acceptRoutine();
conn.close();
}

// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);
}
// Accepts a new TCP connection and yields all HTTP requests that arrive on
// it. When a connection is accepted, it also creates a new iterator of the
// same kind and adds it to the request multiplexer so that another TCP
// connection can be accepted.
private async *acceptConnAndIterateHttpRequests(
mux: MuxAsyncIterator<ServerRequest>
): AsyncIterableIterator<ServerRequest> {
if (this.closing) return;
// Wait for a new connection.
const conn = await this.listener.accept();
// Try to accept another connection and add it to the multiplexer.
mux.add(this.acceptConnAndIterateHttpRequests(mux));
// Yield the requests that arrive on the just-accepted connection.
yield* this.iterateHttpRequests(conn);
}
listener.close();

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator();
mux.add(this.acceptConnAndIterateHttpRequests(mux));
return mux.iterate();
}
}

export function serve(addr: string): Server {
const listener = listen("tcp", addr);
return new Server(listener);
}

export async function listenAndServe(
Expand Down
32 changes: 3 additions & 29 deletions http/server_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,6 @@ const dec = new TextDecoder();

type Handler = () => void;

interface Deferred {
promise: Promise<{}>;
resolve: Handler;
reject: Handler;
}

function deferred(isResolved = false): Deferred {
let resolve: Handler = (): void => void 0;
let reject: Handler = (): void => void 0;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

const responseTests: ResponseTest[] = [
// Default response
{
Expand All @@ -72,8 +47,8 @@ test(async function responseWrite(): Promise<void> {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;

request.conn = {
localAddr: "",
remoteAddr: "",
Expand All @@ -86,13 +61,12 @@ test(async function responseWrite(): Promise<void> {
write: async (): Promise<number> => {
return -1;
},
close: (): void => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
close: (): void => {}
};

await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);
await request.done;
}
});

Expand Down
1 change: 1 addition & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "./strings/test.ts";
import "./testing/test.ts";
import "./textproto/test.ts";
import "./toml/test.ts";
import "./util/test.ts";
import "./ws/test.ts";

import "./testing/main.ts";
Loading

0 comments on commit a295bb0

Please sign in to comment.