diff --git a/apps/host/src/index.ts b/apps/host/src/index.ts index ea0c37091..e1fe4915c 100644 --- a/apps/host/src/index.ts +++ b/apps/host/src/index.ts @@ -26,6 +26,7 @@ import { import WebSocket from 'ws'; const mapTerminalIdToSocket: Record = {}; +const mapTerminalIdToHasHeader: Record = {}; const server = createServer(); @@ -72,8 +73,9 @@ server.on('upgrade', (request, socket, head) => { const params = url.searchParams; const host_token = params.get('host_token'); const terminal_id = params.get('terminal_id'); + const has_header = params.get('has_header') === 'true'; if (!((!process.env.HOST_TOKEN || host_token === process.env.HOST_TOKEN) && terminal_id)) { - console.info(formatTime(Date.now()), 'Auth Failed', { terminal_id, host_token }); + console.info(formatTime(Date.now()), 'Auth Failed', url.toString()); socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); socket.destroy(); return; @@ -86,11 +88,38 @@ server.on('upgrade', (request, socket, head) => { oldTerminal.close(); } mapTerminalIdToSocket[terminal_id] = ws; // Register New Terminal + mapTerminalIdToHasHeader[terminal_id] = has_header; // Forward Terminal Messages (fromEvent(ws, 'message') as Observable).subscribe((origin) => { - const msg = JSON.parse(origin.data.toString()); - if (!terminalInfos.has(msg.target_terminal_id)) return; // Skip if Terminal Not Found - mapTerminalIdToSocket[msg.target_terminal_id]?.send(origin.data); + const raw_message = origin.data.toString(); + if (has_header) { + const idx = raw_message.indexOf('\n'); + const raw_headers = raw_message.slice(0, idx + 1); + const headers = JSON.parse(raw_headers); + const target_terminal_id = headers.target_terminal_id; + if (!target_terminal_id) return; // Skip if target_terminal_id not defined + if (!terminalInfos.has(target_terminal_id)) return; // Skip if Terminal Not Found + if (mapTerminalIdToHasHeader[target_terminal_id]) { + // if target terminal supports header, forward the message as is + mapTerminalIdToSocket[target_terminal_id]?.send(raw_message); + } else { + // if target terminal does not support header, strip the header and forward the message + mapTerminalIdToSocket[target_terminal_id]?.send(raw_message.slice(idx + 1)); + } + } else { + // message without headers + const msg = JSON.parse(raw_message); + const target_terminal_id = msg.target_terminal_id; + if (!terminalInfos.has(target_terminal_id)) return; // Skip if Terminal Not Found + if (mapTerminalIdToHasHeader[target_terminal_id]) { + const headers = { target_terminal_id, source_terminal_id: msg.source_terminal_id }; + // if target terminal supports headers, wrap the message with header and forward + mapTerminalIdToSocket[target_terminal_id]?.send(JSON.stringify(headers) + '\n' + raw_message); + } else { + // if target terminal does not support headers, forward the message as is + mapTerminalIdToSocket[target_terminal_id]?.send(origin.data); + } + } }); // Clean up on Terminal Disconnect fromEvent(ws, 'close').subscribe(() => { @@ -98,13 +127,14 @@ server.on('upgrade', (request, socket, head) => { terminalInfos.delete(terminal_id); mapTerminalIdToSocket[terminal_id]?.terminate(); delete mapTerminalIdToSocket[terminal_id]; + delete mapTerminalIdToHasHeader[terminal_id]; }); }); }); server.listen(8888); -const HOST_URL = `ws://localhost:8888?host_token=${process.env.HOST_TOKEN}`; +const HOST_URL = `ws://localhost:8888?has_header=true&host_token=${process.env.HOST_TOKEN}`; const terminal = new Terminal(HOST_URL, { terminal_id: '@host', name: 'Host Terminal', @@ -139,10 +169,11 @@ defer(() => terminalInfos.keys()) retry(3), tap({ error: (err) => { - console.info(formatTime(Date.now()), 'Terminal ping failed', target_terminal_id, err); + console.info(formatTime(Date.now()), 'Terminal ping failed', target_terminal_id, `${err}`); terminalInfos.delete(target_terminal_id); mapTerminalIdToSocket[target_terminal_id]?.terminate(); delete mapTerminalIdToSocket[target_terminal_id]; + delete mapTerminalIdToHasHeader[target_terminal_id]; }, }), catchError(() => EMPTY), diff --git a/common/changes/@yuants/app-host/2024-11-26-22-06.json b/common/changes/@yuants/app-host/2024-11-26-22-06.json new file mode 100644 index 000000000..ac91d8aa4 --- /dev/null +++ b/common/changes/@yuants/app-host/2024-11-26-22-06.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/app-host", + "comment": "support has_header", + "type": "minor" + } + ], + "packageName": "@yuants/app-host" +} \ No newline at end of file diff --git a/common/changes/@yuants/protocol/2024-11-26-22-06.json b/common/changes/@yuants/protocol/2024-11-26-22-06.json new file mode 100644 index 000000000..87e3c0f74 --- /dev/null +++ b/common/changes/@yuants/protocol/2024-11-26-22-06.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/protocol", + "comment": "support has_header", + "type": "minor" + } + ], + "packageName": "@yuants/protocol" +} \ No newline at end of file diff --git a/libraries/protocol/src/terminal.ts b/libraries/protocol/src/terminal.ts index dfced3b42..3b87c092e 100644 --- a/libraries/protocol/src/terminal.ts +++ b/libraries/protocol/src/terminal.ts @@ -136,6 +136,10 @@ export class Terminal { terminal_id: string; private _serviceHandlers: Record = {}; private _serviceOptions: Record = {}; + /** + * Terminal Message Header Flag + */ + private has_header: boolean; private _terminalInfoUpdated$ = new Subject(); @@ -151,6 +155,7 @@ export class Terminal { const url = new URL(host_url); url.searchParams.set('terminal_id', this.terminal_id); // make sure terminal_id is in the connection parameters this.host_url = url.toString(); + this.has_header = url.searchParams.get('has_header') === 'true'; this._conn = connection || createConnectionWs(this.host_url); this._setupTunnel(); @@ -171,18 +176,45 @@ export class Terminal { { session_id: string; maxMessageSize: number; peer: SimplePeer.Instance } | undefined > = {}; + private _parseMsgFromWs = (msg: string): ITerminalMessage => { + // + TerminalReceivedBytesTotal.add(msg.length, { + terminal_id: this.terminal_id, + tunnel: 'WS', + }); + + if (this.has_header) { + return JSON.parse(msg.slice(msg.indexOf('\n') + 1)); + } + return JSON.parse(msg); + }; + + private _sendMsgByWs = (msg: ITerminalMessage): void => { + // + if (this.has_header) { + const headers = { + target_terminal_id: msg.target_terminal_id, + source_terminal_id: msg.source_terminal_id, + }; + this._conn.output$.next(JSON.stringify(headers) + '\n' + JSON.stringify(msg)); + return; + } + + const content = JSON.stringify(msg); + TerminalTransmittedBytesTotal.add(content.length, { + terminal_id: this.terminal_id, + tunnel: 'WS', + }); + + this._conn.output$.next(content); + }; + private _setupTunnel() { this._subscriptions.push( from(this._conn.input$) .pipe( map((msg) => msg.toString()), - tap((msg) => { - TerminalReceivedBytesTotal.add(msg.length, { - terminal_id: this.terminal_id, - tunnel: 'WS', - }); - }), - map((msg): ITerminalMessage => JSON.parse(msg)), + map((msg): ITerminalMessage => this._parseMsgFromWs(msg)), ) .subscribe((msg) => { if (msg.method) { @@ -285,25 +317,13 @@ export class Terminal { } catch (err) { console.error(formatTime(Date.now()), 'Terminal', 'WebRTC', 'send', 'error', err); // fall back to WS - const content = JSON.stringify(msg); - TerminalTransmittedBytesTotal.add(content.length, { - terminal_id: this.terminal_id, - tunnel: 'WS', - }); - - this._conn.output$.next(content); + this._sendMsgByWs(msg); } }); return; } - const content = JSON.stringify(msg); - TerminalTransmittedBytesTotal.add(content.length, { - terminal_id: this.terminal_id, - tunnel: 'WS', - }); - - this._conn.output$.next(content); + this._sendMsgByWs(msg); }), ); }