Skip to content

Commit

Permalink
Map promises onto futures.
Browse files Browse the repository at this point in the history
Refactors handlers.rs

The idea is that all Deno "ops" (aka bindings) should map onto
a Rust Future. By setting the "sync" flag in the Base message
users can determine if the future is executed immediately or put
on the event loop.

In the case of async futures, a promise is automatically created.
Errors are automatically forwarded and raised.

TODO:

- The file system ops in src/handler.rs are not using the thread pool
  yet. This will be done in the future using tokio_threadpool::blocking.
  That is, if you try to call them asynchronously, you will get a promise
  and it will act asynchronous, but currently it will be blocking.
- Handlers in src/handler.rs returned boxed futures. This was to make
  it easy while developing. We should try to remove this allocation.
  • Loading branch information
ry committed Sep 9, 2018
1 parent ff6eefd commit 0d03faf
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 577 deletions.
13 changes: 11 additions & 2 deletions js/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ export class DenoError<T extends fbs.ErrorKind> extends Error {

// @internal
export function maybeThrowError(base: fbs.Base): void {
const err = maybeError(base);
if (err != null) {
throw err;
}
}

export function maybeError(base: fbs.Base): null | DenoError<fbs.ErrorKind> {
const kind = base.errorKind();
if (kind !== fbs.ErrorKind.NoError) {
throw new DenoError(kind, base.error()!);
if (kind === fbs.ErrorKind.NoError) {
return null;
} else {
return new DenoError(kind, base.error()!);
}
}
68 changes: 59 additions & 9 deletions js/fbs_util.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,77 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
// TODO Rename this file to //js/dispatch.ts
import { libdeno } from "./libdeno";
import { flatbuffers } from "flatbuffers";
import { maybeThrowError } from "./errors";
import { deno as fbs } from "gen/msg_generated";
import * as errors from "./errors";
import * as util from "./util";

let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<fbs.Base>>();

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = fbs.Base.getRootAsBase(bb);

const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
}

// @internal
export function sendAsync(
builder: flatbuffers.Builder,
msgType: fbs.Any,
msg: flatbuffers.Offset
): Promise<fbs.Base> {
const [cmdId, resBuf] = sendInternal(builder, msgType, msg, false);
util.assert(resBuf == null);
const promise = util.createResolvable<fbs.Base>();
promiseTable.set(cmdId, promise);
return promise;
}

// TODO Rename to sendSync
// @internal
export function send(
builder: flatbuffers.Builder,
msgType: fbs.Any,
msg: flatbuffers.Offset
): null | fbs.Base {
fbs.Base.startBase(builder);
fbs.Base.addMsg(builder, msg);
fbs.Base.addMsgType(builder, msgType);
builder.finish(fbs.Base.endBase(builder));

const resBuf = libdeno.send(builder.asUint8Array());
const [cmdId, resBuf] = sendInternal(builder, msgType, msg, true);
util.assert(cmdId >= 0);
if (resBuf == null) {
return null;
} else {
const bb = new flatbuffers.ByteBuffer(new Uint8Array(resBuf!));
const u8 = new Uint8Array(resBuf!);
// console.log("recv sync message", util.hexdump(u8));
const bb = new flatbuffers.ByteBuffer(u8);
const baseRes = fbs.Base.getRootAsBase(bb);
maybeThrowError(baseRes);
errors.maybeThrowError(baseRes);
return baseRes;
}
}

function sendInternal(
builder: flatbuffers.Builder,
msgType: fbs.Any,
msg: flatbuffers.Offset,
sync = true
): [number, null | Uint8Array] {
const cmdId = nextCmdId++;
fbs.Base.startBase(builder);
fbs.Base.addMsg(builder, msg);
fbs.Base.addMsgType(builder, msgType);
fbs.Base.addSync(builder, sync);
fbs.Base.addCmdId(builder, cmdId);
builder.finish(fbs.Base.endBase(builder));

return [cmdId, libdeno.send(builder.asUint8Array())];
}
109 changes: 36 additions & 73 deletions js/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
notImplemented
} from "./util";
import { flatbuffers } from "flatbuffers";
import { send } from "./fbs_util";
import { sendAsync } from "./fbs_util";
import { deno as fbs } from "gen/msg_generated";
import {
Headers,
Expand All @@ -20,16 +20,6 @@ import {
} from "./fetch_types";
import { TextDecoder } from "./text_encoding";

/** @internal */
export function onFetchRes(base: fbs.Base, msg: fbs.FetchRes) {
const id = msg.id();
const req = fetchRequests.get(id);
assert(req != null, `Couldn't find FetchRequest id ${id}`);
req!.onMsg(base, msg);
}

const fetchRequests = new Map<number, FetchRequest>();

class DenoHeaders implements Headers {
append(name: string, value: string): void {
assert(false, "Implement me");
Expand Down Expand Up @@ -58,10 +48,9 @@ class DenoHeaders implements Headers {
}

class FetchResponse implements Response {
readonly url: string;
readonly url: string = "";
body: null;
bodyUsed = false; // TODO
status = 0;
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
Expand All @@ -71,10 +60,12 @@ class FetchResponse implements Response {
private first = true;
private bodyWaiter: Resolvable<ArrayBuffer>;

constructor(readonly req: FetchRequest) {
this.url = req.url;
constructor(readonly status: number, readonly body_: ArrayBuffer) {
this.bodyWaiter = createResolvable();
this.trailer = createResolvable();
setTimeout(() => {
this.bodyWaiter.resolve(body_);
}, 0);
}

arrayBuffer(): Promise<ArrayBuffer> {
Expand Down Expand Up @@ -114,78 +105,50 @@ class FetchResponse implements Response {
onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;

onMsg(base: fbs.Base, msg: fbs.FetchRes) {
onMsg(base: fbs.Base) {
/*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
*/

if (this.first) {
this.first = false;
this.status = msg.status();
assert(this.onHeader != null);
this.onHeader!(this);
} else {
// Body message. Assuming it all comes in one message now.
const bodyArray = msg.bodyArray();
assert(bodyArray != null);
const ab = typedArrayToArrayBuffer(bodyArray!);
this.bodyWaiter.resolve(ab);
}
}
}

let nextFetchId = 0;
//TODO implements Request
class FetchRequest {
private readonly id: number;
response: FetchResponse;
constructor(readonly url: string) {
this.id = nextFetchId++;
fetchRequests.set(this.id, this);
this.response = new FetchResponse(this);
}

onMsg(base: fbs.Base, msg: fbs.FetchRes) {
this.response.onMsg(base, msg);
}

destroy() {
fetchRequests.delete(this.id);
}

start() {
log("dispatch FETCH_REQ", this.id, this.url);

// Send FetchReq message
const builder = new flatbuffers.Builder();
const url = builder.createString(this.url);
fbs.FetchReq.startFetchReq(builder);
fbs.FetchReq.addId(builder, this.id);
fbs.FetchReq.addUrl(builder, url);
const msg = fbs.FetchReq.endFetchReq(builder);
send(builder, fbs.Any.FetchReq, msg);
}
}

export function fetch(
export async function fetch(
input?: Request | string,
init?: RequestInit
): Promise<Response> {
const fetchReq = new FetchRequest(input as string);
const response = fetchReq.response;
const promise = new Promise<Response>((resolve, reject) => {
response.onHeader = (response: FetchResponse) => {
log("onHeader");
resolve(response);
};
response.onError = (error: Error) => {
log("onError", error);
reject(error);
};
});
fetchReq.start();
return promise;
const url = input as string;
log("dispatch FETCH_REQ", url);

// Send FetchReq message
const builder = new flatbuffers.Builder();
const url_ = builder.createString(url);
fbs.FetchReq.startFetchReq(builder);
fbs.FetchReq.addUrl(builder, url_);
const resBase = await sendAsync(
builder,
fbs.Any.FetchReq,
fbs.FetchReq.endFetchReq(builder)
);

// Decode FetchRes
assert(fbs.Any.FetchRes === resBase.msgType());
const msg = new fbs.FetchRes();
assert(resBase.msg(msg) != null);

const status = msg.status();
const bodyArray = msg.bodyArray();
assert(bodyArray != null);
const body = typedArrayToArrayBuffer(bodyArray!);

const response = new FetchResponse(status, body);
return response;
}
29 changes: 2 additions & 27 deletions js/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import { assert, log, setLogDebug } from "./util";
import * as os from "./os";
import { DenoCompiler } from "./compiler";
import { libdeno } from "./libdeno";
import * as timers from "./timers";
import { onFetchRes } from "./fetch";
import { argv } from "./deno";
import { send } from "./fbs_util";
import { send, handleAsyncMsgFromRust } from "./fbs_util";

function sendStart(): fbs.StartRes {
const builder = new flatbuffers.Builder();
Expand All @@ -22,29 +20,6 @@ function sendStart(): fbs.StartRes {
return startRes;
}

function onMessage(ui8: Uint8Array) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = fbs.Base.getRootAsBase(bb);
switch (base.msgType()) {
case fbs.Any.FetchRes: {
const msg = new fbs.FetchRes();
assert(base.msg(msg) != null);
onFetchRes(base, msg);
break;
}
case fbs.Any.TimerReady: {
const msg = new fbs.TimerReady();
assert(base.msg(msg) != null);
timers.onMessage(msg);
break;
}
default: {
assert(false, "Unhandled message type");
break;
}
}
}

function onGlobalError(
message: string,
source: string,
Expand All @@ -58,7 +33,7 @@ function onGlobalError(

/* tslint:disable-next-line:no-default-export */
export default function denoMain() {
libdeno.recv(onMessage);
libdeno.recv(handleAsyncMsgFromRust);
libdeno.setGlobalErrorHandler(onGlobalError);
const compiler = DenoCompiler.instance();

Expand Down
Loading

0 comments on commit 0d03faf

Please sign in to comment.