Skip to content

Commit

Permalink
feat(protocol): support has_header (#926)
Browse files Browse the repository at this point in the history
  • Loading branch information
zccz14 authored Nov 27, 2024
1 parent 102892c commit c7a0ca7
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 27 deletions.
43 changes: 37 additions & 6 deletions apps/host/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
import WebSocket from 'ws';

const mapTerminalIdToSocket: Record<string, WebSocket.WebSocket> = {};
const mapTerminalIdToHasHeader: Record<string, boolean> = {};

const server = createServer();

Expand Down Expand Up @@ -72,8 +73,9 @@ server.on('upgrade', (request, socket, head) => {
const params = url.searchParams;
const host_token = params.get('host_token');
const terminal_id = params.get('terminal_id');
const has_header = params.get('has_header') === 'true';
if (!((!process.env.HOST_TOKEN || host_token === process.env.HOST_TOKEN) && terminal_id)) {
console.info(formatTime(Date.now()), 'Auth Failed', { terminal_id, host_token });
console.info(formatTime(Date.now()), 'Auth Failed', url.toString());
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
Expand All @@ -86,25 +88,53 @@ server.on('upgrade', (request, socket, head) => {
oldTerminal.close();
}
mapTerminalIdToSocket[terminal_id] = ws; // Register New Terminal
mapTerminalIdToHasHeader[terminal_id] = has_header;
// Forward Terminal Messages
(fromEvent(ws, 'message') as Observable<WebSocket.MessageEvent>).subscribe((origin) => {
const msg = JSON.parse(origin.data.toString());
if (!terminalInfos.has(msg.target_terminal_id)) return; // Skip if Terminal Not Found
mapTerminalIdToSocket[msg.target_terminal_id]?.send(origin.data);
const raw_message = origin.data.toString();
if (has_header) {
const idx = raw_message.indexOf('\n');
const raw_headers = raw_message.slice(0, idx + 1);
const headers = JSON.parse(raw_headers);
const target_terminal_id = headers.target_terminal_id;
if (!target_terminal_id) return; // Skip if target_terminal_id not defined
if (!terminalInfos.has(target_terminal_id)) return; // Skip if Terminal Not Found
if (mapTerminalIdToHasHeader[target_terminal_id]) {
// if target terminal supports header, forward the message as is
mapTerminalIdToSocket[target_terminal_id]?.send(raw_message);
} else {
// if target terminal does not support header, strip the header and forward the message
mapTerminalIdToSocket[target_terminal_id]?.send(raw_message.slice(idx + 1));
}
} else {
// message without headers
const msg = JSON.parse(raw_message);
const target_terminal_id = msg.target_terminal_id;
if (!terminalInfos.has(target_terminal_id)) return; // Skip if Terminal Not Found
if (mapTerminalIdToHasHeader[target_terminal_id]) {
const headers = { target_terminal_id, source_terminal_id: msg.source_terminal_id };
// if target terminal supports headers, wrap the message with header and forward
mapTerminalIdToSocket[target_terminal_id]?.send(JSON.stringify(headers) + '\n' + raw_message);
} else {
// if target terminal does not support headers, forward the message as is
mapTerminalIdToSocket[target_terminal_id]?.send(origin.data);
}
}
});
// Clean up on Terminal Disconnect
fromEvent(ws, 'close').subscribe(() => {
console.info(formatTime(Date.now()), 'terminal disconnected', terminal_id);
terminalInfos.delete(terminal_id);
mapTerminalIdToSocket[terminal_id]?.terminate();
delete mapTerminalIdToSocket[terminal_id];
delete mapTerminalIdToHasHeader[terminal_id];
});
});
});

server.listen(8888);

const HOST_URL = `ws://localhost:8888?host_token=${process.env.HOST_TOKEN}`;
const HOST_URL = `ws://localhost:8888?has_header=true&host_token=${process.env.HOST_TOKEN}`;
const terminal = new Terminal(HOST_URL, {
terminal_id: '@host',
name: 'Host Terminal',
Expand Down Expand Up @@ -139,10 +169,11 @@ defer(() => terminalInfos.keys())
retry(3),
tap({
error: (err) => {
console.info(formatTime(Date.now()), 'Terminal ping failed', target_terminal_id, err);
console.info(formatTime(Date.now()), 'Terminal ping failed', target_terminal_id, `${err}`);
terminalInfos.delete(target_terminal_id);
mapTerminalIdToSocket[target_terminal_id]?.terminate();
delete mapTerminalIdToSocket[target_terminal_id];
delete mapTerminalIdToHasHeader[target_terminal_id];
},
}),
catchError(() => EMPTY),
Expand Down
10 changes: 10 additions & 0 deletions common/changes/@yuants/app-host/2024-11-26-22-06.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/app-host",
"comment": "support has_header",
"type": "minor"
}
],
"packageName": "@yuants/app-host"
}
10 changes: 10 additions & 0 deletions common/changes/@yuants/protocol/2024-11-26-22-06.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/protocol",
"comment": "support has_header",
"type": "minor"
}
],
"packageName": "@yuants/protocol"
}
62 changes: 41 additions & 21 deletions libraries/protocol/src/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ export class Terminal {
terminal_id: string;
private _serviceHandlers: Record<string, IServiceHandler> = {};
private _serviceOptions: Record<string, IServiceOptions> = {};
/**
* Terminal Message Header Flag
*/
private has_header: boolean;

private _terminalInfoUpdated$ = new Subject<void>();

Expand All @@ -151,6 +155,7 @@ export class Terminal {
const url = new URL(host_url);
url.searchParams.set('terminal_id', this.terminal_id); // make sure terminal_id is in the connection parameters
this.host_url = url.toString();
this.has_header = url.searchParams.get('has_header') === 'true';

this._conn = connection || createConnectionWs(this.host_url);
this._setupTunnel();
Expand All @@ -171,18 +176,45 @@ export class Terminal {
{ session_id: string; maxMessageSize: number; peer: SimplePeer.Instance } | undefined
> = {};

private _parseMsgFromWs = (msg: string): ITerminalMessage => {
//
TerminalReceivedBytesTotal.add(msg.length, {
terminal_id: this.terminal_id,
tunnel: 'WS',
});

if (this.has_header) {
return JSON.parse(msg.slice(msg.indexOf('\n') + 1));
}
return JSON.parse(msg);
};

private _sendMsgByWs = (msg: ITerminalMessage): void => {
//
if (this.has_header) {
const headers = {
target_terminal_id: msg.target_terminal_id,
source_terminal_id: msg.source_terminal_id,
};
this._conn.output$.next(JSON.stringify(headers) + '\n' + JSON.stringify(msg));
return;
}

const content = JSON.stringify(msg);
TerminalTransmittedBytesTotal.add(content.length, {
terminal_id: this.terminal_id,
tunnel: 'WS',
});

this._conn.output$.next(content);
};

private _setupTunnel() {
this._subscriptions.push(
from(this._conn.input$)
.pipe(
map((msg) => msg.toString()),
tap((msg) => {
TerminalReceivedBytesTotal.add(msg.length, {
terminal_id: this.terminal_id,
tunnel: 'WS',
});
}),
map((msg): ITerminalMessage => JSON.parse(msg)),
map((msg): ITerminalMessage => this._parseMsgFromWs(msg)),
)
.subscribe((msg) => {
if (msg.method) {
Expand Down Expand Up @@ -285,25 +317,13 @@ export class Terminal {
} catch (err) {
console.error(formatTime(Date.now()), 'Terminal', 'WebRTC', 'send', 'error', err);
// fall back to WS
const content = JSON.stringify(msg);
TerminalTransmittedBytesTotal.add(content.length, {
terminal_id: this.terminal_id,
tunnel: 'WS',
});

this._conn.output$.next(content);
this._sendMsgByWs(msg);
}
});
return;
}

const content = JSON.stringify(msg);
TerminalTransmittedBytesTotal.add(content.length, {
terminal_id: this.terminal_id,
tunnel: 'WS',
});

this._conn.output$.next(content);
this._sendMsgByWs(msg);
}),
);
}
Expand Down

0 comments on commit c7a0ca7

Please sign in to comment.