From 0ec8075c1ef60cdbb0b1d4cdad40cbb44e21661b Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Wed, 25 May 2022 02:28:25 -0700 Subject: [PATCH] Integrate new message-rpc prototype into core messaging API (extensions) (#11011) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactors and improves the prototype of a faster JSON-RPC protocol initially contributed by @tsmaeder (See also https://github.com/eclipse-theia/theia/pull/10781). The encoding approach used in the initial POC has some performance drawbacks when encoding plain JSON objects. We refactored the protocol to improve the performance for JSON objects whilst maintaining the excellent performance for encoding objects that contain binary data. Integrates the new message-rpc prototype into the core messaging API (replacing vscode-ws-jsonrpc). This has major impacts on the Messaging API as we no longer expose a `Connection` object (which was provided by vscode-ws-jsonrpc) and directly rely on a generic transport `Channel` implementation instead. - Introduce `Channel` as the main transport concept for messages (instead of the dedicated `Connection` from vscode-jsonrpc) - Replace usage of `vscode-ws-jsonrpc` with a custom binary RPC protocol. - Remove 'vscode-ws-jsonrpc' depdency from "@theia/core/shared". - Refactor all connection providers to use the new binary protocol. - Ensure that the `RemoteFileSystemProvider` API uses `Uint8Arrays` over plain number arrays. This enables direct serialization as buffers and reduces the overhead of unnecessarily converting from and to `Uint8Arrays`. - Refactor terminal widget and terminal backend contribution so that the widgets communicates with the underlying terminal process using the new rpc protocol. - Rework the IPC bootstrap protocol so that it uses a binary pipe for message transmission instead of the `ipc` pipe which only supports string encoding. - Extend the `JsonRpcProxyFactory` with an optional `RpcConnectionFactory` that enables adopter to creates proxies with a that use a custom `RpcProtocol`/`RpcConnection`. The plugin API still uses its own RPC protocol implementation. This will be addressed in a follow-up PR. (See https://github.com/eclipse-theia/theia/pull/11093) Contributed on behalf of STMicroelectronics. Closes #10684 Co-authored-by: Thomas Mäder --- CHANGELOG.md | 11 + package.json | 4 + packages/core/README.md | 1 - packages/core/package.json | 4 +- .../core/shared/vscode-ws-jsonrpc/index.d.ts | 1 - .../core/shared/vscode-ws-jsonrpc/index.js | 1 - .../messaging/ws-connection-provider.ts | 53 +- .../src/browser/progress-status-bar-item.ts | 2 +- packages/core/src/common/cancellation.ts | 8 + packages/core/src/common/index.ts | 1 + .../src/common/message-rpc/channel.spec.ts | 88 ++++ .../core/src/common/message-rpc/channel.ts | 248 +++++++++ .../logger.ts => common/message-rpc/index.ts} | 27 +- .../src/common/message-rpc/message-buffer.ts | 99 ++++ .../message-rpc/rpc-message-encoder.spec.ts | 42 ++ .../common/message-rpc/rpc-message-encoder.ts | 497 ++++++++++++++++++ .../src/common/message-rpc/rpc-protocol.ts | 212 ++++++++ .../uint8-array-message-buffer.spec.ts | 41 ++ .../message-rpc/uint8-array-message-buffer.ts | 206 ++++++++ .../messaging/abstract-connection-provider.ts | 61 +-- .../messaging/connection-error-handler.ts | 3 +- packages/core/src/common/messaging/handler.ts | 4 +- .../common/messaging/proxy-factory.spec.ts | 21 +- .../src/common/messaging/proxy-factory.ts | 37 +- .../common/messaging/web-socket-channel.ts | 215 +++----- .../electron-ipc-connection-provider.ts | 32 +- .../electron-ws-connection-provider.ts | 13 +- .../electron-messaging-contribution.ts | 152 +++--- .../messaging/electron-messaging-service.ts | 10 +- .../src/node/messaging/binary-message-pipe.ts | 168 ++++++ .../core/src/node/messaging/ipc-bootstrap.ts | 14 +- .../core/src/node/messaging/ipc-channel.ts | 98 ++++ .../node/messaging/ipc-connection-provider.ts | 53 +- .../core/src/node/messaging/ipc-protocol.ts | 4 +- .../node/messaging/messaging-contribution.ts | 103 +--- .../src/node/messaging/messaging-service.ts | 35 +- .../messaging/test/test-web-socket-channel.ts | 43 +- .../src/browser/debug-session-connection.ts | 12 +- .../src/browser/debug-session-contribution.ts | 3 +- .../debug/src/node/debug-adapter-session.ts | 6 +- packages/debug/src/node/debug-model.ts | 3 +- packages/filesystem/src/common/files.ts | 2 +- .../src/common/remote-file-system-provider.ts | 37 +- packages/plugin-ext/src/common/connection.ts | 44 +- .../debug/plugin-debug-session-factory.ts | 2 +- .../debug/plugin-debug-adapter-session.ts | 2 +- .../task/src/node/task-server.slow-spec.ts | 53 +- .../src/browser/terminal-widget-impl.ts | 26 +- ...terminal-backend-contribution.slow-spec.ts | 18 +- .../src/node/terminal-backend-contribution.ts | 27 +- yarn.lock | 38 +- 51 files changed, 2254 insertions(+), 631 deletions(-) delete mode 100644 packages/core/shared/vscode-ws-jsonrpc/index.d.ts delete mode 100644 packages/core/shared/vscode-ws-jsonrpc/index.js create mode 100644 packages/core/src/common/message-rpc/channel.spec.ts create mode 100644 packages/core/src/common/message-rpc/channel.ts rename packages/core/src/{node/messaging/logger.ts => common/message-rpc/index.ts} (64%) create mode 100644 packages/core/src/common/message-rpc/message-buffer.ts create mode 100644 packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts create mode 100644 packages/core/src/common/message-rpc/rpc-message-encoder.ts create mode 100644 packages/core/src/common/message-rpc/rpc-protocol.ts create mode 100644 packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts create mode 100644 packages/core/src/common/message-rpc/uint8-array-message-buffer.ts create mode 100644 packages/core/src/node/messaging/binary-message-pipe.ts create mode 100644 packages/core/src/node/messaging/ipc-channel.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 984069a2812d2..34350df50ffd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,17 @@ - [callhierarchy] `paths.ts` and `glob.ts` moved to `core/src/common`; `language-selector.ts` moved to `editor/src/common`. Any imports will need to be updated. - [electron] removed redundant config option `disallowReloadKeybinding` from `dev-packages/application-package/src/application-props.ts` file and corresponding test [#11099](https://github.com/eclipse-theia/theia/pull/11099) +[Breaking Changes:](#breaking_changes_1.26.0) + +- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling. + This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead. + - Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService` + - `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead. + - `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data + - `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`. + [#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics. + + ## v1.25.0 - 4/28/2022 [1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35) diff --git a/package.json b/package.json index 950701b6136a3..6b294c3fed0c2 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,8 @@ "**/@types/node": "12" }, "devDependencies": { + "@types/chai": "4.3.0", + "@types/chai-spies": "1.0.3", "@types/chai-string": "^1.4.0", "@types/jsdom": "^11.0.4", "@types/node": "12", @@ -20,6 +22,8 @@ "@typescript-eslint/eslint-plugin": "^4.8.1", "@typescript-eslint/eslint-plugin-tslint": "^4.8.1", "@typescript-eslint/parser": "^4.8.1", + "chai": "4.3.4", + "chai-spies": "1.0.0", "chai-string": "^1.4.0", "chalk": "4.0.0", "concurrently": "^3.5.0", diff --git a/packages/core/README.md b/packages/core/README.md index 64537785cf2f5..9ddef5f7b5b30 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -96,7 +96,6 @@ export class SomeClass { - `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized)) - `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol)) - `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri)) - - `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc)) - `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify)) - `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express)) - `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce)) diff --git a/packages/core/package.json b/packages/core/package.json index 733a8b75400f9..877fcf3a10812 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -67,7 +67,6 @@ "uuid": "^8.3.2", "vscode-languageserver-protocol": "~3.15.3", "vscode-uri": "^2.1.1", - "vscode-ws-jsonrpc": "^0.2.0", "ws": "^7.1.2", "yargs": "^15.3.1" }, @@ -107,8 +106,7 @@ "react-dom", "react-virtualized", "vscode-languageserver-protocol", - "vscode-uri", - "vscode-ws-jsonrpc" + "vscode-uri" ], "export =": [ "dompurify as DOMPurify", diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts deleted file mode 100644 index b11ff897103ed..0000000000000 --- a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts +++ /dev/null @@ -1 +0,0 @@ -export * from 'vscode-ws-jsonrpc'; diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.js b/packages/core/shared/vscode-ws-jsonrpc/index.js deleted file mode 100644 index 3aed560b82d62..0000000000000 --- a/packages/core/shared/vscode-ws-jsonrpc/index.js +++ /dev/null @@ -1 +0,0 @@ -module.exports = require('vscode-ws-jsonrpc'); diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index f83aabda22826..905515a70392c 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -15,11 +15,11 @@ // ***************************************************************************** import { injectable, interfaces, decorate, unmanaged } from 'inversify'; -import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common'; import { Endpoint } from '../endpoint'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { io, Socket } from 'socket.io-client'; +import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; decorate(injectable(), JsonRpcProxyFactory); decorate(unmanaged(), JsonRpcProxyFactory, 0); @@ -35,6 +35,8 @@ export interface WebSocketOptions { export class WebSocketConnectionProvider extends AbstractConnectionProvider { protected readonly onSocketDidOpenEmitter: Emitter = new Emitter(); + // Socket that is used by the main channel + protected socket: Socket; get onSocketDidOpen(): Event { return this.onSocketDidOpenEmitter.event; } @@ -48,31 +50,39 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider(path, arg); } - protected readonly socket: Socket; - - constructor() { - super(); + protected createMainChannel(): Channel { const url = this.createWebSocketUrl(WebSocketChannel.wsPath); const socket = this.createWebSocket(url); + const channel = new WebSocketChannel(this.toIWebSocket(socket)); socket.on('connect', () => { this.fireSocketDidOpen(); }); - socket.on('disconnect', reason => { - for (const channel of [...this.channels.values()]) { - channel.close(undefined, reason); - } - this.fireSocketDidClose(); - }); - socket.on('message', data => { - this.handleIncomingRawMessage(data); - }); + channel.onClose(() => this.fireSocketDidClose()); socket.connect(); this.socket = socket; + + return channel; + } + + protected toIWebSocket(socket: Socket): IWebSocket { + return { + close: () => { + socket.removeAllListeners('disconnect'); + socket.removeAllListeners('error'); + socket.removeAllListeners('message'); + socket.close(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', reason => cb(reason)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; } - override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { if (this.socket.connected) { - super.openChannel(path, handler, options); + return super.openChannel(path, handler, options); } else { const openChannel = () => { this.socket.off('connect', openChannel); @@ -82,14 +92,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { - if (this.socket.connected) { - this.socket.send(content); - } - }); - } - /** * @param path The handler to reach in the backend. */ @@ -143,3 +145,4 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => { + const leftWrite = new Uint8ArrayWriteBuffer(); + leftWrite.onCommit(buffer => { + this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); + }); + return leftWrite; + }); + readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => { + const rightWrite = new Uint8ArrayWriteBuffer(); + rightWrite.onCommit(buffer => { + this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); + }); + return rightWrite; + }); +} +describe('Message Channel', () => { + describe('Channel multiplexer', () => { + it('should forward messages to intended target channel', async () => { + const pipe = new ChannelPipe(); + + const leftMultiplexer = new ChannelMultiplexer(pipe.left); + const rightMultiplexer = new ChannelMultiplexer(pipe.right); + const openChannelSpy = spy(() => { + }); + + rightMultiplexer.onDidOpenChannel(openChannelSpy); + leftMultiplexer.onDidOpenChannel(openChannelSpy); + + const leftFirst = await leftMultiplexer.open('first'); + const leftSecond = await leftMultiplexer.open('second'); + + const rightFirst = rightMultiplexer.getOpenChannel('first'); + const rightSecond = rightMultiplexer.getOpenChannel('second'); + + assert.isNotNull(rightFirst); + assert.isNotNull(rightSecond); + + const leftSecondSpy = spy((buf: MessageProvider) => { + const message = buf().readString(); + expect(message).equal('message for second'); + }); + + leftSecond.onMessage(leftSecondSpy); + + const rightFirstSpy = spy((buf: MessageProvider) => { + const message = buf().readString(); + expect(message).equal('message for first'); + }); + + rightFirst!.onMessage(rightFirstSpy); + + leftFirst.getWriteBuffer().writeString('message for first').commit(); + rightSecond!.getWriteBuffer().writeString('message for second').commit(); + + expect(leftSecondSpy).to.be.called(); + expect(rightFirstSpy).to.be.called(); + + expect(openChannelSpy).to.be.called.exactly(4); + }); + }); +}); diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts new file mode 100644 index 0000000000000..e4279b43d5173 --- /dev/null +++ b/packages/core/src/common/message-rpc/channel.ts @@ -0,0 +1,248 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * A channel is a bidirectional communications channel with lifecycle and + * error signalling. Note that creation of channels is specific to particular + * implementations and thus not part of the protocol. + */ +export interface Channel { + + /** + * The remote side has closed the channel + */ + onClose: Event; + + /** + * An error has occurred while writing to or reading from the channel + */ + onError: Event; + + /** + * A message has arrived and can be read by listeners using a {@link MessageProvider}. + */ + onMessage: Event; + + /** + * Obtain a {@link WriteBuffer} to write a message to the channel. + */ + getWriteBuffer(): WriteBuffer; + + /** + * Close this channel. No {@link onClose} event should be sent + */ + close(): void; +} + +/** + * The event that is emitted when a channel is closed from the remote side. + */ +export interface ChannelCloseEvent { + reason: string, + code?: number +}; + +/** + * The `MessageProvider` is emitted when a channel receives a new message. + * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message. + * This ensures that each listener has its own isolated {@link ReadBuffer} instance. + * + */ +export type MessageProvider = () => ReadBuffer; + +/** + * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to + * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + */ +export class ForwardingChannel implements Channel { + + constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { + } + + onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + }; + + onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + }; + + onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + }; + + getWriteBuffer(): WriteBuffer { + return this.writeBufferSource(); + } + + send(message: Uint8Array): void { + const writeBuffer = this.getWriteBuffer(); + writeBuffer.writeBytes(message); + writeBuffer.commit(); + } + + close(): void { + this.closeHandler(); + } +} + +/** + * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} + */ +export enum MessageTypes { + Open = 1, + Close = 2, + AckOpen = 3, + Data = 4 +} + +/** + * The write buffers in this implementation immediately write to the underlying + * channel, so we rely on writers to the multiplexed channels to always commit their + * messages and always in one go. + */ +export class ChannelMultiplexer { + protected pendingOpen: Map void> = new Map(); + protected openChannels: Map = new Map(); + + protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>(); + get onDidOpenChannel(): Event<{ id: string, channel: Channel }> { + return this.onOpenChannelEmitter.event; + } + + constructor(protected readonly underlyingChannel: Channel) { + this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())); + this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event)); + this.underlyingChannel.onError(error => this.handleError(error)); + } + + protected handleError(error: unknown): void { + this.openChannels.forEach(channel => { + channel.onErrorEmitter.fire(error); + }); + } + + closeUnderlyingChannel(event?: ChannelCloseEvent): void { + + this.pendingOpen.clear(); + this.openChannels.forEach(channel => { + channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); + }); + + this.openChannels.clear(); + } + + protected handleMessage(buffer: ReadBuffer): void { + const type = buffer.readUint8(); + const id = buffer.readString(); + switch (type) { + case MessageTypes.AckOpen: { + return this.handleAckOpen(id); + } + case MessageTypes.Open: { + return this.handleOpen(id); + } + case MessageTypes.Close: { + return this.handleClose(id); + } + case MessageTypes.Data: { + return this.handleData(id, buffer.sliceAtReadPosition()); + } + } + } + + protected handleAckOpen(id: string): void { + // edge case: both side try to open a channel at the same time. + const resolve = this.pendingOpen.get(id); + if (resolve) { + const channel = this.createChannel(id); + this.pendingOpen.delete(id); + this.openChannels.set(id, channel); + resolve!(channel); + this.onOpenChannelEmitter.fire({ id, channel }); + } + } + + protected handleOpen(id: string): void { + if (!this.openChannels.has(id)) { + const channel = this.createChannel(id); + this.openChannels.set(id, channel); + const resolve = this.pendingOpen.get(id); + if (resolve) { + // edge case: both side try to open a channel at the same time. + resolve(channel); + } + this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); + this.onOpenChannelEmitter.fire({ id, channel }); + } + } + + protected handleClose(id: string): void { + const channel = this.openChannels.get(id); + if (channel) { + channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' }); + this.openChannels.delete(id); + } + } + + protected handleData(id: string, data: ReadBuffer): void { + const channel = this.openChannels.get(id); + if (channel) { + channel.onMessageEmitter.fire(() => data); + } + } + + protected createChannel(id: string): ForwardingChannel { + return new ForwardingChannel(id, () => this.closeChannel(id), () => this.prepareWriteBuffer(id)); + } + + // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded + // and written to the buffer before the actual message. + protected prepareWriteBuffer(id: string): WriteBuffer { + const underlying = this.underlyingChannel.getWriteBuffer(); + underlying.writeUint8(MessageTypes.Data); + underlying.writeString(id); + return underlying; + } + + protected closeChannel(id: string): void { + this.underlyingChannel.getWriteBuffer() + .writeUint8(MessageTypes.Close) + .writeString(id) + .commit(); + + this.openChannels.delete(id); + } + + open(id: string): Promise { + const result = new Promise((resolve, reject) => { + this.pendingOpen.set(id, resolve); + }); + this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); + return result; + } + + getOpenChannel(id: string): Channel | undefined { + return this.openChannels.get(id); + } +} + diff --git a/packages/core/src/node/messaging/logger.ts b/packages/core/src/common/message-rpc/index.ts similarity index 64% rename from packages/core/src/node/messaging/logger.ts rename to packages/core/src/common/message-rpc/index.ts index 45229f3f47423..671540a31c42b 100644 --- a/packages/core/src/node/messaging/logger.ts +++ b/packages/core/src/common/message-rpc/index.ts @@ -1,5 +1,5 @@ // ***************************************************************************** -// Copyright (C) 2017 TypeFox and others. +// Copyright (C) 2022 STMicroelectronics and others. // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License v. 2.0 which is available at @@ -13,25 +13,6 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** - -import { Logger } from 'vscode-ws-jsonrpc'; - -export class ConsoleLogger implements Logger { - - error(message: string): void { - console.log(message); - } - - warn(message: string): void { - console.log(message); - } - - info(message: string): void { - console.log(message); - } - - log(message: string): void { - console.log(message); - } - -} +export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; +export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; +export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts new file mode 100644 index 0000000000000..396ba95d93d73 --- /dev/null +++ b/packages/core/src/common/message-rpc/message-buffer.ts @@ -0,0 +1,99 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +/** + * A buffer maintaining a write position capable of writing primitive values + */ +export interface WriteBuffer { + writeUint8(byte: number): this + writeUint16(value: number): this + writeUint32(value: number): this + writeString(value: string): this + writeBytes(value: Uint8Array): this + writeNumber(value: number): this + writeLength(value: number): this + /** + * Makes any writes to the buffer permanent, for example by sending the writes over a channel. + * You must obtain a new write buffer after committing + */ + commit(): void; +} + +export class ForwardingWriteBuffer implements WriteBuffer { + constructor(protected readonly underlying: WriteBuffer) { + } + + writeUint8(byte: number): this { + this.underlying.writeUint8(byte); + return this; + } + + writeUint16(value: number): this { + this.underlying.writeUint16(value); + return this; + } + + writeUint32(value: number): this { + this.underlying.writeUint32(value); + return this; + } + + writeLength(value: number): this { + this.underlying.writeLength(value); + return this; + } + + writeString(value: string): this { + this.underlying.writeString(value); + return this; + } + + writeBytes(value: Uint8Array): this { + this.underlying.writeBytes(value); + return this; + } + + writeNumber(value: number): this { + this.underlying.writeNumber(value); + return this; + } + + commit(): void { + this.underlying.commit(); + } +} + +/** + * A buffer maintaining a read position in a buffer containing a received message capable of + * reading primitive values. + */ +export interface ReadBuffer { + readUint8(): number; + readUint16(): number; + readUint32(): number; + readString(): string; + readNumber(): number, + readLength(): number, + readBytes(): Uint8Array; + + /** + * Returns a new read buffer whose starting read position is the current read position of this buffer. + * This is useful to create read buffers sub messages. + * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part + * of the message reader that is sent to the target channel) + */ + sliceAtReadPosition(): ReadBuffer +} diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts new file mode 100644 index 0000000000000..01a8aa2099ffb --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts @@ -0,0 +1,42 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { expect } from 'chai'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; +import { RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; + +describe('PPC Message Codex', () => { + describe('RPC Message Encoder & Decoder', () => { + it('should encode object into binary message and decode the message back into the original object', () => { + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); + + const encoder = new RpcMessageEncoder(); + const jsonMangled = JSON.parse(JSON.stringify(encoder)); + + encoder.writeTypedValue(writer, encoder); + + const written = writer.getCurrentContents(); + + const reader = new Uint8ArrayReadBuffer(written); + + const decoder = new RpcMessageDecoder(); + const decoded = decoder.readTypedValue(reader); + + expect(decoded).deep.equal(jsonMangled); + }); + }); +}); diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts new file mode 100644 index 0000000000000..8b0c237187747 --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -0,0 +1,497 @@ +// ***************************************************************************** +// Copyright (C) 2022 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) + * into a channel write buffer and decode the same messages from a read buffer. + * Custom encoders/decoders can be registered to specially handling certain types of values + * to be encoded. Clients are responsible for ensuring that the set of tags for encoders + * is distinct and the same at both ends of a channel. + */ + +export type RpcMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage; + +export const enum RpcMessageType { + Request = 1, + Notification = 2, + Reply = 3, + ReplyErr = 4, + Cancel = 5, +} + +export interface CancelMessage { + type: RpcMessageType.Cancel; + id: number; +} + +export interface RequestMessage { + type: RpcMessageType.Request; + id: number; + method: string; + args: any[]; +} + +export interface NotificationMessage { + type: RpcMessageType.Notification; + id: number; + method: string; + args: any[]; +} + +export interface ReplyMessage { + type: RpcMessageType.Reply; + id: number; + res: any; +} + +export interface ReplyErrMessage { + type: RpcMessageType.ReplyErr; + id: number; + err: any; +} + +export interface SerializedError { + readonly $isError: true; + readonly name: string; + readonly message: string; + readonly stack: string; +} + +/** + * A special error that can be returned in case a request + * has failed. Provides additional information i.e. an error code + * and additional error data. + */ +export class ResponseError extends Error { + constructor(readonly code: number, message: string, readonly data: any) { + super(message); + } +} + +/** + * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s + */ + +export enum ObjectType { + JSON = 0, + ByteArray = 1, + ObjectArray = 2, + Undefined = 3, + Object = 4, + String = 5, + Boolean = 6, + Number = 7, + // eslint-disable-next-line @typescript-eslint/no-shadow + ResponseError = 8, + Error = 9 + +} + +/** + * A value encoder writes javascript values to a write buffer. Encoders will be asked + * in turn (ordered by their tag value, descending) whether they can encode a given value + * This means encoders with higher tag values have priority. Since the default encoders + * have tag values from 1-7, they can be easily overridden. + */ +export interface ValueEncoder { + /** + * Returns true if this encoder wants to encode this value. + * @param value the value to be encoded + */ + is(value: any): boolean; + /** + * Write the given value to the buffer. Will only be called if {@link is(value)} returns true. + * @param buf The buffer to write to + * @param value The value to be written + * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} + * to write a value to the underlying buffer. This is used mostly to write structures like an array + * without having to know how to encode the values in the array + */ + write(buf: WriteBuffer, value: any, recursiveEncode?: (buf: WriteBuffer, value: any) => void): void; +} + +/** + * Reads javascript values from a read buffer + */ +export interface ValueDecoder { + /** + * Reads a value from a read buffer. This method will be called for the decoder that is + * registered for the tag associated with the value encoder that encoded this value. + * @param buf The read buffer to read from + * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder} + * to read values from the underlying read buffer. This is used mostly to decode structures like an array + * without having to know how to decode the values in the array. + */ + read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; +} + +/** + * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} + */ +export class RpcMessageDecoder { + + protected decoders: Map = new Map(); + + constructor() { + this.registerDecoder(ObjectType.JSON, { + read: buf => { + const json = buf.readString(); + return JSON.parse(json); + } + }); + this.registerDecoder(ObjectType.Error, { + read: buf => { + const serializedError: SerializedError = JSON.parse(buf.readString()); + const error = new Error(serializedError.message); + Object.assign(error, serializedError); + return error; + } + }); + + this.registerDecoder(ObjectType.ResponseError, { + read: buf => { + const error = JSON.parse(buf.readString()); + return new ResponseError(error.code, error.message, error.data); + } + }); + this.registerDecoder(ObjectType.ByteArray, { + read: buf => buf.readBytes() + }); + + this.registerDecoder(ObjectType.ObjectArray, { + read: buf => this.readArray(buf) + }); + + this.registerDecoder(ObjectType.Undefined, { + read: () => undefined + }); + + this.registerDecoder(ObjectType.Object, { + read: (buf, recursiveRead) => { + const propertyCount = buf.readLength(); + const result = Object.create({}); + for (let i = 0; i < propertyCount; i++) { + const key = buf.readString(); + const value = recursiveRead(buf); + result[key] = value; + } + return result; + } + }); + + this.registerDecoder(ObjectType.String, { + read: (buf, recursiveRead) => buf.readString() + }); + + this.registerDecoder(ObjectType.Boolean, { + read: buf => buf.readUint8() === 1 + }); + + this.registerDecoder(ObjectType.Number, { + read: buf => buf.readNumber() + }); + + } + + /** + * Registers a new {@link ValueDecoder} for the given tag. + * After the successful registration the {@link tagIntType} is recomputed + * by retrieving the highest tag value and calculating the required Uint size to store it. + * @param tag the tag for which the decoder should be registered. + * @param decoder the decoder that should be registered. + */ + registerDecoder(tag: number, decoder: ValueDecoder): void { + if (this.decoders.has(tag)) { + throw new Error(`Decoder already registered: ${tag}`); + } + this.decoders.set(tag, decoder); + } + parse(buf: ReadBuffer): RpcMessage { + try { + const msgType = buf.readUint8(); + + switch (msgType) { + case RpcMessageType.Request: + return this.parseRequest(buf); + case RpcMessageType.Notification: + return this.parseNotification(buf); + case RpcMessageType.Reply: + return this.parseReply(buf); + case RpcMessageType.ReplyErr: + return this.parseReplyErr(buf); + case RpcMessageType.Cancel: + return this.parseCancel(buf); + } + throw new Error(`Unknown message type: ${msgType}`); + } catch (e) { + // exception does not show problematic content: log it! + console.log('failed to parse message: ' + buf); + throw e; + } + } + + protected parseCancel(msg: ReadBuffer): CancelMessage { + const callId = msg.readUint32(); + return { + type: RpcMessageType.Cancel, + id: callId + }; + } + + protected parseRequest(msg: ReadBuffer): RequestMessage { + const callId = msg.readUint32(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: RpcMessageType.Request, + id: callId, + method: method, + args: args + }; + } + + protected parseNotification(msg: ReadBuffer): NotificationMessage { + const callId = msg.readUint32(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: RpcMessageType.Notification, + id: callId, + method: method, + args: args + }; + } + + parseReply(msg: ReadBuffer): ReplyMessage { + const callId = msg.readUint32(); + const value = this.readTypedValue(msg); + return { + type: RpcMessageType.Reply, + id: callId, + res: value + }; + } + + parseReplyErr(msg: ReadBuffer): ReplyErrMessage { + const callId = msg.readUint32(); + + const err = this.readTypedValue(msg); + + return { + type: RpcMessageType.ReplyErr, + id: callId, + err + }; + } + + readArray(buf: ReadBuffer): any[] { + const length = buf.readLength(); + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = this.readTypedValue(buf); + } + return result; + } + + readTypedValue(buf: ReadBuffer): any { + const type = buf.readUint8(); + const decoder = this.decoders.get(type); + if (!decoder) { + throw new Error(`No decoder for tag ${type}`); + } + return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); + } +} + +/** + * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is + * up to clients to commit the message. This allows for multiple messages being + * encoded before sending. + */ +export class RpcMessageEncoder { + + protected readonly encoders: [number, ValueEncoder][] = []; + protected readonly registeredTags: Set = new Set(); + + constructor() { + this.registerEncoders(); + } + + protected registerEncoders(): void { + // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last + this.registerEncoder(ObjectType.JSON, { + is: () => true, + write: (buf, value) => { + buf.writeString(JSON.stringify(value)); + } + }); + + this.registerEncoder(ObjectType.Object, { + is: value => typeof value === 'object', + write: (buf, object, recursiveEncode) => { + const properties = Object.keys(object); + const relevant = []; + for (const property of properties) { + const value = object[property]; + if (typeof value !== 'function') { + relevant.push([property, value]); + } + } + + buf.writeLength(relevant.length); + for (const [property, value] of relevant) { + buf.writeString(property); + recursiveEncode?.(buf, value); + } + } + }); + + this.registerEncoder(ObjectType.Error, { + is: value => value instanceof Error, + write: (buf, error: Error) => { + const { name, message } = error; + const stack: string = (error).stacktrace || error.stack; + const serializedError = { + $isError: true, + name, + message, + stack + }; + buf.writeString(JSON.stringify(serializedError)); + } + }); + + this.registerEncoder(ObjectType.ResponseError, { + is: value => value instanceof ResponseError, + write: (buf, value) => buf.writeString(JSON.stringify(value)) + }); + + this.registerEncoder(ObjectType.Undefined, { + // eslint-disable-next-line no-null/no-null + is: value => value == null, + write: () => { } + }); + + this.registerEncoder(ObjectType.ObjectArray, { + is: value => Array.isArray(value), + write: (buf, value) => { + this.writeArray(buf, value); + } + }); + + this.registerEncoder(ObjectType.ByteArray, { + is: value => value instanceof Uint8Array, + write: (buf, value) => { + buf.writeBytes(value); + } + }); + + this.registerEncoder(ObjectType.String, { + is: value => typeof value === 'string', + write: (buf, value) => { + buf.writeString(value); + } + }); + + this.registerEncoder(ObjectType.Boolean, { + is: value => typeof value === 'boolean', + write: (buf, value) => { + buf.writeUint8(value === true ? 1 : 0); + } + }); + + this.registerEncoder(ObjectType.Number, { + is: value => typeof value === 'number', + write: (buf, value) => { + buf.writeNumber(value); + } + }); + } + + /** + * Registers a new {@link ValueEncoder} for the given tag. + * After the successful registration the {@link tagIntType} is recomputed + * by retrieving the highest tag value and calculating the required Uint size to store it. + * @param tag the tag for which the encoder should be registered. + * @param decoder the encoder that should be registered. + */ + registerEncoder(tag: number, encoder: ValueEncoder): void { + if (this.registeredTags.has(tag)) { + throw new Error(`Tag already registered: ${tag}`); + } + this.registeredTags.add(tag); + this.encoders.push([tag, encoder]); + } + + cancel(buf: WriteBuffer, requestId: number): void { + buf.writeUint8(RpcMessageType.Cancel); + buf.writeUint32(requestId); + } + + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Notification); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Request); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + replyOK(buf: WriteBuffer, requestId: number, res: any): void { + buf.writeUint8(RpcMessageType.Reply); + buf.writeUint32(requestId); + this.writeTypedValue(buf, res); + } + + replyErr(buf: WriteBuffer, requestId: number, err: any): void { + buf.writeUint8(RpcMessageType.ReplyErr); + buf.writeUint32(requestId); + this.writeTypedValue(buf, err); + } + + writeTypedValue(buf: WriteBuffer, value: any): void { + for (let i: number = this.encoders.length - 1; i >= 0; i--) { + if (this.encoders[i][1].is(value)) { + buf.writeUint8(this.encoders[i][0]); + this.encoders[i][1].write(buf, value, (innerBuffer, innerValue) => { + this.writeTypedValue(innerBuffer, innerValue); + }); + return; + } + } + } + + writeArray(buf: WriteBuffer, value: any[]): void { + buf.writeLength(value.length); + for (let i = 0; i < value.length; i++) { + this.writeTypedValue(buf, value[i]); + } + } +} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts new file mode 100644 index 0000000000000..4854fa124d612 --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -0,0 +1,212 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { CancellationToken, CancellationTokenSource } from '../cancellation'; +import { Emitter, Event } from '../event'; +import { Deferred } from '../promise-util'; +import { Channel } from './channel'; +import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; +import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; + +/** + * Handles request messages received by the {@link RpcServer}. + */ +export type RequestHandler = (method: string, args: any[]) => Promise; + +/** + * Initialization options for a {@link RpcProtocol}. + */ +export interface RpcProtocolOptions { + /** + * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. + */ + encoder?: RpcMessageEncoder, + /** + * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. + */ + decoder?: RpcMessageDecoder +} +/** + * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send + * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. + * Clients can get a promise for a remote request result that will be either resolved or + * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request + * Currently, there is no timeout handling for long running requests implemented. + */ +export class RpcProtocol { + static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; + + protected readonly pendingRequests: Map> = new Map(); + + protected nextMessageId: number = 0; + + protected readonly encoder: RpcMessageEncoder; + protected readonly decoder: RpcMessageDecoder; + + protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); + protected readonly cancellationTokenSources = new Map(); + + get onNotification(): Event<{ method: string; args: any[]; }> { + return this.onNotificationEmitter.event; + } + + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { + this.encoder = options.encoder ?? new RpcMessageEncoder(); + this.decoder = options.decoder ?? new RpcMessageDecoder(); + const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))); + channel.onClose(() => registration.dispose()); + + } + + handleMessage(message: RpcMessage): void { + switch (message.type) { + case RpcMessageType.Cancel: { + this.handleCancel(message.id); + break; + } + case RpcMessageType.Request: { + this.handleRequest(message.id, message.method, message.args); + break; + } + case RpcMessageType.Notification: { + this.handleNotify(message.id, message.method, message.args); + break; + } + case RpcMessageType.Reply: { + this.handleReply(message.id, message.res); + break; + } + case RpcMessageType.ReplyErr: { + this.handleReplyErr(message.id, message.err); + break; + } + } + } + + protected handleReply(id: number, value: any): void { + const replyHandler = this.pendingRequests.get(id); + if (replyHandler) { + this.pendingRequests.delete(id); + replyHandler.resolve(value); + } else { + console.warn(`reply: no handler for message: ${id}`); + } + } + + protected handleReplyErr(id: number, error: any): void { + try { + const replyHandler = this.pendingRequests.get(id); + if (replyHandler) { + this.pendingRequests.delete(id); + replyHandler.reject(error); + } else { + console.warn(`error: no handler for message: ${id}`); + } + } catch (err) { + throw err; + } + } + + sendRequest(method: string, args: any[]): Promise { + + const id = this.nextMessageId++; + const reply = new Deferred(); + + // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the + // args array and the `CANCELLATION_TOKEN_KEY` string instead. + const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; + if (cancellationToken && cancellationToken.isCancellationRequested) { + return Promise.reject(this.cancelError()); + } + + if (cancellationToken) { + args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); + cancellationToken.onCancellationRequested(() => { + this.sendCancel(id); + this.pendingRequests.get(id)?.reject(this.cancelError()); + } + ); + } + this.pendingRequests.set(id, reply); + + const output = this.channel.getWriteBuffer(); + this.encoder.request(output, id, method, args); + output.commit(); + return reply.promise; + } + + sendNotification(method: string, args: any[]): void { + const output = this.channel.getWriteBuffer(); + this.encoder.notification(output, this.nextMessageId++, method, args); + output.commit(); + } + + sendCancel(requestId: number): void { + const output = this.channel.getWriteBuffer(); + this.encoder.cancel(output, requestId); + output.commit(); + } + + cancelError(): Error { + const error = new Error('"Request has already been canceled by the sender"'); + error.name = 'Cancel'; + return error; + } + + protected handleCancel(id: number): void { + const cancellationTokenSource = this.cancellationTokenSources.get(id); + if (cancellationTokenSource) { + this.cancellationTokenSources.delete(id); + cancellationTokenSource.cancel(); + } + } + + protected async handleRequest(id: number, method: string, args: any[]): Promise { + + const output = this.channel.getWriteBuffer(); + + // Check if the last argument of the received args is the key for indicating that a cancellation token should be used + // If so remove the key from the args and create a new cancellation token. + const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false; + if (addToken) { + const tokenSource = new CancellationTokenSource(); + this.cancellationTokenSources.set(id, tokenSource); + args.push(tokenSource.token); + } + + try { + const result = await this.requestHandler(method, args); + this.cancellationTokenSources.delete(id); + this.encoder.replyOK(output, id, result); + output.commit(); + } catch (err) { + // In case of an error the output buffer might already contains parts of an message. + // => Dispose the current buffer and retrieve a new, clean one for writing the response error. + if (output instanceof Uint8ArrayWriteBuffer) { + output.dispose(); + } + const errorOutput = this.channel.getWriteBuffer(); + this.cancellationTokenSources.delete(id); + this.encoder.replyErr(errorOutput, id, err); + errorOutput.commit(); + } + } + + protected async handleNotify(id: number, method: string, args: any[]): Promise { + this.onNotificationEmitter.fire({ method, args }); + } +} diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts new file mode 100644 index 0000000000000..59cccbf90a605 --- /dev/null +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts @@ -0,0 +1,41 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +import { expect } from 'chai'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; + +describe('array message buffer tests', () => { + it('basic read write test', () => { + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); + + writer.writeUint8(8); + writer.writeUint32(10000); + writer.writeBytes(new Uint8Array([1, 2, 3, 4])); + writer.writeString('this is a string'); + writer.writeString('another string'); + writer.commit(); + + const written = writer.getCurrentContents(); + + const reader = new Uint8ArrayReadBuffer(written); + + expect(reader.readUint8()).equal(8); + expect(reader.readUint32()).equal(10000); + expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4])); + expect(reader.readString()).equal('this is a string'); + expect(reader.readString()).equal('another string'); + }); +}); diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts new file mode 100644 index 0000000000000..feec31dcd69cf --- /dev/null +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts @@ -0,0 +1,206 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +import { Disposable } from '../disposable'; +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events. + * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed + * and can no longer be used for writing data. If the writer buffer is no longer needed but the message + * has not been committed yet it has to be disposed manually. + */ +export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { + + private encoder = new TextEncoder(); + private msg: DataView; + private isDisposed = false; + private offset: number; + + constructor(private buffer: Uint8Array = new Uint8Array(1024), writePosition: number = 0) { + this.offset = buffer.byteOffset + writePosition; + this.msg = new DataView(buffer.buffer); + } + + ensureCapacity(value: number): this { + let newLength = this.buffer.byteLength; + while (newLength < this.offset + value) { + newLength *= 2; + } + if (newLength !== this.buffer.byteLength) { + const newBuffer = new Uint8Array(newLength); + newBuffer.set(this.buffer); + this.buffer = newBuffer; + this.msg = new DataView(this.buffer.buffer); + } + return this; + } + + writeLength(length: number): this { + if (length < 0 || (length % 1) !== 0) { + throw new Error(`Could not write the given length value. '${length}' is not an integer >= 0`); + } + if (length < 127) { + this.writeUint8(length); + } else { + this.writeUint8(128 + (length & 127)); + this.writeLength(length >> 7); + } + return this; + } + + writeNumber(value: number): this { + this.ensureCapacity(8); + this.msg.setFloat64(this.offset, value); + this.offset += 8; + return this; + } + + writeUint8(value: number): this { + this.ensureCapacity(1); + this.buffer[this.offset++] = value; + return this; + } + + writeUint16(value: number): this { + this.ensureCapacity(2); + this.msg.setUint16(this.offset, value); + this.offset += 2; + return this; + } + + writeUint32(value: number): this { + this.ensureCapacity(4); + this.msg.setUint32(this.offset, value); + this.offset += 4; + return this; + } + + writeString(value: string): this { + this.ensureCapacity(4 * value.length); + const result = this.encoder.encodeInto(value, this.buffer.subarray(this.offset + 4)); + this.msg.setUint32(this.offset, result.written!); + this.offset += 4 + result.written!; + return this; + } + + writeBytes(value: Uint8Array): this { + this.writeLength(value.byteLength); + this.ensureCapacity(value.length); + this.buffer.set(value, this.offset); + this.offset += value.length; + return this; + } + + private onCommitEmitter = new Emitter(); + get onCommit(): Event { + return this.onCommitEmitter.event; + } + + commit(): void { + if (this.isDisposed) { + throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed."); + } + this.onCommitEmitter.fire(this.getCurrentContents()); + this.dispose(); + } + + getCurrentContents(): Uint8Array { + return this.buffer.slice(this.buffer.byteOffset, this.offset); + } + + dispose(): void { + if (!this.isDisposed) { + this.onCommitEmitter.dispose(); + this.isDisposed = true; + } + } + +} +/** + * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * Is for single message read. A message can only be read once. + */ +export class Uint8ArrayReadBuffer implements ReadBuffer { + private offset: number = 0; + private msg: DataView; + private decoder = new TextDecoder(); + + constructor(private readonly buffer: Uint8Array, readPosition = 0) { + this.offset = buffer.byteOffset + readPosition; + this.msg = new DataView(buffer.buffer); + } + + readUint8(): number { + return this.msg.getUint8(this.offset++); + } + + readUint16(): number { + const result = this.msg.getUint16(this.offset); + this.offset += 2; + return result; + } + + readUint32(): number { + const result = this.msg.getUint32(this.offset); + this.offset += 4; + return result; + } + + readLength(): number { + let shift = 0; + let byte = this.readUint8(); + let value = (byte & 127) << shift; + while (byte > 127) { + shift += 7; + byte = this.readUint8(); + value = value + ((byte & 127) << shift); + } + return value; + } + + readNumber(): number { + const result = this.msg.getFloat64(this.offset); + this.offset += 8; + return result; + } + + readString(): string { + const len = this.readUint32(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.decodeString(this.buffer.slice(sliceOffset, sliceOffset + len)); + this.offset += len; + return result; + } + + private decodeString(buf: Uint8Array): string { + return this.decoder.decode(buf); + } + + readBytes(): Uint8Array { + const length = this.readLength(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.buffer.slice(sliceOffset, sliceOffset + length); + this.offset += length; + return result; + } + + sliceAtReadPosition(): ReadBuffer { + const sliceOffset = this.offset - this.buffer.byteOffset; + return new Uint8ArrayReadBuffer(this.buffer, sliceOffset); + } +} diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts index d4c5c3bd3aaf3..d003fe86fc683 100644 --- a/packages/core/src/common/messaging/abstract-connection-provider.ts +++ b/packages/core/src/common/messaging/abstract-connection-provider.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import { injectable, interfaces } from 'inversify'; -import { ConsoleLogger, createWebSocketConnection, Logger } from 'vscode-ws-jsonrpc'; import { Emitter, Event } from '../event'; import { ConnectionHandler } from './handler'; import { JsonRpcProxy, JsonRpcProxyFactory } from './proxy-factory'; -import { WebSocketChannel } from './web-socket-channel'; +import { Channel, ChannelMultiplexer } from '../message-rpc/channel'; /** * Factor common logic according to `ElectronIpcConnectionProvider` and @@ -45,9 +44,6 @@ export abstract class AbstractConnectionProvider throw new Error('abstract'); } - protected channelIdSeq = 0; - protected readonly channels = new Map(); - protected readonly onIncomingMessageActivityEmitter: Emitter = new Emitter(); get onIncomingMessageActivity(): Event { return this.onIncomingMessageActivityEmitter.event; @@ -75,50 +71,39 @@ export abstract class AbstractConnectionProvider return factory.createProxy(); } + protected channelMultiPlexer: ChannelMultiplexer; + + constructor() { + this.channelMultiPlexer = this.createMultiplexer(); + } + + protected createMultiplexer(): ChannelMultiplexer { + return new ChannelMultiplexer(this.createMainChannel()); + } + /** * Install a connection handler for the given path. */ listen(handler: ConnectionHandler, options?: AbstractOptions): void { this.openChannel(handler.path, channel => { - const connection = createWebSocketConnection(channel, this.createLogger()); - connection.onDispose(() => channel.close()); - handler.onConnection(connection); + handler.onConnection(channel); }, options); } - openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: AbstractOptions): void { - const id = this.channelIdSeq++; - const channel = this.createChannel(id); - this.channels.set(id, channel); - channel.onClose(() => { - if (this.channels.delete(channel.id)) { - const { reconnecting } = { reconnecting: true, ...options }; - if (reconnecting) { - this.openChannel(path, handler, options); - } - } else { - console.error('The ws channel does not exist', channel.id); + async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { + const newChannel = await this.channelMultiPlexer.open(path); + newChannel.onClose(() => { + const { reconnecting } = { reconnecting: true, ...options }; + if (reconnecting) { + this.openChannel(path, handler, options); } }); - channel.onOpen(() => handler(channel)); - channel.open(path); + handler(newChannel); } - protected abstract createChannel(id: number): WebSocketChannel; - - protected handleIncomingRawMessage(data: string): void { - const message: WebSocketChannel.Message = JSON.parse(data); - const channel = this.channels.get(message.id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ws channel does not exist', message.id); - } - this.onIncomingMessageActivityEmitter.fire(undefined); - } - - protected createLogger(): Logger { - return new ConsoleLogger(); - } + /** + * Create the main connection that is used for multiplexing all channels. + */ + protected abstract createMainChannel(): Channel; } diff --git a/packages/core/src/common/messaging/connection-error-handler.ts b/packages/core/src/common/messaging/connection-error-handler.ts index aecfe68901e24..89a27b60a50db 100644 --- a/packages/core/src/common/messaging/connection-error-handler.ts +++ b/packages/core/src/common/messaging/connection-error-handler.ts @@ -14,7 +14,6 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Message } from 'vscode-ws-jsonrpc'; import { ILogger } from '../../common'; export interface ResolvedConnectionErrorHandlerOptions { @@ -51,7 +50,7 @@ export class ConnectionErrorHandler { }; } - shouldStop(error: Error, message?: Message, count?: number): boolean { + shouldStop(error: Error, count?: number): boolean { return !count || count > this.options.maxErrors; } diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts index ed03d9d331206..1e790d38aeec3 100644 --- a/packages/core/src/common/messaging/handler.ts +++ b/packages/core/src/common/messaging/handler.ts @@ -14,11 +14,11 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { Channel } from '../message-rpc/channel'; export const ConnectionHandler = Symbol('ConnectionHandler'); export interface ConnectionHandler { readonly path: string; - onConnection(connection: MessageConnection): void; + onConnection(connection: Channel): void; } diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts index 2fd0700a41034..37280e4dbfdaa 100644 --- a/packages/core/src/common/messaging/proxy-factory.spec.ts +++ b/packages/core/src/common/messaging/proxy-factory.spec.ts @@ -15,21 +15,11 @@ // ***************************************************************************** import * as chai from 'chai'; -import { ConsoleLogger } from '../../node/messaging/logger'; import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory'; -import { createMessageConnection } from 'vscode-jsonrpc/lib/main'; -import * as stream from 'stream'; +import { ChannelPipe } from '../message-rpc/channel.spec'; const expect = chai.expect; -class NoTransform extends stream.Transform { - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - override _transform(chunk: any, encoding: string, callback: Function): void { - callback(undefined, chunk); - } -} - class TestServer { requests: string[] = []; doStuff(arg: string): Promise { @@ -102,15 +92,12 @@ function getSetup(): { const server = new TestServer(); const serverProxyFactory = new JsonRpcProxyFactory(client); - const client2server = new NoTransform(); - const server2client = new NoTransform(); - const serverConnection = createMessageConnection(server2client, client2server, new ConsoleLogger()); - serverProxyFactory.listen(serverConnection); + const pipe = new ChannelPipe(); + serverProxyFactory.listen(pipe.right); const serverProxy = serverProxyFactory.createProxy(); const clientProxyFactory = new JsonRpcProxyFactory(server); - const clientConnection = createMessageConnection(client2server, server2client, new ConsoleLogger()); - clientProxyFactory.listen(clientConnection); + clientProxyFactory.listen(pipe.left); const clientProxy = clientProxyFactory.createProxy(); return { client, diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index f8869449eae94..765b70dd1a472 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -16,10 +16,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { MessageConnection, ResponseError } from 'vscode-ws-jsonrpc'; +import { ResponseError } from '../message-rpc/rpc-message-encoder'; import { ApplicationError } from '../application-error'; -import { Event, Emitter } from '../event'; import { Disposable } from '../disposable'; +import { Emitter, Event } from '../event'; +import { Channel } from '../message-rpc/channel'; +import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; export type JsonRpcServer = Disposable & { @@ -45,13 +47,19 @@ export class JsonRpcConnectionHandler implements ConnectionHan readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory ) { } - onConnection(connection: MessageConnection): void { + onConnection(connection: Channel): void { const factory = new this.factoryConstructor(); const proxy = factory.createProxy(); factory.target = this.targetFactory(proxy); factory.listen(connection); } } +/** + * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. + */ +export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; + +const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -95,13 +103,14 @@ export class JsonRpcConnectionHandler implements ConnectionHan * * @param - The type of the object to expose to JSON-RPC. */ + export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: MessageConnection) => void; - protected connectionPromise: Promise; + protected connectionPromiseResolve: (connection: RpcProtocol) => void; + protected connectionPromise: Promise; /** * Build a new JsonRpcProxyFactory. @@ -109,7 +118,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any) { + constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { this.waitForConnection(); } @@ -118,7 +127,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { this.connectionPromiseResolve = resolve ); this.connectionPromise.then(connection => { - connection.onClose(() => + connection.channel.onClose(() => this.onDidCloseConnectionEmitter.fire(undefined) ); this.onDidOpenConnectionEmitter.fire(undefined); @@ -131,11 +140,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { * This connection will be used to send/receive JSON-RPC requests and * response. */ - listen(connection: MessageConnection): void { - connection.onRequest((prop, ...args) => this.onRequest(prop, ...args)); - connection.onNotification((prop, ...args) => this.onNotification(prop, ...args)); - connection.onDispose(() => this.waitForConnection()); - connection.listen(); + listen(channel: Channel): void { + const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); + connection.onNotification(event => this.onNotification(event.method, ...event.args)); + this.connectionPromiseResolve(connection); } @@ -239,10 +247,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { new Promise((resolve, reject) => { try { if (isNotify) { - connection.sendNotification(method, ...args); + connection.sendNotification(method, args); resolve(undefined); } else { - const resultPromise = connection.sendRequest(method, ...args) as Promise; + const resultPromise = connection.sendRequest(method, args) as Promise; resultPromise .catch((err: any) => reject(this.deserializeError(capturedError, err))) .then((result: any) => resolve(result)); @@ -293,3 +301,4 @@ export class JsonRpcProxyFactory implements ProxyHandler { } } + diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 28dff9400068a..74f7503e0b997 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,157 +16,100 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { IWebSocket } from 'vscode-ws-jsonrpc/lib/socket/socket'; -import { Disposable, DisposableCollection } from '../disposable'; -import { Emitter } from '../event'; - -export class WebSocketChannel implements IWebSocket { - +import { Emitter, Event } from '../event'; +import { WriteBuffer } from '../message-rpc'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; +import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; + +/** + * A channel that manages the main websocket connection between frontend and backend. All service channels + * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation + * independent of the actual websocket implementation and its execution context (backend vs. frontend). + */ +export class WebSocketChannel implements Channel { static wsPath = '/services'; - protected readonly closeEmitter = new Emitter<[number, string]>(); - protected readonly toDispose = new DisposableCollection(this.closeEmitter); - - constructor( - readonly id: number, - protected readonly doSend: (content: string) => void - ) { } - - dispose(): void { - this.toDispose.dispose(); - } - - protected checkNotDisposed(): void { - if (this.toDispose.disposed) { - throw new Error('The channel has been disposed.'); - } - } - - handleMessage(message: WebSocketChannel.Message): void { - if (message.kind === 'ready') { - this.fireOpen(); - } else if (message.kind === 'data') { - this.fireMessage(message.content); - } else if (message.kind === 'close') { - this.fireClose(message.code, message.reason); - } - } - - open(path: string): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'open', - id: this.id, - path - })); - } - - ready(): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'ready', - id: this.id - })); + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; } - send(content: string): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'data', - id: this.id, - content - })); + protected readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; } - close(code: number = 1000, reason: string = ''): void { - if (this.closing) { - // Do not try to close the channel if it is already closing. - return; - } - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'close', - id: this.id, - code, - reason - })); - this.fireClose(code, reason); + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; } - tryClose(code: number = 1000, reason: string = ''): void { - if (this.closing || this.toDispose.disposed) { - // Do not try to close the channel if it is already closing or disposed. - return; - } - this.doSend(JSON.stringify({ - kind: 'close', - id: this.id, - code, - reason + constructor(protected readonly socket: IWebSocket) { + socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); + socket.onError(error => this.onErrorEmitter.fire(error)); + // eslint-disable-next-line arrow-body-style + socket.onMessage(data => this.onMessageEmitter.fire(() => { + // In the browser context socketIO receives binary messages as ArrayBuffers. + // So we have to convert them to a Uint8Array before delegating the message to the read buffer. + const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; + return new Uint8ArrayReadBuffer(buffer); })); - this.fireClose(code, reason); } - protected fireOpen: () => void = () => { }; - onOpen(cb: () => void): void { - this.checkNotDisposed(); - this.fireOpen = cb; - this.toDispose.push(Disposable.create(() => this.fireOpen = () => { })); - } + getWriteBuffer(): WriteBuffer { + const result = new Uint8ArrayWriteBuffer(); - protected fireMessage: (data: any) => void = () => { }; - onMessage(cb: (data: any) => void): void { - this.checkNotDisposed(); - this.fireMessage = cb; - this.toDispose.push(Disposable.create(() => this.fireMessage = () => { })); - } + result.onCommit(buffer => { + if (this.socket.isConnected()) { + this.socket.send(buffer); + } else { + console.warn('Could not send message. Websocket is not connected'); + } + }); - fireError: (reason: any) => void = () => { }; - onError(cb: (reason: any) => void): void { - this.checkNotDisposed(); - this.fireError = cb; - this.toDispose.push(Disposable.create(() => this.fireError = () => { })); + return result; } - protected closing = false; - protected fireClose(code: number, reason: string): void { - if (this.closing) { - return; - } - this.closing = true; - try { - this.closeEmitter.fire([code, reason]); - } finally { - this.closing = false; - } - this.dispose(); - } - onClose(cb: (code: number, reason: string) => void): Disposable { - this.checkNotDisposed(); - return this.closeEmitter.event(([code, reason]) => cb(code, reason)); + close(): void { + this.socket.close(); + this.onCloseEmitter.dispose(); + this.onMessageEmitter.dispose(); + this.onErrorEmitter.dispose(); } - } -export namespace WebSocketChannel { - export interface OpenMessage { - kind: 'open' - id: number - path: string - } - export interface ReadyMessage { - kind: 'ready' - id: number - } - export interface DataMessage { - kind: 'data' - id: number - content: string - } - export interface CloseMessage { - kind: 'close' - id: number - code: number - reason: string - } - export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage; + +/** + * An abstraction that enables reuse of the `{@link WebSocketChannel} class in the frontend and backend + * independent of the actual underlying socket implementation. + */ +export interface IWebSocket { + /** + * Sends the given message over the web socket in binary format. + * @param message The binary message. + */ + send(message: Uint8Array): void; + /** + * Closes the websocket from the local side. + */ + close(): void; + /** + * The connection state of the web socket. + */ + isConnected(): boolean; + /** + * Listener callback to handle incoming messages. + * @param cb The callback. + */ + onMessage(cb: (message: Uint8Array) => void): void; + /** + * Listener callback to handle socket errors. + * @param cb The callback. + */ + onError(cb: (reason: any) => void): void; + /** + * Listener callback to handle close events (Remote side). + * @param cb The callback. + */ + onClose(cb: (reason: string, code?: number) => void): void; } + diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index b3d8ce8ab9415..6aa06861d2e93 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -17,9 +17,11 @@ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; +import { Emitter, Event } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -34,17 +36,25 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(path, arg); } - constructor() { - super(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: ElectronEvent, data: string) => { - this.handleIncomingRawMessage(data); - }); - } - - protected createChannel(id: number): WebSocketChannel { - return new WebSocketChannel(id, content => { - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); + protected createMainChannel(): Channel { + const onMessageEmitter = new Emitter(); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); + return { + close: () => Event.None, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => + // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array. + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) + ); + return writer; + }, + onClose: Event.None, + onError: Event.None, + onMessage: onMessageEmitter.event + }; } } diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index 6f75ea31d0dae..ac99ee85bbab8 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -15,9 +15,9 @@ // ***************************************************************************** import { injectable } from 'inversify'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider'; import { FrontendApplicationContribution } from '../../browser/frontend-application'; +import { Channel } from '../../common'; /** * Customized connection provider between the frontend and the backend in electron environment. @@ -34,16 +34,13 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv onStop(): void { this.stopping = true; - // Close the websocket connection `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. + // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 - for (const channel of [...this.channels.values()]) { - // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - // But we cannot use `1001`: https://github.com/TypeFox/vscode-ws-jsonrpc/issues/15 - channel.close(1000, 'The frontend is "going away"...'); - } + // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. + this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 }); } - override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { if (!this.stopping) { super.openChannel(path, handler, options); } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index 071796c5cf0ca..dfd8f6115c8ce 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -16,24 +16,23 @@ import { IpcMainEvent, ipcMain, WebContents } from '@theia/electron/shared/electron'; import { inject, injectable, named, postConstruct } from 'inversify'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; import { ContributionProvider } from '../../common/contribution-provider'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { MessagingContribution } from '../../node/messaging/messaging-contribution'; -import { ConsoleLogger } from '../../node/messaging/logger'; import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; +import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; +import { Emitter, Event, WriteBuffer } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** * This component replicates the role filled by `MessagingContribution` but for Electron. * Unlike the WebSocket based implementation, we do not expect to receive * connection events. Instead, we'll create channels based on incoming `open` * events on the `ipcMain` channel. - * * This component allows communication between renderer process (frontend) and electron main process. */ + @injectable() export class ElectronMessagingContribution implements ElectronMainApplicationContribution, ElectronMessagingService { @@ -43,89 +42,112 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon @inject(ContributionProvider) @named(ElectronConnectionHandler) protected readonly connectionHandlers: ContributionProvider; - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly windowChannels = new Map>(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + /** + * Each electron window has a main chanel and its own multiplexer to route multiple client messages the same IPC connection. + */ + protected readonly windowChannelMultiplexer = new Map(); @postConstruct() protected init(): void { - ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: string) => { - this.handleIpcMessage(event, data); + ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => { + this.handleIpcEvent(event, data); }); } + protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { + const sender = event.sender; + // Get the multiplexer for a given window id + try { + const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); + windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + } catch (error) { + console.error('IPC: Failed to handle message', { error, data }); + } + } + + // Creates a new multiplexer for a given sender/window + protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } { + const mainChannel = this.createWindowMainChannel(sender); + const multiPlexer = new ChannelMultiplexer(mainChannel); + multiPlexer.onDidOpenChannel(openEvent => { + const { channel, id } = openEvent; + if (this.channelHandlers.route(id, channel)) { + console.debug(`Opening channel for service path '${id}'.`); + channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); + } + }); + + sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. + sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. + const data = { channel: mainChannel, multiPlexer }; + this.windowChannelMultiplexer.set(sender.id, data); + return data; + } + + /** + * Creates the main channel to a window. + * @param sender The window that the channel should be established to. + */ + protected createWindowMainChannel(sender: WebContents): ElectronWebContentChannel { + return new ElectronWebContentChannel(sender); + } + onStart(): void { for (const contribution of this.messagingContributions.getContributions()) { contribution.configure(this); } for (const connectionHandler of this.connectionHandlers.getContributions()) { this.channelHandlers.push(connectionHandler.path, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - connectionHandler.onConnection(connection); + connectionHandler.onConnection(channel); }); } } - listen(spec: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void { - this.ipcChannel(spec, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - callback(params, connection); - }); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ipcChannel(spec: string, callback: (params: any, channel: WebSocketChannel) => void): void { + ipcChannel(spec: string, callback: (params: any, channel: Channel) => void): void { this.channelHandlers.push(spec, callback); } +} + +/** + * Used to establish a connection between the ipcMain and the Electron frontend (window). + * Messages a transferred via electron IPC. + */ +export class ElectronWebContentChannel implements Channel { + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + } - protected handleIpcMessage(event: IpcMainEvent, data: string): void { - const sender = event.sender; - try { - // Get the channel map for a given window id - let channels = this.windowChannels.get(sender.id)!; - if (!channels) { - this.windowChannels.set(sender.id, channels = new Map()); - } - // Start parsing the message to extract the channel id and route - const message: WebSocketChannel.Message = JSON.parse(data.toString()); - // Someone wants to open a logical channel - if (message.kind === 'open') { - const { id, path } = message; - const channel = this.createChannel(id, sender); - if (this.channelHandlers.route(path, channel)) { - channel.ready(); - channels.set(id, channel); - channel.onClose(() => channels.delete(id)); - } else { - console.error('Cannot find a service for the path: ' + path); - } - } else { - const { id } = message; - const channel = channels.get(id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ipc channel does not exist', id); - } - } - const close = () => { - for (const channel of Array.from(channels.values())) { - channel.close(undefined, 'webContent destroyed'); - } - channels.clear(); - }; - sender.once('did-navigate', close); // When refreshing the browser window. - sender.once('destroyed', close); // When closing the browser window. - } catch (error) { - console.error('IPC: Failed to handle message', { error, data }); - } + // Make the message emitter public so that we can easily forward messages received from the ipcMain. + readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + } + + constructor(protected readonly sender: Electron.WebContents) { } - protected createChannel(id: number, sender: WebContents): WebSocketChannel { - return new WebSocketChannel(id, content => { - if (!sender.isDestroyed()) { - sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); + getWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + + writer.onCommit(buffer => { + if (!this.sender.isDestroyed()) { + this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); } }); - } + return writer; + } + close(): void { + this.onCloseEmitter.dispose(); + this.onMessageEmitter.dispose(); + this.onErrorEmitter.dispose(); + } } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts index dde3fdde1d181..874d51237b4fd 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts @@ -14,20 +14,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import type { MessageConnection } from 'vscode-jsonrpc'; -import type { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { Channel } from '../../common/message-rpc/channel'; export interface ElectronMessagingService { - /** - * Accept a JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - listen(path: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void; /** * Accept an ipc channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: WebSocketChannel) => void): void; + ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: Channel) => void): void; } export namespace ElectronMessagingService { export interface PathParams { diff --git a/packages/core/src/node/messaging/binary-message-pipe.ts b/packages/core/src/node/messaging/binary-message-pipe.ts new file mode 100644 index 0000000000000..1143aef9cf4cc --- /dev/null +++ b/packages/core/src/node/messaging/binary-message-pipe.ts @@ -0,0 +1,168 @@ +// ***************************************************************************** +// Copyright (C) 2022 STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { Duplex } from 'stream'; +import { Disposable, Emitter, Event } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; + +/** + * A `BinaryMessagePipe` is capable of sending and retrieving binary messages i.e. {@link Uint8Array}s over + * and underlying streamed process pipe/fd. The message length of individual messages is encoding at the beginning of + * a new message. This makes it possible to extract messages from the streamed data. + */ +export class BinaryMessagePipe implements Disposable { + static readonly MESSAGE_START_IDENTIFIER = ''; + protected dataHandler = (chunk: Uint8Array) => this.handleChunk(chunk); + + protected onMessageEmitter = new Emitter(); + protected cachedMessageData: StreamedMessageData = { + chunks: [], + missingBytes: 0 + }; + + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + constructor(protected readonly underlyingPipe: Duplex) { + underlyingPipe.on('data', this.dataHandler); + } + + send(message: Uint8Array): void { + this.underlyingPipe.write(this.encodeMessageStart(message)); + this.underlyingPipe.write(message); + } + + protected handleChunk(chunk: Uint8Array): void { + if (this.cachedMessageData.missingBytes === 0) { + // There is no currently streamed message => We expect that the beginning of the chunk is the message start for a new message + this.handleNewMessage(chunk); + } else { + // The chunk contains message data intended for the currently cached message + this.handleMessageContentChunk(chunk); + } + } + + protected handleNewMessage(chunk: Uint8Array): void { + if (chunk.byteLength < this.messageStartByteLength) { + // The chunk only contains a part of the encoded message start + this.cachedMessageData.partialMessageStart = chunk; + return; + } + + const messageLength = this.readMessageStart(chunk); + + if (chunk.length - this.messageStartByteLength > messageLength) { + // The initial chunk contains more than one binary message => Fire `onMessage` for first message and handle remaining content + const firstMessage = chunk.slice(this.messageStartByteLength, this.messageStartByteLength + messageLength); + this.onMessageEmitter.fire(firstMessage); + this.handleNewMessage(chunk.slice(this.messageStartByteLength + messageLength)); + + } else if (chunk.length - this.messageStartByteLength === messageLength) { + // The initial chunk contains exactly one complete message. => Directly fire the `onMessage` event. + this.onMessageEmitter.fire(chunk.slice(this.messageStartByteLength)); + } else { + // The initial chunk contains only part of the message content => Cache message data + this.cachedMessageData.chunks = [chunk.slice(this.messageStartByteLength)]; + this.cachedMessageData.missingBytes = messageLength - chunk.byteLength + this.messageStartByteLength; + } + } + + protected handleMessageContentChunk(chunk: Uint8Array): void { + if (this.cachedMessageData) { + if (chunk.byteLength < this.cachedMessageData.missingBytes) { + // The chunk only contains parts of the missing bytes for the cached message. + this.cachedMessageData.chunks.push(chunk); + this.cachedMessageData.missingBytes -= chunk.byteLength; + } else if (chunk.byteLength === this.cachedMessageData.missingBytes) { + // Chunk contains exactly the missing data for the cached message + this.cachedMessageData.chunks.push(chunk); + this.emitCachedMessage(); + } else { + // Chunk contains missing data for the cached message + data for the next message + const messageEnd = this.cachedMessageData.missingBytes; + const missingData = chunk.slice(0, messageEnd); + this.cachedMessageData.chunks.push(missingData); + this.emitCachedMessage(); + this.handleNewMessage(chunk.slice(messageEnd)); + } + + } + } + + protected emitCachedMessage(): void { + const message = Buffer.concat(this.cachedMessageData.chunks); + this.onMessageEmitter.fire(message); + this.cachedMessageData.chunks = []; + this.cachedMessageData.missingBytes = 0; + } + + /** + * Encodes the start of a new message into a {@link Uint8Array}. + * The message start consists of a identifier string and the length of the following message. + * @returns the buffer contains the encoded message start + */ + protected encodeMessageStart(message: Uint8Array): Uint8Array { + const writer = new Uint8ArrayWriteBuffer() + .writeString(BinaryMessagePipe.MESSAGE_START_IDENTIFIER) + .writeUint32(message.length); + const messageStart = writer.getCurrentContents(); + writer.dispose(); + return messageStart; + } + + protected get messageStartByteLength(): number { + // 4 bytes for length of id + id string length + 4 bytes for length of message + return 4 + BinaryMessagePipe.MESSAGE_START_IDENTIFIER.length + 4; + } + + /** + * Reads the start of a new message from a stream chunk (or cached message) received from the underlying pipe. + * The message start is expected to consist of an identifier string and the length of the message. + * @param chunk The stream chunk. + * @returns The length of the message content to read. + * @throws An error if the message start can not be read successfully. + */ + protected readMessageStart(chunk: Uint8Array): number { + const messageData = this.cachedMessageData.partialMessageStart ? Buffer.concat([this.cachedMessageData.partialMessageStart, chunk]) : chunk; + this.cachedMessageData.partialMessageStart = undefined; + + const reader = new Uint8ArrayReadBuffer(messageData); + const identifier = reader.readString(); + + if (identifier !== BinaryMessagePipe.MESSAGE_START_IDENTIFIER) { + throw new Error(`Could not read message start. The start identifier should be '${BinaryMessagePipe.MESSAGE_START_IDENTIFIER}' but was '${identifier}`); + } + const length = reader.readUint32(); + return length; + } + + dispose(): void { + this.underlyingPipe.removeListener('data', this.dataHandler); + this.underlyingPipe.end(); + this.onMessageEmitter.dispose(); + this.cachedMessageData = { + chunks: [], + missingBytes: 0 + }; + } +} + +interface StreamedMessageData { + chunks: Uint8Array[]; + missingBytes: number; + partialMessageStart?: Uint8Array; +} diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts index 0bac13bb163b8..d46d63efa9071 100644 --- a/packages/core/src/node/messaging/ipc-bootstrap.ts +++ b/packages/core/src/node/messaging/ipc-bootstrap.ts @@ -16,20 +16,12 @@ import 'reflect-metadata'; import { dynamicRequire } from '../dynamic-require'; -import { ConsoleLogger } from 'vscode-ws-jsonrpc/lib/logger'; -import { createMessageConnection, IPCMessageReader, IPCMessageWriter, Trace } from 'vscode-ws-jsonrpc'; +import { IPCChannel } from './ipc-channel'; import { checkParentAlive, IPCEntryPoint } from './ipc-protocol'; checkParentAlive(); const entryPoint = IPCEntryPoint.getScriptFromEnv(); -const reader = new IPCMessageReader(process); -const writer = new IPCMessageWriter(process); -const logger = new ConsoleLogger(); -const connection = createMessageConnection(reader, writer, logger); -connection.trace(Trace.Off, { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - log: (message: any, data?: string) => console.log(message, data) -}); -dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(connection); +dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(new IPCChannel()); + diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts new file mode 100644 index 0000000000000..71fc65d774f08 --- /dev/null +++ b/packages/core/src/node/messaging/ipc-channel.ts @@ -0,0 +1,98 @@ +// ***************************************************************************** +// Copyright (C) 2022 STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import * as cp from 'child_process'; +import { Socket } from 'net'; +import { Duplex } from 'stream'; +import { Channel, ChannelCloseEvent, Disposable, DisposableCollection, Emitter, Event, MessageProvider, WriteBuffer } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { BinaryMessagePipe } from './binary-message-pipe'; + +/** + * A {@link Channel} to send messages between two processes using a dedicated pipe/fd for binary messages. + * This fd is opened as 5th channel in addition to the default stdios (stdin,stdout,stderr, ipc). This means the default channels + * are not blocked and can be used by the respective process for additional custom message handling. + */ +export class IPCChannel implements Channel { + + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + } + + protected readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + } + + protected messagePipe: BinaryMessagePipe; + protected toDispose = new DisposableCollection(); + + protected ipcErrorListener: (error: Error) => void = error => this.onErrorEmitter.fire(error); + + constructor(childProcess?: cp.ChildProcess) { + if (childProcess) { + this.setupChildProcess(childProcess); + } else { + this.setupProcess(); + } + this.messagePipe.onMessage(message => { + console.log(`IPChannel: fire on message with ${message.length}`); + this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message)); + }); + this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); + } + + protected setupChildProcess(childProcess: cp.ChildProcess): void { + childProcess.once('exit', code => this.onCloseEmitter.fire({ reason: 'Child process has been terminated', code: code ?? undefined })); + this.messagePipe = new BinaryMessagePipe(childProcess.stdio[4] as Duplex); + childProcess.on('error', this.ipcErrorListener); + this.toDispose.push(Disposable.create(() => { + childProcess.removeListener('error', this.ipcErrorListener); + this.messagePipe.dispose(); + })); + } + + protected setupProcess(): void { + process.once('beforeExit', code => this.onCloseEmitter.fire({ reason: 'Process is about to be terminated', code })); + this.messagePipe = new BinaryMessagePipe(new Socket({ fd: 4 })); + process.on('uncaughtException', this.ipcErrorListener); + this.toDispose.push(Disposable.create(() => { + (process as NodeJS.EventEmitter).removeListener('uncaughtException', this.ipcErrorListener); + this.messagePipe.dispose(); + })); + } + + getWriteBuffer(): WriteBuffer { + const result = new Uint8ArrayWriteBuffer(); + result.onCommit(buffer => { + this.messagePipe.send(buffer); + }); + + return result; + } + + close(): void { + this.toDispose.dispose(); + } + +} diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts index 84c256997257a..06f57836df6c9 100644 --- a/packages/core/src/node/messaging/ipc-connection-provider.ts +++ b/packages/core/src/node/messaging/ipc-connection-provider.ts @@ -15,10 +15,11 @@ // ***************************************************************************** import * as cp from 'child_process'; +import { inject, injectable } from 'inversify'; import * as path from 'path'; -import { injectable, inject } from 'inversify'; -import { Trace, Tracer, IPCMessageReader, IPCMessageWriter, createMessageConnection, MessageConnection, Message } from 'vscode-ws-jsonrpc'; -import { ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '../../common'; +import { createInterface } from 'readline'; +import { Channel, ConnectionErrorHandler, Disposable, DisposableCollection, ILogger } from '../../common'; +import { IPCChannel } from './ipc-channel'; import { createIpcEnv } from './ipc-protocol'; export interface ResolvedIPCConnectionOptions { @@ -40,7 +41,7 @@ export class IPCConnectionProvider { @inject(ILogger) protected readonly logger: ILogger; - listen(options: IPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { + listen(options: IPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { return this.doListen({ logger: this.logger, args: [], @@ -48,19 +49,21 @@ export class IPCConnectionProvider { }, acceptor); } - protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { + protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { const childProcess = this.fork(options); - const connection = this.createConnection(childProcess, options); + const channel = new IPCChannel(childProcess); const toStop = new DisposableCollection(); const toCancelStop = toStop.push(Disposable.create(() => childProcess.kill())); const errorHandler = options.errorHandler; if (errorHandler) { - connection.onError((e: [Error, Message | undefined, number | undefined]) => { - if (errorHandler.shouldStop(e[0], e[1], e[2])) { + let errorCount = 0; + channel.onError((err: Error) => { + errorCount++; + if (errorHandler.shouldStop(err, errorCount)) { toStop.dispose(); } }); - connection.onClose(() => { + channel.onClose(() => { if (toStop.disposed) { return; } @@ -70,36 +73,15 @@ export class IPCConnectionProvider { } }); } - acceptor(connection); + acceptor(channel); return toStop; } - protected createConnection(childProcess: cp.ChildProcess, options: ResolvedIPCConnectionOptions): MessageConnection { - const reader = new IPCMessageReader(childProcess); - const writer = new IPCMessageWriter(childProcess); - const connection = createMessageConnection(reader, writer, { - error: (message: string) => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${message}`), - warn: (message: string) => this.logger.warn(`[${options.serverName}: ${childProcess.pid}] ${message}`), - info: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`), - log: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`) - }); - const tracer: Tracer = { - log: (message: unknown, data?: string) => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] ${message}` + (typeof data === 'string' ? ' ' + data : '')) - }; - connection.trace(Trace.Verbose, tracer); - this.logger.isDebug().then(isDebug => { - if (!isDebug) { - connection.trace(Trace.Off, tracer); - } - }); - return connection; - } - protected fork(options: ResolvedIPCConnectionOptions): cp.ChildProcess { const forkOptions: cp.ForkOptions = { - silent: true, env: createIpcEnv(options), - execArgv: [] + execArgv: [], + stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe'] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); @@ -108,8 +90,9 @@ export class IPCConnectionProvider { } const childProcess = cp.fork(path.join(__dirname, 'ipc-bootstrap'), options.args, forkOptions); - childProcess.stdout!.on('data', data => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); - childProcess.stderr!.on('data', data => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); + + createInterface(childProcess.stdout!).on('line', line => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${line}`)); + createInterface(childProcess.stderr!).on('line', line => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${line}`)); this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC started`); childProcess.once('exit', () => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC exited`)); diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts index de9a77394b03e..03aa3944521c3 100644 --- a/packages/core/src/node/messaging/ipc-protocol.ts +++ b/packages/core/src/node/messaging/ipc-protocol.ts @@ -15,14 +15,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { Channel } from '../../common/message-rpc/channel'; const THEIA_PARENT_PID = 'THEIA_PARENT_PID'; const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT'; export const ipcEntryPoint: string | undefined = process.env[THEIA_ENTRY_POINT]; -export type IPCEntryPoint = (connection: MessageConnection) => void; +export type IPCEntryPoint = (connection: Channel) => void; export namespace IPCEntryPoint { /** * Throws if `THEIA_ENTRY_POINT` is undefined or empty. diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index 2ee8764854780..20dea50a483e4 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -18,19 +18,15 @@ import * as http from 'http'; import * as https from 'https'; import { Server, Socket } from 'socket.io'; import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; -import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; -import * as launch from 'vscode-ws-jsonrpc/lib/server/launch'; import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { BackendApplicationContribution } from '../backend-application'; -import { MessagingService, WebSocketChannelConnection } from './messaging-service'; -import { ConsoleLogger } from './logger'; +import { MessagingService } from './messaging-service'; import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; +import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -53,7 +49,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me protected readonly messagingListener: MessagingListener; protected readonly wsHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); @postConstruct() protected init(): void { @@ -63,21 +59,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me } } - listen(spec: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void { - this.wsChannel(spec, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - callback(params, connection); - }); - } - - forward(spec: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void { - this.wsChannel(spec, (params, channel) => { - const connection = launch.createWebSocketConnection(channel); - callback(params, WebSocketChannelConnection.create(connection, channel)); - }); - } - - wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: WebSocketChannel) => void): void { + wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void { this.channelHandlers.push(spec, (params, channel) => callback(params, channel)); } @@ -125,49 +107,31 @@ export class MessagingContribution implements BackendApplicationContribution, Me } protected handleChannels(socket: Socket): void { + const socketChannel = new WebSocketChannel(this.toIWebSocket(socket)); + const mulitplexer = new ChannelMultiplexer(socketChannel); const channelHandlers = this.getConnectionChannelHandlers(socket); - const channels = new Map(); - socket.on('message', data => { - try { - const message: WebSocketChannel.Message = JSON.parse(data.toString()); - if (message.kind === 'open') { - const { id, path } = message; - const channel = this.createChannel(id, socket); - if (channelHandlers.route(path, channel)) { - channel.ready(); - console.debug(`Opening channel for service path '${path}'. [ID: ${id}]`); - channels.set(id, channel); - channel.onClose(() => { - console.debug(`Closing channel on service path '${path}'. [ID: ${id}]`); - channels.delete(id); - }); - } else { - console.error('Cannot find a service for the path: ' + path); - } - } else { - const { id } = message; - const channel = channels.get(id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ws channel does not exist', id); - } - } - } catch (error) { - console.error('Failed to handle message', { error, data }); + mulitplexer.onDidOpenChannel(event => { + if (channelHandlers.route(event.id, event.channel)) { + console.debug(`Opening channel for service path '${event.id}'.`); + event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`)); } }); - socket.on('error', err => { - for (const channel of channels.values()) { - channel.fireError(err); - } - }); - socket.on('disconnect', reason => { - for (const channel of channels.values()) { - channel.close(undefined, reason); - } - channels.clear(); - }); + } + + protected toIWebSocket(socket: Socket): IWebSocket { + return { + close: () => { + socket.removeAllListeners('disconnect'); + socket.removeAllListeners('error'); + socket.removeAllListeners('message'); + socket.disconnect(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', error => cb(error)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; } protected createSocketContainer(socket: Socket): Container { @@ -176,7 +140,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me return connectionContainer; } - protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { + protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { const connectionContainer = this.createSocketContainer(socket); bindContributionProvider(connectionContainer, ConnectionHandler); connectionContainer.load(...this.connectionModules.getContributions()); @@ -184,21 +148,12 @@ export class MessagingContribution implements BackendApplicationContribution, Me const connectionHandlers = connectionContainer.getNamed>(ContributionProvider, ConnectionHandler); for (const connectionHandler of connectionHandlers.getContributions(true)) { connectionChannelHandlers.push(connectionHandler.path, (_, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - connectionHandler.onConnection(connection); + connectionHandler.onConnection(channel); }); } return connectionChannelHandlers; } - protected createChannel(id: number, socket: Socket): WebSocketChannel { - return new WebSocketChannel(id, content => { - if (socket.connected) { - socket.send(content); - } - }); - } - } export namespace MessagingContribution { diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts index 087f6d5850def..276b58734bcff 100644 --- a/packages/core/src/node/messaging/messaging-service.ts +++ b/packages/core/src/node/messaging/messaging-service.ts @@ -15,33 +15,21 @@ // ***************************************************************************** import { Socket } from 'socket.io'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { Channel } from '../../common/message-rpc/channel'; export interface MessagingService { - /** - * Accept a JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - listen(path: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void; - /** - * Accept a raw JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - forward(path: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void; /** * Accept a web socket channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - wsChannel(path: string, callback: (params: MessagingService.PathParams, socket: WebSocketChannel) => void): void; + wsChannel(path: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void; /** * Accept a web socket connection on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. * * #### Important - * Prefer JSON-RPC connections or web socket channels over web sockets. Clients can handle only limited amount of web sockets - * and excessive amount can cause performance degradation. All JSON-RPC connections and web socket channels share the single web socket connection. + * Prefer using web socket channels over establishing new web socket connection. Clients can handle only limited amount of web sockets + * and excessive amount can cause performance degradation. All web socket channels share a single web socket connection. */ ws(path: string, callback: (params: MessagingService.PathParams, socket: Socket) => void): void; } @@ -56,18 +44,3 @@ export namespace MessagingService { configure(service: MessagingService): void; } } - -export interface WebSocketChannelConnection extends IConnection { - channel: WebSocketChannel; -} -export namespace WebSocketChannelConnection { - export function is(connection: IConnection): connection is WebSocketChannelConnection { - return (connection as WebSocketChannelConnection).channel instanceof WebSocketChannel; - } - - export function create(connection: IConnection, channel: WebSocketChannel): WebSocketChannelConnection { - const result = connection as WebSocketChannelConnection; - result.channel = channel; - return result; - } -} diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 2fbb17c9aa8ec..0ef0c50186cee 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -16,32 +16,41 @@ import * as http from 'http'; import * as https from 'https'; -import { WebSocketChannel } from '../../../common/messaging/web-socket-channel'; -import { Disposable } from '../../../common/disposable'; import { AddressInfo } from 'net'; -import { io } from 'socket.io-client'; +import { io, Socket } from 'socket.io-client'; +import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel'; +import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; -export class TestWebSocketChannel extends WebSocketChannel { +export class TestWebSocketChannelSetup { + public readonly multiplexer: ChannelMultiplexer; + public readonly channel: Channel; constructor({ server, path }: { server: http.Server | https.Server, path: string }) { - super(0, content => socket.send(content)); const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`); - socket.on('error', error => - this.fireError(error) - ); - socket.on('disconnect', reason => - this.fireClose(0, reason) - ); - socket.on('message', data => { - this.handleMessage(JSON.parse(data.toString())); + this.channel = new WebSocketChannel(toIWebSocket(socket)); + this.multiplexer = new ChannelMultiplexer(this.channel); + socket.on('connect', () => { + this.multiplexer.open(path); }); - socket.on('connect', () => - this.open(path) - ); - this.toDispose.push(Disposable.create(() => socket.close())); + socket.connect(); } +} +function toIWebSocket(socket: Socket): IWebSocket { + return { + close: () => { + socket.removeAllListeners('disconnect'); + socket.removeAllListeners('error'); + socket.removeAllListeners('message'); + socket.close(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', reason => cb(reason)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; } diff --git a/packages/debug/src/browser/debug-session-connection.ts b/packages/debug/src/browser/debug-session-connection.ts index 47c5ac06b8b2c..7ecafafcc6b4f 100644 --- a/packages/debug/src/browser/debug-session-connection.ts +++ b/packages/debug/src/browser/debug-session-connection.ts @@ -18,13 +18,9 @@ import { DebugProtocol } from 'vscode-debugprotocol'; import { Deferred } from '@theia/core/lib/common/promise-util'; -import { Event, Emitter, DisposableCollection, Disposable, MaybePromise } from '@theia/core'; +import { Event, Emitter, DisposableCollection, Disposable, MaybePromise, Channel } from '@theia/core'; import { OutputChannel } from '@theia/output/lib/browser/output-channel'; -import { Channel } from '../common/debug-service'; - -export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; - export interface DebugRequestTypes { 'attach': [DebugProtocol.AttachRequestArguments, DebugProtocol.AttachResponse] 'breakpointLocations': [DebugProtocol.BreakpointLocationsArguments, DebugProtocol.BreakpointLocationsResponse] @@ -116,6 +112,8 @@ const standardDebugEvents = new Set([ 'thread' ]); +export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; + export class DebugSessionConnection implements Disposable { private sequence = 1; @@ -168,7 +166,7 @@ export class DebugSessionConnection implements Disposable { this.cancelPendingRequests(); this.onDidCloseEmitter.fire(); }); - connection.onMessage(data => this.handleMessage(data)); + connection.onMessage(data => this.handleMessage(data().readString())); return connection; } @@ -247,7 +245,7 @@ export class DebugSessionConnection implements Disposable { const dateStr = `${now.toLocaleString(undefined, { hour12: false })}.${now.getMilliseconds()}`; this.traceOutputChannel.appendLine(`${this.sessionId.substring(0, 8)} ${dateStr} theia -> adapter: ${JSON.stringify(message, undefined, 4)}`); } - connection.send(messageStr); + connection.getWriteBuffer().writeString(messageStr).commit(); } protected handleMessage(data: string): void { diff --git a/packages/debug/src/browser/debug-session-contribution.ts b/packages/debug/src/browser/debug-session-contribution.ts index cf8b5e7424cc0..a14db324cdf81 100644 --- a/packages/debug/src/browser/debug-session-contribution.ts +++ b/packages/debug/src/browser/debug-session-contribution.ts @@ -26,10 +26,11 @@ import { DebugSessionOptions } from './debug-session-options'; import { OutputChannelManager, OutputChannel } from '@theia/output/lib/browser/output-channel'; import { DebugPreferences } from './debug-preferences'; import { DebugSessionConnection } from './debug-session-connection'; -import { Channel, DebugAdapterPath } from '../common/debug-service'; +import { DebugAdapterPath } from '../common/debug-service'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from './debug-contribution'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; import { WorkspaceService } from '@theia/workspace/lib/browser'; /** diff --git a/packages/debug/src/node/debug-adapter-session.ts b/packages/debug/src/node/debug-adapter-session.ts index 03ff950d38a90..b47e552ae6344 100644 --- a/packages/debug/src/node/debug-adapter-session.ts +++ b/packages/debug/src/node/debug-adapter-session.ts @@ -26,7 +26,7 @@ import { DebugAdapterSession } from './debug-model'; import { DebugProtocol } from 'vscode-debugprotocol'; -import { Channel } from '../common/debug-service'; +import { Channel } from '@theia/core'; /** * [DebugAdapterSession](#DebugAdapterSession) implementation. @@ -53,7 +53,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { throw new Error('The session has already been started, id: ' + this.id); } this.channel = channel; - this.channel.onMessage((message: string) => this.write(message)); + this.channel.onMessage(message => this.write(message().readString())); this.channel.onClose(() => this.channel = undefined); } @@ -80,7 +80,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { protected send(message: string): void { if (this.channel) { - this.channel.send(message); + this.channel.getWriteBuffer().writeString(message); } } diff --git a/packages/debug/src/node/debug-model.ts b/packages/debug/src/node/debug-model.ts index a39352fabbddf..3e6de9ce68fc6 100644 --- a/packages/debug/src/node/debug-model.ts +++ b/packages/debug/src/node/debug-model.ts @@ -25,8 +25,7 @@ import { DebugConfiguration } from '../common/debug-configuration'; import { IJSONSchema, IJSONSchemaSnippet } from '@theia/core/lib/common/json-schema'; import { MaybePromise } from '@theia/core/lib/common/types'; -import { Event } from '@theia/core/lib/common/event'; -import { Channel } from '../common/debug-service'; +import { Channel, Event } from '@theia/core'; // FIXME: break down this file to debug adapter and debug adapter contribution (see Theia file naming conventions) diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index 95ee67c57d8de..63f789b9da2a3 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -846,7 +846,7 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p */ export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider { /** - * Read the contents of the given file as stream. + * Read the contents of the given file as stream. * @param resource The `URI` of the file. * * @return The `ReadableStreamEvents` for the readable stream of the given file. diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index 5edb5dbbad9e7..f67e198db75f7 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -42,11 +42,11 @@ export interface RemoteFileSystemServer extends JsonRpcServer; open(resource: string, opts: FileOpenOptions): Promise; close(fd: number): Promise; - read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }>; + read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>; readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; - readFile(resource: string): Promise; - write(fd: number, pos: number, data: number[], offset: number, length: number): Promise; - writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise; + readFile(resource: string): Promise; + write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; + writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise; delete(resource: string, opts: FileDeleteOptions): Promise; mkdir(resource: string): Promise; readdir(resource: string): Promise<[string, FileType][]>; @@ -70,7 +70,7 @@ export interface RemoteFileSystemClient { notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void; notifyFileWatchError(): void; notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void; - onFileStreamData(handle: number, data: number[]): void; + onFileStreamData(handle: number, data: Uint8Array): void; onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void; } @@ -169,7 +169,7 @@ export class RemoteFileSystemProvider implements Required, D this.onFileWatchErrorEmitter.fire(); }, notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities), - onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]), + onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, data]), onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error]) }); const onInitialized = this.server.onDidOpenConnection(() => { @@ -224,7 +224,7 @@ export class RemoteFileSystemProvider implements Required, D async readFile(resource: URI): Promise { const bytes = await this.server.readFile(resource.toString()); - return Uint8Array.from(bytes); + return bytes; } readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { @@ -264,11 +264,11 @@ export class RemoteFileSystemProvider implements Required, D } write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { - return this.server.write(fd, pos, [...data.values()], offset, length); + return this.server.write(fd, pos, data, offset, length); } writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise { - return this.server.writeFile(resource.toString(), [...content.values()], opts); + return this.server.writeFile(resource.toString(), content, opts); } delete(resource: URI, opts: FileDeleteOptions): Promise { @@ -412,34 +412,33 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - async read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }> { + async read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }> { if (hasOpenReadWriteCloseCapability(this.provider)) { const buffer = BinaryBuffer.alloc(this.BUFFER_SIZE); const bytes = buffer.buffer; const bytesRead = await this.provider.read(fd, pos, bytes, 0, length); - return { bytes: [...bytes.values()], bytesRead }; + return { bytes, bytesRead }; } throw new Error('not supported'); } - write(fd: number, pos: number, data: number[], offset: number, length: number): Promise { + write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { if (hasOpenReadWriteCloseCapability(this.provider)) { - return this.provider.write(fd, pos, Uint8Array.from(data), offset, length); + return this.provider.write(fd, pos, data, offset, length); } throw new Error('not supported'); } - async readFile(resource: string): Promise { + async readFile(resource: string): Promise { if (hasReadWriteCapability(this.provider)) { - const buffer = await this.provider.readFile(new URI(resource)); - return [...buffer.values()]; + return this.provider.readFile(new URI(resource)); } throw new Error('not supported'); } - writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise { + writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.writeFile(new URI(resource), Uint8Array.from(content), opts); + return this.provider.writeFile(new URI(resource), content, opts); } throw new Error('not supported'); } @@ -497,7 +496,7 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { if (hasFileReadStreamCapability(this.provider)) { const handle = this.readFileStreamSeq++; const stream = this.provider.readFileStream(new URI(resource), opts, token); - stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()])); + stream.on('data', data => this.client?.onFileStreamData(handle, data)); stream.on('error', error => { const code = error instanceof FileSystemProviderError ? error.code : undefined; const { name, message, stack } = error; diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts index 48ae3adb36363..ee8e43ea7c639 100644 --- a/packages/plugin-ext/src/common/connection.ts +++ b/packages/plugin-ext/src/common/connection.ts @@ -13,27 +13,38 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '@theia/debug/lib/common/debug-service'; import { ConnectionExt, ConnectionMain } from './plugin-api-rpc'; -import { Emitter } from '@theia/core/lib/common/event'; +import { Emitter, Event } from '@theia/core/lib/common/event'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; +import { WriteBuffer, Channel } from '@theia/core'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; /** * A channel communicating with a counterpart in a plugin host. */ export class PluginChannel implements Channel { - private messageEmitter: Emitter = new Emitter(); + private messageEmitter: Emitter = new Emitter(); private errorEmitter: Emitter = new Emitter(); - private closedEmitter: Emitter = new Emitter(); + private closedEmitter: Emitter = new Emitter(); constructor( - protected readonly id: string, + readonly id: string, protected readonly connection: ConnectionExt | ConnectionMain) { } + getWriteBuffer(): WriteBuffer { + const result = new Uint8ArrayWriteBuffer(); + result.onCommit(buffer => { + this.connection.$sendMessage(this.id, new Uint8ArrayReadBuffer(buffer).readString()); + }); + + return result; + } + send(content: string): void { this.connection.$sendMessage(this.id, content); } - fireMessageReceived(msg: string): void { + fireMessageReceived(msg: MessageProvider): void { this.messageEmitter.fire(msg); } @@ -42,21 +53,19 @@ export class PluginChannel implements Channel { } fireClosed(): void { - this.closedEmitter.fire(); + this.closedEmitter.fire({ reason: 'Plugin channel has been closed from the extension side' }); } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onMessage(cb: (data: any) => void): void { - this.messageEmitter.event(cb); + get onMessage(): Event { + return this.messageEmitter.event; } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError(cb: (reason: any) => void): void { - this.errorEmitter.event(cb); + get onError(): Event { + return this.errorEmitter.event; } - onClose(cb: (code: number, reason: string) => void): void { - this.closedEmitter.event(() => cb(-1, 'closed')); + get onClose(): Event { + return this.closedEmitter.event; } close(): void { @@ -80,7 +89,10 @@ export class ConnectionImpl implements ConnectionMain, ConnectionExt { */ async $sendMessage(id: string, message: string): Promise { if (this.connections.has(id)) { - this.connections.get(id)!.fireMessageReceived(message); + const writer = new Uint8ArrayWriteBuffer().writeString(message); + const reader = new Uint8ArrayReadBuffer(writer.getCurrentContents()); + writer.dispose(); + this.connections.get(id)!.fireMessageReceived(() => reader); } else { console.warn(`Received message for unknown connection: ${id}`); } diff --git a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts index b16892dccc727..176fdb246a693 100644 --- a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts +++ b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts @@ -30,7 +30,7 @@ import { TerminalOptionsExt } from '../../../common/plugin-api-rpc'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from '@theia/debug/lib/browser/debug-contribution'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; -import { Channel } from '@theia/debug/lib/common/debug-service'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; import { WorkspaceService } from '@theia/workspace/lib/browser'; export class PluginDebugSession extends DebugSession { diff --git a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts index b05859750198d..b34a73ae201be 100644 --- a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts +++ b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts @@ -17,7 +17,7 @@ import { DebugAdapterSessionImpl } from '@theia/debug/lib/node/debug-adapter-session'; import * as theia from '@theia/plugin'; import { DebugAdapter } from '@theia/debug/lib/node/debug-model'; -import { Channel } from '@theia/debug/lib/common/debug-service'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts index fbe968348d9d2..7e63ddd14927c 100644 --- a/packages/task/src/node/task-server.slow-spec.ts +++ b/packages/task/src/node/task-server.slow-spec.ts @@ -28,9 +28,10 @@ import { isWindows, isOSX } from '@theia/core/lib/common/os'; import { FileUri } from '@theia/core/lib/node'; import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol'; import { expectThrowsAsync } from '@theia/core/lib/common/test/expect'; -import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; import { expect } from 'chai'; import URI from '@theia/core/lib/common/uri'; +import { RpcProtocol } from '@theia/core'; // test scripts that we bundle with tasks const commandShortRunning = './task'; @@ -106,26 +107,38 @@ describe('Task server / back-end', function (): void { // hook-up to terminal's ws and confirm that it outputs expected tasks' output await new Promise((resolve, reject) => { - const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); - channel.onError(reject); - channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); - channel.onMessage(msg => { - // check output of task on terminal is what we expect - const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; - // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. - // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` - const currentMessage = msg.toString(); - messages.unshift(currentMessage); - if (currentMessage.indexOf(expected) !== -1) { - resolve(); - channel.close(); - return; - } - if (messages.length >= messagesToWaitFor) { - reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); - channel.close(); - } + const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` }); + setup.multiplexer.onDidOpenChannel(event => { + const channel = event.channel; + const connection = new RpcProtocol(channel, async (method, args) => { + const error = new Error(`Received unexpected request: ${method} with args: ${args} `); + reject(error); + throw error; + }); + channel.onError(reject); + channel.onClose(() => reject(new Error('Channel has been closed'))); + connection.onNotification(not => { + // check output of task on terminal is what we expect + const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; + // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. + // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` + const currentMessage = not.args[0]; + messages.unshift(currentMessage); + if (currentMessage.indexOf(expected) !== -1) { + resolve(); + channel.close(); + return; + } + if (messages.length >= messagesToWaitFor) { + reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); + channel.close(); + } + }); + channel.onMessage(reader => { + + }); }); + }); }); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index d936f2484ed09..f81326ce6724f 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -17,7 +17,7 @@ import { Terminal, RendererType } from 'xterm'; import { FitAddon } from 'xterm-addon-fit'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection } from '@theia/core'; +import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core'; import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon } from '@theia/core/lib/browser'; import { isOSX } from '@theia/core/lib/common'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -26,7 +26,6 @@ import { terminalsPath } from '../common/terminal-protocol'; import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol'; import { TerminalWatcher } from '../common/terminal-watcher'; import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions } from './base/terminal-widget'; -import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences'; import { TerminalContribution } from './terminal-contribution'; @@ -58,7 +57,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected hoverMessage: HTMLDivElement; protected lastTouchEnd: TouchEvent | undefined; protected isAttachedCloseListener: boolean = false; @@ -506,16 +505,23 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { - connection.onNotification('onData', (data: string) => this.write(data)); + const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); + + const rpc = new RpcProtocol(connection, requestHandler); + rpc.onNotification(event => { + if (event.method === 'onData') { + this.write(event.args[0]); + } + }); // Excludes the device status code emitted by Xterm.js const sendData = (data?: string) => { if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) { - return connection.sendRequest('write', data); + return rpc.sendRequest('write', [data]); } }; @@ -523,12 +529,10 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget disposable.push(this.term.onData(sendData)); disposable.push(this.term.onBinary(sendData)); - connection.onDispose(() => disposable.dispose()); + connection.onClose(() => disposable.dispose()); - this.toDisposeOnConnect.push(connection); - connection.listen(); if (waitForConnection) { - waitForConnection.resolve(connection); + waitForConnection.resolve(rpc); } } }, { reconnecting: false }); @@ -578,7 +582,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget sendText(text: string): void { if (this.waitForConnection) { this.waitForConnection.promise.then(connection => - connection.sendRequest('write', text) + connection.sendRequest('write', [text]) ); } } diff --git a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts index 6d39ddd973f20..aa4a54e72deff 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts @@ -20,7 +20,7 @@ import { IShellTerminalServer } from '../common/shell-terminal-protocol'; import * as http from 'http'; import * as https from 'https'; import { terminalsPath } from '../common/terminal-protocol'; -import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; describe('Terminal Backend Contribution', function (): void { @@ -45,13 +45,19 @@ describe('Terminal Backend Contribution', function (): void { it('is data received from the terminal ws server', async () => { const terminalId = await shellTerminalServer.create({}); await new Promise((resolve, reject) => { - const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); + const path = `${terminalsPath}/${terminalId}`; + const { channel, multiplexer } = new TestWebSocketChannelSetup({ server, path }); channel.onError(reject); - channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); - channel.onOpen(() => { - resolve(); - channel.close(); + channel.onClose(event => reject(new Error(`channel is closed with '${event.code}' code and '${event.reason}' reason}`))); + + multiplexer.onDidOpenChannel(event => { + if (event.id === path) { + resolve(); + channel.close(); + } }); + }); }); + }); diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index 4675b7a32290c..dea4504e0ffea 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -15,10 +15,11 @@ // ***************************************************************************** import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger } from '@theia/core/lib/common'; +import { ILogger, RequestHandler } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; +import { RpcProtocol } from '@theia/core/'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -30,19 +31,27 @@ export class TerminalBackendContribution implements MessagingService.Contributio protected readonly logger: ILogger; configure(service: MessagingService): void { - service.listen(`${terminalsPath}/:id`, (params: { id: string }, connection) => { + service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => { const id = parseInt(params.id, 10); const termProcess = this.processManager.get(id); if (termProcess instanceof TerminalProcess) { const output = termProcess.createOutputStream(); - output.on('data', data => connection.sendNotification('onData', data.toString())); - connection.onRequest('write', (data: string) => termProcess.write(data)); - connection.onClose(() => output.dispose()); - connection.listen(); - } else { - connection.dispose(); + // Create a RPC connection to the terminal process + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const requestHandler: RequestHandler = async (method: string, args: any[]) => { + if (method === 'write' && args[0]) { + termProcess.write(args[0]); + } else { + this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args }); + } + }; + + const rpc = new RpcProtocol(channel, requestHandler); + output.on('data', data => { + rpc.sendNotification('onData', [data]); + }); + channel.onClose(() => output.dispose()); } }); } - } diff --git a/yarn.lock b/yarn.lock index 52802cb3c200a..896f24662ccb3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2177,6 +2177,13 @@ resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8" integrity sha512-6ckxMjBBD8URvjB6J3NcnuAn5Pkl7t3TizAg+xdlzzQGSPSmBcXf8KoIH0ua/i+tio+ZRUHEXp0HEmvaR4kt0w== +"@types/chai-spies@1.0.3": + version "1.0.3" + resolved "https://registry.yarnpkg.com/@types%2fchai-spies/-/chai-spies-1.0.3.tgz#a52dc61af3853ec9b80965040811d15dfd401542" + integrity sha512-RBZjhVuK7vrg4rWMt04UF5zHYwfHnpk5mIWu3nQvU3AKGDixXzSjZ6v0zke6pBcaJqMv3IBZ5ibLWPMRDL0sLw== + dependencies: + "@types/chai" "*" + "@types/chai-string@^1.4.0": version "1.4.2" resolved "https://registry.yarnpkg.com/@types/chai-string/-/chai-string-1.4.2.tgz#0f116504a666b6c6a3c42becf86634316c9a19ac" @@ -2189,6 +2196,11 @@ resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.1.tgz#e2c6e73e0bdeb2521d00756d099218e9f5d90a04" integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ== +"@types/chai@4.3.0": + version "4.3.0" + resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.0.tgz#23509ebc1fa32f1b4d50d6a66c4032d5b8eaabdc" + integrity sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw== + "@types/component-emitter@^1.2.10": version "1.2.11" resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506" @@ -3900,11 +3912,28 @@ caseless@~0.12.0: resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc" integrity sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw= +chai-spies@1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/chai-spies/-/chai-spies-1.0.0.tgz#d16b39336fb316d03abf8c375feb23c0c8bb163d" + integrity sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg== + chai-string@^1.4.0: version "1.5.0" resolved "https://registry.yarnpkg.com/chai-string/-/chai-string-1.5.0.tgz#0bdb2d8a5f1dbe90bc78ec493c1c1c180dd4d3d2" integrity sha512-sydDC3S3pNAQMYwJrs6dQX0oBQ6KfIPuOZ78n7rocW0eJJlsHPh2t3kwW7xfwYA/1Bf6/arGtSUo16rxR2JFlw== +chai@4.3.4: + version "4.3.4" + resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.4.tgz#b55e655b31e1eac7099be4c08c21964fce2e6c49" + integrity sha512-yS5H68VYOCtN1cjfwumDSuzn/9c+yza4f3reKXlE5rUg7SFcCEy90gJvydNgOYtblyf4Zi6jIWRnXOgErta0KA== + dependencies: + assertion-error "^1.1.0" + check-error "^1.0.2" + deep-eql "^3.0.1" + get-func-name "^2.0.0" + pathval "^1.1.1" + type-detect "^4.0.5" + chai@^4.2.0: version "4.3.6" resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.6.tgz#ffe4ba2d9fa9d6680cc0b370adae709ec9011e9c" @@ -11599,7 +11628,7 @@ vscode-debugprotocol@^1.32.0: resolved "https://registry.yarnpkg.com/vscode-debugprotocol/-/vscode-debugprotocol-1.51.0.tgz#c03168dac778b6c24ce17b3511cb61e89c11b2df" integrity sha512-dzKWTMMyebIMPF1VYMuuQj7gGFq7guR8AFya0mKacu+ayptJfaRuM0mdHCqiOth4FnRP8mPhEroFPx6Ift8wHA== -vscode-jsonrpc@^5.0.0, vscode-jsonrpc@^5.0.1: +vscode-jsonrpc@^5.0.1: version "5.0.1" resolved "https://registry.yarnpkg.com/vscode-jsonrpc/-/vscode-jsonrpc-5.0.1.tgz#9bab9c330d89f43fc8c1e8702b5c36e058a01794" integrity sha512-JvONPptw3GAQGXlVV2utDcHx0BiY34FupW/kI6mZ5x06ER5DdPG/tXWMVHjTNULF5uKPOUUD0SaXg5QaubJL0A== @@ -11652,13 +11681,6 @@ vscode-uri@^2.1.1: resolved "https://registry.yarnpkg.com/vscode-uri/-/vscode-uri-2.1.2.tgz#c8d40de93eb57af31f3c715dd650e2ca2c096f1c" integrity sha512-8TEXQxlldWAuIODdukIb+TR5s+9Ds40eSJrw+1iDDA9IFORPjMELarNQE3myz5XIkWWpdprmJjm1/SxMlWOC8A== -vscode-ws-jsonrpc@^0.2.0: - version "0.2.0" - resolved "https://registry.yarnpkg.com/vscode-ws-jsonrpc/-/vscode-ws-jsonrpc-0.2.0.tgz#5e9c26e10da54a1a235da7d59e74508bbcb8edd9" - integrity sha512-NE9HNRgPjCaPyTJvIudcpyIWPImxwRDtuTX16yks7SAiZgSXigxAiZOvSvVBGmD1G/OMfrFo6BblOtjVR9DdVA== - dependencies: - vscode-jsonrpc "^5.0.0" - w3c-hr-time@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"