Skip to content

Commit

Permalink
wip: refactor BufReader/BufWriter to be more idiomatic
Browse files Browse the repository at this point in the history
wip

wip
  • Loading branch information
piscisaureus committed May 26, 2019
1 parent 4381785 commit 97c67e1
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 410 deletions.
2 changes: 1 addition & 1 deletion encoding/test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./toml_test.ts";
import "./csv_test.ts";
//import "./csv_test.ts";
8 changes: 4 additions & 4 deletions http/file_server_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const { readFile, run } = Deno;

import { test } from "../testing/mod.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
import { BufReader } from "../io/bufio.ts";
import { BufReader, EOF } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";

let fileServer;
Expand All @@ -22,10 +22,10 @@ async function startFileServer(): Promise<void> {
});
// Once fileServer is ready it will write to its stdout.
const r = new TextProtoReader(new BufReader(fileServer.stdout));
const [s, err] = await r.readLine();
assert(err == null);
assert(s.includes("server listening"));
const s = await r.readLine();
assert(s !== EOF && s.includes("server listening"));
}

function killFileServer(): void {
fileServer.close();
fileServer.stdout.close();
Expand Down
14 changes: 7 additions & 7 deletions http/racing_server_test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const { dial, run } = Deno;

import { test } from "../testing/mod.ts";
import { test, runIfMain } from "../testing/mod.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
import { BufReader } from "../io/bufio.ts";
import { BufReader, EOF } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";

let server;
Expand All @@ -13,9 +13,8 @@ async function startServer(): Promise<void> {
});
// Once fileServer is ready it will write to its stdout.
const r = new TextProtoReader(new BufReader(server.stdout));
const [s, err] = await r.readLine();
assert(err == null);
assert(s.includes("Racing server listening..."));
const s = await r.readLine();
assert(s !== EOF && s.includes("Racing server listening..."));
}
function killServer(): void {
server.close();
Expand Down Expand Up @@ -57,9 +56,10 @@ test(async function serverPipelineRace(): Promise<void> {
const outLines = output.split("\n");
// length - 1 to disregard last empty line
for (let i = 0; i < outLines.length - 1; i++) {
const [s, err] = await r.readLine();
assert(!err);
const s = await r.readLine();
assertEquals(s, outLines[i]);
}
killServer();
});

runIfMain(import.meta);
142 changes: 77 additions & 65 deletions http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { BufReader, BufWriter, EOF, UnexpectedEOFError } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert, fail } from "../testing/asserts.ts";
Expand Down Expand Up @@ -134,26 +134,27 @@ export class ServerRequest {
if (transferEncodings.includes("chunked")) {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(this.r);
let [line] = await tp.readLine();
let line = await tp.readLine();
if (line === EOF) throw new UnexpectedEOFError();
// TODO: handle chunk extension
let [chunkSizeString] = line.split(";");
let chunkSize = parseInt(chunkSizeString, 16);
if (Number.isNaN(chunkSize) || chunkSize < 0) {
throw new Error("Invalid chunk size");
}
while (chunkSize > 0) {
let data = new Uint8Array(chunkSize);
let [nread] = await this.r.readFull(data);
if (nread !== chunkSize) {
throw new Error("Chunk data does not match size");
const data = new Uint8Array(chunkSize);
if ((await this.r.readFull(data)) === EOF) {
throw new UnexpectedEOFError();
}
yield data;
await this.r.readLine(); // Consume \r\n
[line] = await tp.readLine();
line = await tp.readLine();
if (line === EOF) throw new UnexpectedEOFError();
chunkSize = parseInt(line, 16);
}
const [entityHeaders, err] = await tp.readMIMEHeader();
if (!err) {
const entityHeaders = await tp.readMIMEHeader();
if (entityHeaders !== EOF) {
for (let [k, v] of entityHeaders) {
this.headers.set(k, v);
}
Expand Down Expand Up @@ -220,70 +221,78 @@ function fixLength(req: ServerRequest): void {
// ParseHTTPVersion parses a HTTP version string.
// "HTTP/1.0" returns (1, 0, true).
// Ported from https://github.com/golang/go/blob/f5c43b9/src/net/http/request.go#L766-L792
export function parseHTTPVersion(vers: string): [number, number, boolean] {
const Big = 1000000; // arbitrary upper bound
const digitReg = /^\d+$/; // test if string is only digit
let major: number;
let minor: number;

export function parseHTTPVersion(vers: string): [number, number] {
switch (vers) {
case "HTTP/1.1":
return [1, 1, true];
return [1, 1];

case "HTTP/1.0":
return [1, 0, true];
}
return [1, 0];

if (!vers.startsWith("HTTP/")) {
return [0, 0, false];
}
default: {
const Big = 1000000; // arbitrary upper bound
const digitReg = /^\d+$/; // test if string is only digit
let major: number;
let minor: number;

const dot = vers.indexOf(".");
if (dot < 0) {
return [0, 0, false];
}
if (!vers.startsWith("HTTP/")) {
break;
}

let majorStr = vers.substring(vers.indexOf("/") + 1, dot);
major = parseInt(majorStr);
if (!digitReg.test(majorStr) || isNaN(major) || major < 0 || major > Big) {
return [0, 0, false];
}
const dot = vers.indexOf(".");
if (dot < 0) {
break;
}

let majorStr = vers.substring(vers.indexOf("/") + 1, dot);
major = parseInt(majorStr);
if (
!digitReg.test(majorStr) ||
isNaN(major) ||
major < 0 ||
major > Big
) {
break;
}

let minorStr = vers.substring(dot + 1);
minor = parseInt(minorStr);
if (!digitReg.test(minorStr) || isNaN(minor) || minor < 0 || minor > Big) {
return [0, 0, false];
let minorStr = vers.substring(dot + 1);
minor = parseInt(minorStr);
if (
!digitReg.test(minorStr) ||
isNaN(minor) ||
minor < 0 ||
minor > Big
) {
break;
}

return [major, minor];
}
}
return [major, minor, true];

throw new Error(`malformed HTTP version ${vers}`);
}

export async function readRequest(
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
): Promise<ServerRequest | EOF> {
const tp = new TextProtoReader(bufr);
const firstLine = await tp.readLine(); // e.g. GET /index.html HTTP/1.0
if (firstLine === EOF) return EOF;
const headers = await tp.readMIMEHeader();
if (headers === EOF) throw new UnexpectedEOFError();

const req = new ServerRequest();
req.r = bufr;
const tp = new TextProtoReader(bufr);
let err: BufState;
// First line: GET /index.html HTTP/1.0
let firstLine: string;
[firstLine, err] = await tp.readLine();
if (err) {
return [null, err];
}
[req.method, req.url, req.proto] = firstLine.split(" ", 3);

let ok: boolean;
[req.protoMinor, req.protoMajor, ok] = parseHTTPVersion(req.proto);
if (!ok) {
throw Error(`malformed HTTP version ${req.proto}`);
}

[req.headers, err] = await tp.readMIMEHeader();
[req.protoMinor, req.protoMajor] = parseHTTPVersion(req.proto);
req.headers = headers;
fixLength(req);
// TODO(zekth) : add parsing of headers eg:
// rfc: https://tools.ietf.org/html/rfc7230#section-3.3.2
// A sender MUST NOT send a Content-Length header field in any message
// that contains a Transfer-Encoding header field.
return [req, err];
return req;
}

export class Server implements AsyncIterable<ServerRequest> {
Expand All @@ -302,36 +311,39 @@ export class Server implements AsyncIterable<ServerRequest> {
): AsyncIterableIterator<ServerRequest> {
const bufr = new BufReader(conn);
const w = new BufWriter(conn);
let bufStateErr: BufState;
let req: ServerRequest;
let req: ServerRequest | EOF;
let err: Error | undefined;

while (!this.closing) {
try {
[req, bufStateErr] = await readRequest(bufr);
} catch (err) {
bufStateErr = err;
req = await readRequest(bufr);
} catch (e) {
err = e;
break;
}
if (req === EOF) {
break;
}
if (bufStateErr) break;

req.w = w;
yield req;

// Wait for the request to be processed before we accept a new request on
// this connection.
await req.done;
}

if (bufStateErr === "EOF") {
// The connection was gracefully closed.
} else if (bufStateErr instanceof Error) {
if (req === EOF) {
// No more requests arrived on connection.
} else if (err) {
// An error was thrown while parsing request headers.
await writeResponse(req.w, {
status: 400,
body: new TextEncoder().encode(`${bufStateErr.message}\r\n\r\n`)
body: new TextEncoder().encode(`${err.message}\r\n\r\n`)
});
} else if (this.closing) {
// There are more requests incoming but the server is closing.
// TODO(ry): send a back a HTTP 503 Service Unavailable status.
} else {
fail(`unexpected BufState: ${bufStateErr}`);
}

conn.close();
Expand Down
Loading

0 comments on commit 97c67e1

Please sign in to comment.