Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(extensions/websocket): implement websocketstream #10365

Merged
merged 77 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
c571f8f
implement
crowlKats Apr 25, 2021
71eb7d7
fiix
crowlKats Apr 25, 2021
03db16b
fiix
crowlKats Apr 25, 2021
0f34ea5
Merge branch 'main' into websocketstream
crowlKats Apr 30, 2021
cfa0ef9
better errors
crowlKats Apr 30, 2021
ff6f1df
Merge branch 'main' into websocketstream
ry May 1, 2021
bd2ae46
Merge branch 'main' into websocketstream
crowlKats May 1, 2021
3364f0c
webidl
crowlKats May 1, 2021
501262b
lint
crowlKats May 1, 2021
a518f43
fix idl
crowlKats May 1, 2021
9660661
add tests
crowlKats May 1, 2021
e70a25d
clean up
crowlKats May 2, 2021
30df21f
Update extensions/websocket/02_websocketstream.js
crowlKats May 3, 2021
fa9fd0f
clean up
crowlKats May 3, 2021
358ea94
throw TypeError for invalid chunk type
crowlKats May 3, 2021
db5d42f
Update extensions/websocket/02_websocketstream.js
crowlKats May 3, 2021
ec1fbe8
fix
crowlKats May 3, 2021
3b28892
Update extensions/websocket/02_websocketstream.js
crowlKats May 3, 2021
c842e0e
Merge remote-tracking branch 'origin/main' into websocketstream
piscisaureus May 4, 2021
4aaae1b
lint ignore no-undef
crowlKats May 4, 2021
40f1325
move type declaration
crowlKats May 4, 2021
e522db8
Revert "move type declaration"
crowlKats May 4, 2021
c382f52
Merge branch 'main' into websocketstream
crowlKats May 11, 2021
aa04087
partial wpt conformance
crowlKats May 11, 2021
7627a41
more wpt
crowlKats May 11, 2021
ba5f7b6
clean up
crowlKats May 11, 2021
2ca5d8b
Merge branch 'main' into websocketstream
crowlKats May 19, 2021
6ab0832
update expectation.json
crowlKats May 19, 2021
0b85846
remove no-undef directive
crowlKats May 20, 2021
c263437
remove unneeded tests
crowlKats May 20, 2021
6580940
custominspect
crowlKats May 20, 2021
f881abd
Merge branch 'main' into websocketstream
crowlKats May 25, 2021
82bcfd7
fix hanging
crowlKats May 25, 2021
c0060b3
Merge branch 'main' into websocketstream
crowlKats Jun 16, 2021
05667b4
fix
crowlKats Jun 16, 2021
141fca5
fix wpt
crowlKats Jun 17, 2021
a500487
Merge branch 'main' into websocketstream
crowlKats Jul 8, 2021
f6a38aa
fix
crowlKats Jul 8, 2021
9fe67ec
fix constructor wpt
crowlKats Jul 8, 2021
bd4cfba
fmt
crowlKats Jul 8, 2021
4da878d
proper abort
crowlKats Jul 9, 2021
cff8448
fix
crowlKats Jul 9, 2021
b4bf49e
fix abort
crowlKats Jul 9, 2021
c90e243
clean up
crowlKats Jul 10, 2021
d3ebda7
fmt
crowlKats Jul 10, 2021
be2701a
revert early close
crowlKats Jul 10, 2021
f8f3062
lint
crowlKats Jul 10, 2021
1056d70
handle early close
crowlKats Jul 10, 2021
0930f9d
handle early close
crowlKats Jul 10, 2021
35ec1ef
more wpt
crowlKats Jul 10, 2021
2666a08
fix wpt
crowlKats Jul 10, 2021
63d6f52
lint
crowlKats Jul 10, 2021
b0c9c41
fmt
crowlKats Jul 10, 2021
c0b5a48
primordials
littledivy Jul 10, 2021
f5dc1e9
Update extensions/websocket/02_websocketstream.js
crowlKats Jul 10, 2021
5c7745d
Merge branch 'main' into websocketstream
crowlKats Jul 12, 2021
aff900a
fix leaking
crowlKats Jul 12, 2021
a212a71
fix various issues
crowlKats Jul 12, 2021
b081843
fmt
crowlKats Jul 12, 2021
ceff648
lint
crowlKats Jul 12, 2021
1eb23fe
Merge branch 'main' into websocketstream
crowlKats Jul 19, 2021
1ef1633
fix
crowlKats Jul 19, 2021
d2a9e86
ci
crowlKats Jul 19, 2021
a3844bf
fix
crowlKats Jul 19, 2021
6aa7957
fix
crowlKats Jul 19, 2021
8fade20
fix
crowlKats Jul 26, 2021
09f2dde
Merge branch 'main' into websocketstream
bartlomieju Jul 29, 2021
f5c46c8
Merge branch 'main' into websocketstream
bartlomieju Aug 2, 2021
1e0f29b
Merge branch 'main' into websocketstream
bartlomieju Aug 6, 2021
57c12b9
Update extensions/websocket/02_websocketstream.js
crowlKats Aug 8, 2021
198710b
add branding
crowlKats Aug 8, 2021
35db598
use internal abort methods
crowlKats Aug 8, 2021
5abd0bb
Merge remote-tracking branch 'origin/websocketstream' into websockets…
crowlKats Aug 8, 2021
67c25c5
Merge branch 'main' into websocketstream
lucacasonato Aug 9, 2021
f8bbe00
Merge remote-tracking branch 'origin/main' into websocketstream
lucacasonato Aug 9, 2021
1c80cb0
fix build
lucacasonato Aug 9, 2021
4c5f5b2
make WebSocketStream unstable
lucacasonato Aug 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1265,3 +1265,28 @@ declare interface WorkerOptions {
};
};
}

declare interface WebSocketStreamOptions {
protocols?: string[];
signal?: AbortSignal;
}

declare interface WebSocketConnection {
readable: ReadableStream<string | Uint8Array>;
writable: WritableStream<string | Uint8Array>;
extensions: string;
protocol: string;
}

declare interface WebSocketCloseInfo {
code?: number;
reason?: string;
}

declare class WebSocketStream {
constructor(url: string, options?: WebSocketStreamOptions);
url: string;
connection: Promise<WebSocketConnection>;
closed: Promise<WebSocketCloseInfo>;
close(closeInfo: WebSocketCloseInfo): void;
}
247 changes: 247 additions & 0 deletions op_crates/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";

((window) => {
const core = window.Deno.core;

/** @template T */
class Deferred {
/** @type {Promise<T>} */
#promise;
/** @type {(reject?: any) => void} */
#reject;
/** @type {(value: T | PromiseLike<T>) => void} */
#resolve;
/** @type {"pending" | "fulfilled"} */
#state = "pending";

constructor() {
this.#promise = new Promise((resolve, reject) => {
this.#resolve = resolve;
this.#reject = reject;
});
}

/** @returns {Promise<T>} */
get promise() {
return this.#promise;
}

/** @returns {"pending" | "fulfilled"} */
get state() {
return this.#state;
}

/** @param {any=} reason */
reject(reason) {
// already settled promises are a no-op
if (this.#state !== "pending") {
return;
}
this.#state = "fulfilled";
this.#reject(reason);
}

/** @param {T | PromiseLike<T>} value */
resolve(value) {
// already settled promises are a no-op
if (this.#state !== "pending") {
return;
}
this.#state = "fulfilled";
this.#resolve(value);
}
}

/**
* Tries to close the resource (and ignores BadResource errors).
* @param {number} rid
*/
function tryClose(rid) {
try {
core.close(rid);
} catch (err) {
// Ignore error if the socket has already been closed.
if (!(err instanceof Deno.errors.BadResource)) throw err;
}
}

class WebSocketStream {
#rid;

#url;
get url() {
return this.#url;
}

constructor(url, options) {
requiredArguments("WebSocket", arguments.length, 1);

const wsURL = new URL(url);

if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
throw new DOMException(
"Only ws & wss schemes are allowed in a WebSocket URL.",
"SyntaxError",
);
}

if (wsURL.hash !== "" || wsURL.href.endsWith("#")) {
throw new DOMException(
"Fragments are not allowed in a WebSocket URL.",
"SyntaxError",
);
}

this.#url = wsURL.href;

core.opSync("op_ws_check_permission", this.#url);

if (
options?.protocols?.some((x) =>
options.protocols.indexOf(x) !== options.protocols.lastIndexOf(x)
)
) {
throw new DOMException(
"Can't supply multiple times the same protocol.",
"SyntaxError",
);
}

core.opAsync("op_ws_create", {
url: wsURL.href,
protocols: options?.protocols?.join(", "),
}).then((create) => {
if (create.success) {
options.abort.addEventListener("abort", () => this.close());

const readable = new ReadableStream({
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this.#rid,
);

switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(new Uint8Array(value));
break;
}
case "ping": {
core.opAsync("op_ws_send", {
rid: this.#rid,
kind: "pong",
});
break;
}
case "close": {
this.#closed.resolve(value);
tryClose(this.#rid);
break;
}
case "error": {
this.#closed.reject();
controller.error(value);
tryClose(this.#rid);
break;
}
}
},
cancel: (reason) => {
this.close(reason);
},
});
const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
await core.opAsync("op_ws_send", {
rid: this.#rid,
kind: "text",
text: chunk,
});
} else if (chunk instanceof Uint8Array) {
await core.opAsync("op_ws_send", {
rid: this.#rid,
kind: "binary",
}, chunk);
}
},
cancel: (reason) => {
this.close(reason);
},
abort: (reason) => {
this.close(reason);
},
});

this.#connection.resolve({
readable,
writable,
extensions: create.extensions ?? "",
protocol: create.protocol ?? "",
});
} else {
this.#connection.reject();
this.#closed.reject();
}
});
}

#connection = new Deferred();
get connection() {
return this.#connection.promise;
}

#closed = new Deferred();
get closed() {
return this.#closed.promise;
}

close(closeInfo) {
if (
closeInfo?.code &&
!(closeInfo.code === 1000 ||
(3000 <= closeInfo.code && closeInfo.code < 5000))
) {
throw new DOMException(
"The close code must be either 1000 or in the range of 3000 to 4999.",
"NotSupportedError",
);
}

const encoder = new TextEncoder();
if (
closeInfo?.reason && encoder.encode(closeInfo.reason).byteLength > 123
) {
throw new DOMException(
"The close reason may not be longer than 123 bytes.",
"SyntaxError",
);
}

let code = closeInfo?.code;
if (closeInfo?.reason && code === undefined) {
code = 1000;
}

if (this.#closed.state === "pending") {
core.opAsync("op_ws_close", {
rid: this.#rid,
code,
reason: closeInfo?.reason,
}).then(() => {
this.#closed.resolve({
code: closeInfo?.code ?? 1005,
reason: closeInfo?.reason,
});
});
}
}
}

window.__bootstrap.webSocket.WebSocketStream = WebSocketStream;
})(this);
19 changes: 13 additions & 6 deletions op_crates/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ pub enum NextEventResponse {
Close { code: u16, reason: String },
Ping,
Pong,
Error,
Error(String),
Closed,
}

Expand Down Expand Up @@ -323,7 +323,7 @@ pub async fn op_ws_next_event(
},
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
Some(Err(_)) => NextEventResponse::Error,
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
None => {
state.borrow_mut().resource_table.close(rid).unwrap();
NextEventResponse::Closed
Expand All @@ -334,12 +334,19 @@ pub async fn op_ws_next_event(

/// Load and execute the javascript code.
pub fn init(isolate: &mut JsRuntime) {
isolate
.execute(
let files = vec![
(
"deno:op_crates/websocket/01_websocket.js",
include_str!("01_websocket.js"),
)
.unwrap();
),
(
"deno:op_crates/websocket/02_websocketstream.js",
include_str!("02_websocketstream.js"),
),
];
for (url, source_code) in files {
isolate.execute(url, source_code).unwrap();
}
}

pub fn get_declaration() -> PathBuf {
Expand Down
1 change: 1 addition & 0 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ delete Object.prototype.__proto__;
URL: util.nonEnumerable(url.URL),
URLSearchParams: util.nonEnumerable(url.URLSearchParams),
WebSocket: util.nonEnumerable(webSocket.WebSocket),
WebSocketStream: util.nonEnumerable(webSocket.WebSocketStream),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this to only be registered when --unstable is passed? I think typeof WebSocketStream === "undefined" should be true when user does not pass --unstable.

Worker: util.nonEnumerable(worker.Worker),
WritableStream: util.nonEnumerable(streams.WritableStream),
WritableStreamDefaultWriter: util.nonEnumerable(
Expand Down