Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up HTTP async iterator code #411

Merged
merged 22 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions http/http_bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ import { serve } from "./server.ts";

const addr = Deno.args[1] || "127.0.0.1:4500";
const server = serve(addr);

const body = new TextEncoder().encode("Hello World");

async function main(): Promise<void> {
console.log(`http://${addr}/`);
for await (const request of server) {
request.respond({ status: 200, body });
for await (const req of server) {
req.respond({ body });
}
}

Expand Down
191 changes: 72 additions & 119 deletions http/server.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/asserts.ts";

interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}
import { Channel, deferred, Deferred } from "../util/async.ts";

interface HttpConn extends Conn {
ry marked this conversation as resolved.
Show resolved Hide resolved
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
pendingDeferredMap: Map<number, Deferred<void>>;
}

function createHttpConn(c: Conn): HttpConn {
Expand All @@ -46,8 +24,10 @@ function createHttpConn(c: Conn): HttpConn {
pendingDeferredMap: new Map()
});

const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
const d = deferred();
d.resolve(); // The first request is ready immediately.
httpConn.pendingDeferredMap.set(0, d);

return httpConn;
}

Expand All @@ -58,6 +38,7 @@ function bufWriter(w: Writer): BufWriter {
return new BufWriter(w);
}
}

export function setContentLength(r: Response): void {
if (!r.headers) {
r.headers = new Headers();
Expand All @@ -74,6 +55,7 @@ export function setContentLength(r: Response): void {
}
}
}

async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const writer = bufWriter(w);
const encoder = new TextEncoder();
Expand All @@ -90,6 +72,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}

export async function writeResponse(w: Writer, r: Response): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
Expand Down Expand Up @@ -131,6 +114,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
}
await writer.flush();
}

async function readAllIterator(
it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
Expand Down Expand Up @@ -250,7 +234,7 @@ export class ServerRequest {
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
await lastPipelineDeferred;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
Expand All @@ -264,114 +248,83 @@ export class ServerRequest {
}
}

interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
}

/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: HttpConn,
bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
bufr = new BufReader(c);
}
async function* iterateHttpRequests(
ry marked this conversation as resolved.
Show resolved Hide resolved
c: HttpConn
): AsyncIterableIterator<[ServerRequest | null, BufState]> {
const bufr = new BufReader(c);
const bufw = new BufWriter(c);
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;
const tp = new TextProtoReader(bufr!);

let s: string;
let err: BufState;

// First line: GET /index.html HTTP/1.0
[s, err] = await tp.readLine();
if (err) {
return [null, err];
}
[req.method, req.url, req.proto] = s.split(" ", 3);

[req.headers, err] = await tp.readMIMEHeader();

return [req, err];
}
for (;;) {
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;

// First line: GET /index.html HTTP/1.0
const tp = new TextProtoReader(bufr!);
ry marked this conversation as resolved.
Show resolved Hide resolved
let [s, err]: [string, BufState] = await tp.readLine();
if (err) {
yield [null, err];
return;
}

function maybeHandleReq(
env: ServeEnv,
conn: Conn,
maybeReq: [ServerRequest, BufState]
): void {
const [req, _err] = maybeReq;
if (_err) {
conn.close(); // assume EOF for now...
return;
[req.method, req.url, req.proto] = s.split(" ", 3);
[req.headers, err] = await tp.readMIMEHeader();
yield [req, err];
ry marked this conversation as resolved.
Show resolved Hide resolved
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
}

function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
export class Server implements AsyncIterable<ServerRequest> {
private closing = false;
private looping = false;
private channel = new Channel<ServerRequest>();

export async function* serve(
addr: string
): AsyncIterableIterator<ServerRequest> {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};

// Routine that keeps calling accept
let handleConn = (_conn: Conn): void => {};
let scheduleAccept = (): void => {};
const acceptRoutine = (): void => {
scheduleAccept = (): void => {
listener.accept().then(handleConn);
};
handleConn = (conn: Conn): void => {
constructor(public listener: Listener) {}

close(): void {
this.closing = true;
this.listener.close();
}

private async *iterateRequests(): AsyncIterableIterator<ServerRequest> {
while (!this.closing) {
const conn = await this.listener.accept();
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};

scheduleAccept();
};

acceptRoutine();

// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);

for await (const [req, err] of iterateHttpRequests(httpConn)) {
if (err) {
// TODO(ry) This should be more granular. Perhaps return back a 400 or
ry marked this conversation as resolved.
Show resolved Hide resolved
// 500 error?
httpConn.close();
break;
}
yield req;
}
}
}
listener.close();

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
return this.iterateRequests();
}
}

export function serve(addr: string): Server {
const listener = listen("tcp", addr);
return new Server(listener);
}

export async function listenAndServe(
Expand Down
33 changes: 7 additions & 26 deletions http/server_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { assertEquals } from "../testing/asserts.ts";
import { Response, ServerRequest, writeResponse } from "./server.ts";
import { BufReader, BufWriter } from "../io/bufio.ts";
import { StringReader } from "../io/readers.ts";
import { deferred } from "../util/async.ts";

interface ResponseTest {
response: Response;
Expand All @@ -22,31 +23,6 @@ const dec = new TextDecoder();

type Handler = () => void;

interface Deferred {
promise: Promise<{}>;
resolve: Handler;
reject: Handler;
}

function deferred(isResolved = false): Deferred {
let resolve: Handler = (): void => void 0;
let reject: Handler = (): void => void 0;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

const responseTests: ResponseTest[] = [
// Default response
{
Expand Down Expand Up @@ -74,6 +50,11 @@ test(async function responseWrite(): Promise<void> {
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;

const d0 = deferred<void>();
d0.resolve();
const d1 = deferred<void>();

request.conn = {
localAddr: "",
remoteAddr: "",
Expand All @@ -88,7 +69,7 @@ test(async function responseWrite(): Promise<void> {
},
close: (): void => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
pendingDeferredMap: new Map([[0, d0], [1, d1]])
};

await request.respond(testCase.response);
Expand Down
4 changes: 2 additions & 2 deletions prettier/testdata/opts/0.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
console.log(0)
console.log([function foo() {}, function baz() {}, a => {}])
console.log(0);
console.log([function foo() {}, function baz() {}, (a) => {}]);
2 changes: 1 addition & 1 deletion prettier/testdata/opts/1.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
console.log ("1")
console.log('1');
2 changes: 1 addition & 1 deletion prettier/testdata/opts/2.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
console.log({a:1})
console.log({a: 1});
Loading