From 1e0484ab6084f8a821ed6ffbd98b506c8b3e9ae4 Mon Sep 17 00:00:00 2001 From: Carey Williams Date: Fri, 11 Nov 2022 18:41:26 +0000 Subject: [PATCH] Revert "Integrate new message-rpc prototype into core messaging API (extensions) (#11011) (#11228)" This reverts commit 83d430846182a2ccd8465bcfd28ba7214c42abf3. --- package.json | 4 - packages/core/README.md | 1 + packages/core/package.json | 5 +- .../core/shared/vscode-ws-jsonrpc/index.d.ts | 1 + .../core/shared/vscode-ws-jsonrpc/index.js | 1 + .../messaging/ws-connection-provider.ts | 59 ++-- .../src/browser/progress-status-bar-item.ts | 2 +- packages/core/src/common/cancellation.ts | 8 - packages/core/src/common/index.ts | 1 - .../src/common/message-rpc/channel.spec.ts | 88 ------ .../core/src/common/message-rpc/channel.ts | 260 ------------------ .../src/common/message-rpc/message-buffer.ts | 99 ------- .../src/common/message-rpc/rpc-protocol.ts | 217 --------------- .../uint8-array-message-buffer.spec.ts | 41 --- .../message-rpc/uint8-array-message-buffer.ts | 206 -------------- .../messaging/abstract-connection-provider.ts | 65 +++-- .../messaging/connection-error-handler.ts | 3 +- packages/core/src/common/messaging/handler.ts | 4 +- .../common/messaging/proxy-factory.spec.ts | 21 +- .../src/common/messaging/proxy-factory.ts | 43 ++- .../common/messaging/web-socket-channel.ts | 214 ++++++++------ .../electron-ipc-connection-provider.ts | 28 +- .../electron-ws-connection-provider.ts | 13 +- .../electron-messaging-contribution.ts | 152 +++++----- .../messaging/electron-messaging-service.ts | 10 +- .../src/node/messaging/binary-message-pipe.ts | 168 ----------- .../core/src/node/messaging/ipc-bootstrap.ts | 14 +- .../node/messaging/ipc-connection-provider.ts | 53 ++-- .../core/src/node/messaging/ipc-protocol.ts | 4 +- .../index.ts => node/messaging/logger.ts} | 27 +- .../node/messaging/messaging-contribution.ts | 68 +++-- .../src/node/messaging/messaging-service.ts | 35 ++- .../messaging/test/test-web-socket-channel.ts | 43 ++- packages/filesystem/src/common/files.ts | 2 +- .../src/common/remote-file-system-provider.ts | 37 +-- .../task/src/node/task-server.slow-spec.ts | 53 ++-- .../src/browser/terminal-widget-impl.ts | 26 +- ...terminal-backend-contribution.slow-spec.ts | 18 +- .../src/node/terminal-backend-contribution.ts | 27 +- yarn.lock | 36 +-- 40 files changed, 573 insertions(+), 1584 deletions(-) create mode 100644 packages/core/shared/vscode-ws-jsonrpc/index.d.ts create mode 100644 packages/core/shared/vscode-ws-jsonrpc/index.js delete mode 100644 packages/core/src/common/message-rpc/channel.spec.ts delete mode 100644 packages/core/src/common/message-rpc/channel.ts delete mode 100644 packages/core/src/common/message-rpc/message-buffer.ts delete mode 100644 packages/core/src/common/message-rpc/rpc-protocol.ts delete mode 100644 packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts delete mode 100644 packages/core/src/common/message-rpc/uint8-array-message-buffer.ts delete mode 100644 packages/core/src/node/messaging/binary-message-pipe.ts rename packages/core/src/{common/message-rpc/index.ts => node/messaging/logger.ts} (64%) diff --git a/package.json b/package.json index ce1e103d0f51a..75bfe960c3945 100644 --- a/package.json +++ b/package.json @@ -10,8 +10,6 @@ "**/@types/node": "14" }, "devDependencies": { - "@types/chai": "4.3.0", - "@types/chai-spies": "1.0.3", "@types/chai-string": "^1.4.0", "@types/jsdom": "^11.0.4", "@types/node": "14", @@ -22,8 +20,6 @@ "@typescript-eslint/eslint-plugin": "^4.8.1", "@typescript-eslint/eslint-plugin-tslint": "^4.8.1", "@typescript-eslint/parser": "^4.8.1", - "chai": "4.3.4", - "chai-spies": "1.0.0", "chai-string": "^1.4.0", "chalk": "4.0.0", "concurrently": "^3.5.0", diff --git a/packages/core/README.md b/packages/core/README.md index 8f9282a7d1e47..6053258469e63 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -99,6 +99,7 @@ export class SomeClass { - `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized)) - `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol)) - `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri)) + - `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc)) - `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify)) - `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express)) - `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce)) diff --git a/packages/core/package.json b/packages/core/package.json index 20302e7ac149e..70b87509ba5ee 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -61,6 +61,7 @@ "react-dom": "^16.8.0", "react-tooltip": "^4.2.21", "react-virtualized": "^9.20.0", + "reconnecting-websocket": "^4.2.0", "reflect-metadata": "^0.1.10", "route-parser": "^0.0.5", "safer-buffer": "^2.1.2", @@ -69,6 +70,7 @@ "uuid": "^8.3.2", "vscode-languageserver-protocol": "~3.15.3", "vscode-uri": "^2.1.1", + "vscode-ws-jsonrpc": "^0.2.0", "ws": "^7.1.2", "yargs": "^15.3.1" }, @@ -111,7 +113,8 @@ "react-dom", "react-virtualized", "vscode-languageserver-protocol", - "vscode-uri" + "vscode-uri", + "vscode-ws-jsonrpc" ], "export =": [ "dompurify as DOMPurify", diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts new file mode 100644 index 0000000000000..b11ff897103ed --- /dev/null +++ b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts @@ -0,0 +1 @@ +export * from 'vscode-ws-jsonrpc'; diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.js b/packages/core/shared/vscode-ws-jsonrpc/index.js new file mode 100644 index 0000000000000..3aed560b82d62 --- /dev/null +++ b/packages/core/shared/vscode-ws-jsonrpc/index.js @@ -0,0 +1 @@ +module.exports = require('vscode-ws-jsonrpc'); diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index b44e34841ee2c..f83aabda22826 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -15,11 +15,11 @@ // ***************************************************************************** import { injectable, interfaces, decorate, unmanaged } from 'inversify'; -import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common'; +import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { Endpoint } from '../endpoint'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { io, Socket } from 'socket.io-client'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; decorate(injectable(), JsonRpcProxyFactory); decorate(unmanaged(), JsonRpcProxyFactory, 0); @@ -53,42 +53,26 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { - this.initializeMultiplexer(); - if (this.reconnectChannelOpeners.length > 0) { - this.reconnectChannelOpeners.forEach(opener => opener()); - this.reconnectChannelOpeners = []; - } - this.socket.on('disconnect', () => this.fireSocketDidClose()); - this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined)); + const socket = this.createWebSocket(url); + socket.on('connect', () => { this.fireSocketDidOpen(); }); - this.socket.connect(); - } - - protected createMainChannel(): Channel { - return new WebSocketChannel(this.toIWebSocket(this.socket)); - } - - protected toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; + socket.on('disconnect', reason => { + for (const channel of [...this.channels.values()]) { + channel.close(undefined, reason); + } + this.fireSocketDidClose(); + }); + socket.on('message', data => { + this.handleIncomingRawMessage(data); + }); + socket.connect(); + this.socket = socket; } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { if (this.socket.connected) { - return super.openChannel(path, handler, options); + super.openChannel(path, handler, options); } else { const openChannel = () => { this.socket.off('connect', openChannel); @@ -98,6 +82,14 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { + if (this.socket.connected) { + this.socket.send(content); + } + }); + } + /** * @param path The handler to reach in the backend. */ @@ -151,4 +143,3 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => { - const leftWrite = new Uint8ArrayWriteBuffer(); - leftWrite.onCommit(buffer => { - this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); - }); - return leftWrite; - }); - readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => { - const rightWrite = new Uint8ArrayWriteBuffer(); - rightWrite.onCommit(buffer => { - this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); - }); - return rightWrite; - }); -} -describe('Message Channel', () => { - describe('Channel multiplexer', () => { - it('should forward messages to intended target channel', async () => { - const pipe = new ChannelPipe(); - - const leftMultiplexer = new ChannelMultiplexer(pipe.left); - const rightMultiplexer = new ChannelMultiplexer(pipe.right); - const openChannelSpy = spy(() => { - }); - - rightMultiplexer.onDidOpenChannel(openChannelSpy); - leftMultiplexer.onDidOpenChannel(openChannelSpy); - - const leftFirst = await leftMultiplexer.open('first'); - const leftSecond = await leftMultiplexer.open('second'); - - const rightFirst = rightMultiplexer.getOpenChannel('first'); - const rightSecond = rightMultiplexer.getOpenChannel('second'); - - assert.isNotNull(rightFirst); - assert.isNotNull(rightSecond); - - const leftSecondSpy = spy((buf: MessageProvider) => { - const message = buf().readString(); - expect(message).equal('message for second'); - }); - - leftSecond.onMessage(leftSecondSpy); - - const rightFirstSpy = spy((buf: MessageProvider) => { - const message = buf().readString(); - expect(message).equal('message for first'); - }); - - rightFirst!.onMessage(rightFirstSpy); - - leftFirst.getWriteBuffer().writeString('message for first').commit(); - rightSecond!.getWriteBuffer().writeString('message for second').commit(); - - expect(leftSecondSpy).to.be.called(); - expect(rightFirstSpy).to.be.called(); - - expect(openChannelSpy).to.be.called.exactly(4); - }); - }); -}); diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts deleted file mode 100644 index 5d9fc6985cc07..0000000000000 --- a/packages/core/src/common/message-rpc/channel.ts +++ /dev/null @@ -1,260 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -import { Disposable, DisposableCollection } from '../disposable'; -import { Emitter, Event } from '../event'; -import { ReadBuffer, WriteBuffer } from './message-buffer'; - -/** - * A channel is a bidirectional communications channel with lifecycle and - * error signalling. Note that creation of channels is specific to particular - * implementations and thus not part of the protocol. - */ -export interface Channel { - - /** - * The remote side has closed the channel - */ - onClose: Event; - - /** - * An error has occurred while writing to or reading from the channel - */ - onError: Event; - - /** - * A message has arrived and can be read by listeners using a {@link MessageProvider}. - */ - onMessage: Event; - - /** - * Obtain a {@link WriteBuffer} to write a message to the channel. - */ - getWriteBuffer(): WriteBuffer; - - /** - * Close this channel. No {@link onClose} event should be sent - */ - close(): void; -} - -/** - * The event that is emitted when a channel is closed from the remote side. - */ -export interface ChannelCloseEvent { - reason: string, - code?: number -}; - -/** - * The `MessageProvider` is emitted when a channel receives a new message. - * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message. - * This ensures that each listener has its own isolated {@link ReadBuffer} instance. - * - */ -export type MessageProvider = () => ReadBuffer; - -/** - * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to - * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. - */ -export class ForwardingChannel implements Channel { - - protected toDispose = new DisposableCollection(); - constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { - this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); - } - - onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - }; - - onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - }; - - onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - }; - - getWriteBuffer(): WriteBuffer { - return this.writeBufferSource(); - } - - close(): void { - this.closeHandler(); - this.toDispose.dispose(); - } -} - -/** - * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} - */ -export enum MessageTypes { - Open = 1, - Close = 2, - AckOpen = 3, - Data = 4 -} - -/** - * The write buffers in this implementation immediately write to the underlying - * channel, so we rely on writers to the multiplexed channels to always commit their - * messages and always in one go. - */ -export class ChannelMultiplexer implements Disposable { - protected pendingOpen: Map void> = new Map(); - protected openChannels: Map = new Map(); - - protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>(); - get onDidOpenChannel(): Event<{ id: string, channel: Channel }> { - return this.onOpenChannelEmitter.event; - } - - protected toDispose = new DisposableCollection(); - - constructor(protected readonly underlyingChannel: Channel) { - this.toDispose.pushAll([ - this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())), - this.underlyingChannel.onClose(event => this.onUnderlyingChannelClose(event)), - this.underlyingChannel.onError(error => this.handleError(error)), - this.onOpenChannelEmitter - ]); - } - - protected handleError(error: unknown): void { - this.openChannels.forEach(channel => { - channel.onErrorEmitter.fire(error); - }); - } - - onUnderlyingChannelClose(event?: ChannelCloseEvent): void { - if (!this.toDispose.disposed) { - this.toDispose.push(Disposable.create(() => { - this.pendingOpen.clear(); - this.openChannels.forEach(channel => { - channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); - }); - - this.openChannels.clear(); - })); - this.dispose(); - } - - } - - protected handleMessage(buffer: ReadBuffer): void { - const type = buffer.readUint8(); - const id = buffer.readString(); - switch (type) { - case MessageTypes.AckOpen: { - return this.handleAckOpen(id); - } - case MessageTypes.Open: { - return this.handleOpen(id); - } - case MessageTypes.Close: { - return this.handleClose(id); - } - case MessageTypes.Data: { - return this.handleData(id, buffer.sliceAtReadPosition()); - } - } - } - - protected handleAckOpen(id: string): void { - // edge case: both side try to open a channel at the same time. - const resolve = this.pendingOpen.get(id); - if (resolve) { - const channel = this.createChannel(id); - this.pendingOpen.delete(id); - this.openChannels.set(id, channel); - resolve!(channel); - this.onOpenChannelEmitter.fire({ id, channel }); - } - } - - protected handleOpen(id: string): void { - if (!this.openChannels.has(id)) { - const channel = this.createChannel(id); - this.openChannels.set(id, channel); - const resolve = this.pendingOpen.get(id); - if (resolve) { - // edge case: both side try to open a channel at the same time. - resolve(channel); - } - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); - this.onOpenChannelEmitter.fire({ id, channel }); - } - } - - protected handleClose(id: string): void { - const channel = this.openChannels.get(id); - if (channel) { - channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' }); - this.openChannels.delete(id); - } - } - - protected handleData(id: string, data: ReadBuffer): void { - const channel = this.openChannels.get(id); - if (channel) { - channel.onMessageEmitter.fire(() => data); - } - } - - protected createChannel(id: string): ForwardingChannel { - return new ForwardingChannel(id, () => this.closeChannel(id), () => this.prepareWriteBuffer(id)); - } - - // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded - // and written to the buffer before the actual message. - protected prepareWriteBuffer(id: string): WriteBuffer { - const underlying = this.underlyingChannel.getWriteBuffer(); - underlying.writeUint8(MessageTypes.Data); - underlying.writeString(id); - return underlying; - } - - protected closeChannel(id: string): void { - this.underlyingChannel.getWriteBuffer() - .writeUint8(MessageTypes.Close) - .writeString(id) - .commit(); - - this.openChannels.delete(id); - } - - open(id: string): Promise { - const result = new Promise((resolve, reject) => { - this.pendingOpen.set(id, resolve); - }); - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); - return result; - } - - getOpenChannel(id: string): Channel | undefined { - return this.openChannels.get(id); - } - - dispose(): void { - this.toDispose.dispose(); - } -} - diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts deleted file mode 100644 index 396ba95d93d73..0000000000000 --- a/packages/core/src/common/message-rpc/message-buffer.ts +++ /dev/null @@ -1,99 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -/** - * A buffer maintaining a write position capable of writing primitive values - */ -export interface WriteBuffer { - writeUint8(byte: number): this - writeUint16(value: number): this - writeUint32(value: number): this - writeString(value: string): this - writeBytes(value: Uint8Array): this - writeNumber(value: number): this - writeLength(value: number): this - /** - * Makes any writes to the buffer permanent, for example by sending the writes over a channel. - * You must obtain a new write buffer after committing - */ - commit(): void; -} - -export class ForwardingWriteBuffer implements WriteBuffer { - constructor(protected readonly underlying: WriteBuffer) { - } - - writeUint8(byte: number): this { - this.underlying.writeUint8(byte); - return this; - } - - writeUint16(value: number): this { - this.underlying.writeUint16(value); - return this; - } - - writeUint32(value: number): this { - this.underlying.writeUint32(value); - return this; - } - - writeLength(value: number): this { - this.underlying.writeLength(value); - return this; - } - - writeString(value: string): this { - this.underlying.writeString(value); - return this; - } - - writeBytes(value: Uint8Array): this { - this.underlying.writeBytes(value); - return this; - } - - writeNumber(value: number): this { - this.underlying.writeNumber(value); - return this; - } - - commit(): void { - this.underlying.commit(); - } -} - -/** - * A buffer maintaining a read position in a buffer containing a received message capable of - * reading primitive values. - */ -export interface ReadBuffer { - readUint8(): number; - readUint16(): number; - readUint32(): number; - readString(): string; - readNumber(): number, - readLength(): number, - readBytes(): Uint8Array; - - /** - * Returns a new read buffer whose starting read position is the current read position of this buffer. - * This is useful to create read buffers sub messages. - * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part - * of the message reader that is sent to the target channel) - */ - sliceAtReadPosition(): ReadBuffer -} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts deleted file mode 100644 index 6e037e6e8befd..0000000000000 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ /dev/null @@ -1,217 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -/* eslint-disable @typescript-eslint/no-explicit-any */ - -import { CancellationToken, CancellationTokenSource } from '../cancellation'; -import { DisposableCollection } from '../disposable'; -import { Emitter, Event } from '../event'; -import { Deferred } from '../promise-util'; -import { Channel } from './channel'; -import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; -import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; - -/** - * Handles request messages received by the {@link RpcServer}. - */ -export type RequestHandler = (method: string, args: any[]) => Promise; - -/** - * Initialization options for a {@link RpcProtocol}. - */ -export interface RpcProtocolOptions { - /** - * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. - */ - encoder?: RpcMessageEncoder, - /** - * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. - */ - decoder?: RpcMessageDecoder -} - -/** - * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send - * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. - * Clients can get a promise for a remote request result that will be either resolved or - * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request - * Currently, there is no timeout handling for long running requests implemented. - */ -export class RpcProtocol { - static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; - - protected readonly pendingRequests: Map> = new Map(); - - protected nextMessageId: number = 0; - - protected readonly encoder: RpcMessageEncoder; - protected readonly decoder: RpcMessageDecoder; - - protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); - protected readonly cancellationTokenSources = new Map(); - - get onNotification(): Event<{ method: string; args: any[]; }> { - return this.onNotificationEmitter.event; - } - - protected toDispose = new DisposableCollection(); - - constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { - this.encoder = options.encoder ?? new RpcMessageEncoder(); - this.decoder = options.decoder ?? new RpcMessageDecoder(); - this.toDispose.push(this.onNotificationEmitter); - this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); - channel.onClose(() => this.toDispose.dispose()); - } - - handleMessage(message: RpcMessage): void { - switch (message.type) { - case RpcMessageType.Cancel: { - this.handleCancel(message.id); - break; - } - case RpcMessageType.Request: { - this.handleRequest(message.id, message.method, message.args); - break; - } - case RpcMessageType.Notification: { - this.handleNotify(message.id, message.method, message.args); - break; - } - case RpcMessageType.Reply: { - this.handleReply(message.id, message.res); - break; - } - case RpcMessageType.ReplyErr: { - this.handleReplyErr(message.id, message.err); - break; - } - } - } - - protected handleReply(id: number, value: any): void { - const replyHandler = this.pendingRequests.get(id); - if (replyHandler) { - this.pendingRequests.delete(id); - replyHandler.resolve(value); - } else { - throw new Error(`No reply handler for reply with id: ${id}`); - } - } - - protected handleReplyErr(id: number, error: any): void { - try { - const replyHandler = this.pendingRequests.get(id); - if (replyHandler) { - this.pendingRequests.delete(id); - replyHandler.reject(error); - } else { - throw new Error(`No reply handler for error reply with id: ${id}`); - } - } catch (err) { - throw err; - } - } - - sendRequest(method: string, args: any[]): Promise { - const id = this.nextMessageId++; - const reply = new Deferred(); - - // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the - // args array and the `CANCELLATION_TOKEN_KEY` string instead. - const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; - if (cancellationToken && cancellationToken.isCancellationRequested) { - return Promise.reject(this.cancelError()); - } - - if (cancellationToken) { - args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); - cancellationToken.onCancellationRequested(() => { - this.sendCancel(id); - this.pendingRequests.get(id)?.reject(this.cancelError()); - } - ); - } - this.pendingRequests.set(id, reply); - - const output = this.channel.getWriteBuffer(); - this.encoder.request(output, id, method, args); - output.commit(); - return reply.promise; - } - - sendNotification(method: string, args: any[]): void { - const output = this.channel.getWriteBuffer(); - this.encoder.notification(output, this.nextMessageId++, method, args); - output.commit(); - } - - sendCancel(requestId: number): void { - const output = this.channel.getWriteBuffer(); - this.encoder.cancel(output, requestId); - output.commit(); - } - - cancelError(): Error { - const error = new Error('"Request has already been canceled by the sender"'); - error.name = 'Cancel'; - return error; - } - - protected handleCancel(id: number): void { - const cancellationTokenSource = this.cancellationTokenSources.get(id); - if (cancellationTokenSource) { - this.cancellationTokenSources.delete(id); - cancellationTokenSource.cancel(); - } - } - - protected async handleRequest(id: number, method: string, args: any[]): Promise { - const output = this.channel.getWriteBuffer(); - - // Check if the last argument of the received args is the key for indicating that a cancellation token should be used - // If so remove the key from the args and create a new cancellation token. - const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false; - if (addToken) { - const tokenSource = new CancellationTokenSource(); - this.cancellationTokenSources.set(id, tokenSource); - args.push(tokenSource.token); - } - - try { - const result = await this.requestHandler(method, args); - this.cancellationTokenSources.delete(id); - this.encoder.replyOK(output, id, result); - output.commit(); - } catch (err) { - // In case of an error the output buffer might already contains parts of an message. - // => Dispose the current buffer and retrieve a new, clean one for writing the response error. - if (output instanceof Uint8ArrayWriteBuffer) { - output.dispose(); - } - const errorOutput = this.channel.getWriteBuffer(); - this.cancellationTokenSources.delete(id); - this.encoder.replyErr(errorOutput, id, err); - errorOutput.commit(); - } - } - - protected async handleNotify(id: number, method: string, args: any[]): Promise { - if (this.toDispose.disposed) { - return; - } - this.onNotificationEmitter.fire({ method, args }); - } -} diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts deleted file mode 100644 index 59cccbf90a605..0000000000000 --- a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts +++ /dev/null @@ -1,41 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -import { expect } from 'chai'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; - -describe('array message buffer tests', () => { - it('basic read write test', () => { - const buffer = new Uint8Array(1024); - const writer = new Uint8ArrayWriteBuffer(buffer); - - writer.writeUint8(8); - writer.writeUint32(10000); - writer.writeBytes(new Uint8Array([1, 2, 3, 4])); - writer.writeString('this is a string'); - writer.writeString('another string'); - writer.commit(); - - const written = writer.getCurrentContents(); - - const reader = new Uint8ArrayReadBuffer(written); - - expect(reader.readUint8()).equal(8); - expect(reader.readUint32()).equal(10000); - expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4])); - expect(reader.readString()).equal('this is a string'); - expect(reader.readString()).equal('another string'); - }); -}); diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts deleted file mode 100644 index feec31dcd69cf..0000000000000 --- a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts +++ /dev/null @@ -1,206 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -import { Disposable } from '../disposable'; -import { Emitter, Event } from '../event'; -import { ReadBuffer, WriteBuffer } from './message-buffer'; - -/** - * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering. - * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events. - * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed - * and can no longer be used for writing data. If the writer buffer is no longer needed but the message - * has not been committed yet it has to be disposed manually. - */ -export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { - - private encoder = new TextEncoder(); - private msg: DataView; - private isDisposed = false; - private offset: number; - - constructor(private buffer: Uint8Array = new Uint8Array(1024), writePosition: number = 0) { - this.offset = buffer.byteOffset + writePosition; - this.msg = new DataView(buffer.buffer); - } - - ensureCapacity(value: number): this { - let newLength = this.buffer.byteLength; - while (newLength < this.offset + value) { - newLength *= 2; - } - if (newLength !== this.buffer.byteLength) { - const newBuffer = new Uint8Array(newLength); - newBuffer.set(this.buffer); - this.buffer = newBuffer; - this.msg = new DataView(this.buffer.buffer); - } - return this; - } - - writeLength(length: number): this { - if (length < 0 || (length % 1) !== 0) { - throw new Error(`Could not write the given length value. '${length}' is not an integer >= 0`); - } - if (length < 127) { - this.writeUint8(length); - } else { - this.writeUint8(128 + (length & 127)); - this.writeLength(length >> 7); - } - return this; - } - - writeNumber(value: number): this { - this.ensureCapacity(8); - this.msg.setFloat64(this.offset, value); - this.offset += 8; - return this; - } - - writeUint8(value: number): this { - this.ensureCapacity(1); - this.buffer[this.offset++] = value; - return this; - } - - writeUint16(value: number): this { - this.ensureCapacity(2); - this.msg.setUint16(this.offset, value); - this.offset += 2; - return this; - } - - writeUint32(value: number): this { - this.ensureCapacity(4); - this.msg.setUint32(this.offset, value); - this.offset += 4; - return this; - } - - writeString(value: string): this { - this.ensureCapacity(4 * value.length); - const result = this.encoder.encodeInto(value, this.buffer.subarray(this.offset + 4)); - this.msg.setUint32(this.offset, result.written!); - this.offset += 4 + result.written!; - return this; - } - - writeBytes(value: Uint8Array): this { - this.writeLength(value.byteLength); - this.ensureCapacity(value.length); - this.buffer.set(value, this.offset); - this.offset += value.length; - return this; - } - - private onCommitEmitter = new Emitter(); - get onCommit(): Event { - return this.onCommitEmitter.event; - } - - commit(): void { - if (this.isDisposed) { - throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed."); - } - this.onCommitEmitter.fire(this.getCurrentContents()); - this.dispose(); - } - - getCurrentContents(): Uint8Array { - return this.buffer.slice(this.buffer.byteOffset, this.offset); - } - - dispose(): void { - if (!this.isDisposed) { - this.onCommitEmitter.dispose(); - this.isDisposed = true; - } - } - -} -/** - * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering. - * Is for single message read. A message can only be read once. - */ -export class Uint8ArrayReadBuffer implements ReadBuffer { - private offset: number = 0; - private msg: DataView; - private decoder = new TextDecoder(); - - constructor(private readonly buffer: Uint8Array, readPosition = 0) { - this.offset = buffer.byteOffset + readPosition; - this.msg = new DataView(buffer.buffer); - } - - readUint8(): number { - return this.msg.getUint8(this.offset++); - } - - readUint16(): number { - const result = this.msg.getUint16(this.offset); - this.offset += 2; - return result; - } - - readUint32(): number { - const result = this.msg.getUint32(this.offset); - this.offset += 4; - return result; - } - - readLength(): number { - let shift = 0; - let byte = this.readUint8(); - let value = (byte & 127) << shift; - while (byte > 127) { - shift += 7; - byte = this.readUint8(); - value = value + ((byte & 127) << shift); - } - return value; - } - - readNumber(): number { - const result = this.msg.getFloat64(this.offset); - this.offset += 8; - return result; - } - - readString(): string { - const len = this.readUint32(); - const sliceOffset = this.offset - this.buffer.byteOffset; - const result = this.decodeString(this.buffer.slice(sliceOffset, sliceOffset + len)); - this.offset += len; - return result; - } - - private decodeString(buf: Uint8Array): string { - return this.decoder.decode(buf); - } - - readBytes(): Uint8Array { - const length = this.readLength(); - const sliceOffset = this.offset - this.buffer.byteOffset; - const result = this.buffer.slice(sliceOffset, sliceOffset + length); - this.offset += length; - return result; - } - - sliceAtReadPosition(): ReadBuffer { - const sliceOffset = this.offset - this.buffer.byteOffset; - return new Uint8ArrayReadBuffer(this.buffer, sliceOffset); - } -} diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts index 64f29bf185178..d4c5c3bd3aaf3 100644 --- a/packages/core/src/common/messaging/abstract-connection-provider.ts +++ b/packages/core/src/common/messaging/abstract-connection-provider.ts @@ -15,10 +15,11 @@ // ***************************************************************************** import { injectable, interfaces } from 'inversify'; +import { ConsoleLogger, createWebSocketConnection, Logger } from 'vscode-ws-jsonrpc'; import { Emitter, Event } from '../event'; import { ConnectionHandler } from './handler'; import { JsonRpcProxy, JsonRpcProxyFactory } from './proxy-factory'; -import { Channel, ChannelMultiplexer } from '../message-rpc/channel'; +import { WebSocketChannel } from './web-socket-channel'; /** * Factor common logic according to `ElectronIpcConnectionProvider` and @@ -44,6 +45,9 @@ export abstract class AbstractConnectionProvider throw new Error('abstract'); } + protected channelIdSeq = 0; + protected readonly channels = new Map(); + protected readonly onIncomingMessageActivityEmitter: Emitter = new Emitter(); get onIncomingMessageActivity(): Event { return this.onIncomingMessageActivityEmitter.event; @@ -71,45 +75,50 @@ export abstract class AbstractConnectionProvider return factory.createProxy(); } - protected channelMultiPlexer?: ChannelMultiplexer; - - // A set of channel opening functions that are executed if the backend reconnects to restore the - // the channels that were open before the disconnect occurred. - protected reconnectChannelOpeners: Array<() => Promise> = []; - - protected initializeMultiplexer(): void { - const mainChannel = this.createMainChannel(); - mainChannel.onMessage(() => this.onIncomingMessageActivityEmitter.fire()); - this.channelMultiPlexer = new ChannelMultiplexer(mainChannel); - } - /** * Install a connection handler for the given path. */ listen(handler: ConnectionHandler, options?: AbstractOptions): void { this.openChannel(handler.path, channel => { - handler.onConnection(channel); + const connection = createWebSocketConnection(channel, this.createLogger()); + connection.onDispose(() => channel.close()); + handler.onConnection(connection); }, options); } - async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { - if (!this.channelMultiPlexer) { - throw new Error('The channel multiplexer has not been initialized yet!'); - } - const newChannel = await this.channelMultiPlexer.open(path); - newChannel.onClose(() => { - const { reconnecting } = { reconnecting: true, ...options }; - if (reconnecting) { - this.reconnectChannelOpeners.push(() => this.openChannel(path, handler, options)); + openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: AbstractOptions): void { + const id = this.channelIdSeq++; + const channel = this.createChannel(id); + this.channels.set(id, channel); + channel.onClose(() => { + if (this.channels.delete(channel.id)) { + const { reconnecting } = { reconnecting: true, ...options }; + if (reconnecting) { + this.openChannel(path, handler, options); + } + } else { + console.error('The ws channel does not exist', channel.id); } }); + channel.onOpen(() => handler(channel)); + channel.open(path); + } + + protected abstract createChannel(id: number): WebSocketChannel; - handler(newChannel); + protected handleIncomingRawMessage(data: string): void { + const message: WebSocketChannel.Message = JSON.parse(data); + const channel = this.channels.get(message.id); + if (channel) { + channel.handleMessage(message); + } else { + console.error('The ws channel does not exist', message.id); + } + this.onIncomingMessageActivityEmitter.fire(undefined); } - /** - * Create the main connection that is used for multiplexing all service channels. - */ - protected abstract createMainChannel(): Channel; + protected createLogger(): Logger { + return new ConsoleLogger(); + } } diff --git a/packages/core/src/common/messaging/connection-error-handler.ts b/packages/core/src/common/messaging/connection-error-handler.ts index 89a27b60a50db..aecfe68901e24 100644 --- a/packages/core/src/common/messaging/connection-error-handler.ts +++ b/packages/core/src/common/messaging/connection-error-handler.ts @@ -14,6 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { Message } from 'vscode-ws-jsonrpc'; import { ILogger } from '../../common'; export interface ResolvedConnectionErrorHandlerOptions { @@ -50,7 +51,7 @@ export class ConnectionErrorHandler { }; } - shouldStop(error: Error, count?: number): boolean { + shouldStop(error: Error, message?: Message, count?: number): boolean { return !count || count > this.options.maxErrors; } diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts index 1e790d38aeec3..ed03d9d331206 100644 --- a/packages/core/src/common/messaging/handler.ts +++ b/packages/core/src/common/messaging/handler.ts @@ -14,11 +14,11 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; export const ConnectionHandler = Symbol('ConnectionHandler'); export interface ConnectionHandler { readonly path: string; - onConnection(connection: Channel): void; + onConnection(connection: MessageConnection): void; } diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts index 37280e4dbfdaa..2fd0700a41034 100644 --- a/packages/core/src/common/messaging/proxy-factory.spec.ts +++ b/packages/core/src/common/messaging/proxy-factory.spec.ts @@ -15,11 +15,21 @@ // ***************************************************************************** import * as chai from 'chai'; +import { ConsoleLogger } from '../../node/messaging/logger'; import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory'; -import { ChannelPipe } from '../message-rpc/channel.spec'; +import { createMessageConnection } from 'vscode-jsonrpc/lib/main'; +import * as stream from 'stream'; const expect = chai.expect; +class NoTransform extends stream.Transform { + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + override _transform(chunk: any, encoding: string, callback: Function): void { + callback(undefined, chunk); + } +} + class TestServer { requests: string[] = []; doStuff(arg: string): Promise { @@ -92,12 +102,15 @@ function getSetup(): { const server = new TestServer(); const serverProxyFactory = new JsonRpcProxyFactory(client); - const pipe = new ChannelPipe(); - serverProxyFactory.listen(pipe.right); + const client2server = new NoTransform(); + const server2client = new NoTransform(); + const serverConnection = createMessageConnection(server2client, client2server, new ConsoleLogger()); + serverProxyFactory.listen(serverConnection); const serverProxy = serverProxyFactory.createProxy(); const clientProxyFactory = new JsonRpcProxyFactory(server); - clientProxyFactory.listen(pipe.left); + const clientConnection = createMessageConnection(client2server, server2client, new ConsoleLogger()); + clientProxyFactory.listen(clientConnection); const clientProxy = clientProxyFactory.createProxy(); return { client, diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index 0562ada54e1bb..f8869449eae94 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -16,12 +16,10 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { ResponseError } from '../message-rpc/rpc-message-encoder'; +import { MessageConnection, ResponseError } from 'vscode-ws-jsonrpc'; import { ApplicationError } from '../application-error'; +import { Event, Emitter } from '../event'; import { Disposable } from '../disposable'; -import { Emitter, Event } from '../event'; -import { Channel } from '../message-rpc/channel'; -import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; export type JsonRpcServer = Disposable & { @@ -47,19 +45,13 @@ export class JsonRpcConnectionHandler implements ConnectionHan readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory ) { } - onConnection(connection: Channel): void { + onConnection(connection: MessageConnection): void { const factory = new this.factoryConstructor(); const proxy = factory.createProxy(); factory.target = this.targetFactory(proxy); factory.listen(connection); } } -/** - * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. - */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; - -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -103,14 +95,13 @@ const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandl * * @param - The type of the object to expose to JSON-RPC. */ - export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: RpcProtocol) => void; - protected connectionPromise: Promise; + protected connectionPromiseResolve: (connection: MessageConnection) => void; + protected connectionPromise: Promise; /** * Build a new JsonRpcProxyFactory. @@ -118,7 +109,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { + constructor(public target?: any) { this.waitForConnection(); } @@ -127,11 +118,9 @@ export class JsonRpcProxyFactory implements ProxyHandler { this.connectionPromiseResolve = resolve ); this.connectionPromise.then(connection => { - connection.channel.onClose(() => { - this.onDidCloseConnectionEmitter.fire(undefined); - // Wait for connection in case the backend reconnects - this.waitForConnection(); - }); + connection.onClose(() => + this.onDidCloseConnectionEmitter.fire(undefined) + ); this.onDidOpenConnectionEmitter.fire(undefined); }); } @@ -142,10 +131,11 @@ export class JsonRpcProxyFactory implements ProxyHandler { * This connection will be used to send/receive JSON-RPC requests and * response. */ - listen(channel: Channel): void { - const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); - connection.onNotification(event => this.onNotification(event.method, ...event.args)); - + listen(connection: MessageConnection): void { + connection.onRequest((prop, ...args) => this.onRequest(prop, ...args)); + connection.onNotification((prop, ...args) => this.onNotification(prop, ...args)); + connection.onDispose(() => this.waitForConnection()); + connection.listen(); this.connectionPromiseResolve(connection); } @@ -249,10 +239,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { new Promise((resolve, reject) => { try { if (isNotify) { - connection.sendNotification(method, args); + connection.sendNotification(method, ...args); resolve(undefined); } else { - const resultPromise = connection.sendRequest(method, args) as Promise; + const resultPromise = connection.sendRequest(method, ...args) as Promise; resultPromise .catch((err: any) => reject(this.deserializeError(capturedError, err))) .then((result: any) => resolve(result)); @@ -303,4 +293,3 @@ export class JsonRpcProxyFactory implements ProxyHandler { } } - diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 66b90eed33739..28dff9400068a 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,101 +16,157 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '../event'; -import { WriteBuffer } from '../message-rpc'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; -import { DisposableCollection } from '../disposable'; - -/** - * A channel that manages the main websocket connection between frontend and backend. All service channels - * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation - * independent of the actual websocket implementation and its execution context (backend vs. frontend). - */ -export class WebSocketChannel implements Channel { +import { IWebSocket } from 'vscode-ws-jsonrpc/lib/socket/socket'; +import { Disposable, DisposableCollection } from '../disposable'; +import { Emitter } from '../event'; + +export class WebSocketChannel implements IWebSocket { + static wsPath = '/services'; - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; + protected readonly closeEmitter = new Emitter<[number, string]>(); + protected readonly toDispose = new DisposableCollection(this.closeEmitter); + + constructor( + readonly id: number, + protected readonly doSend: (content: string) => void + ) { } + + dispose(): void { + this.toDispose.dispose(); } - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; + protected checkNotDisposed(): void { + if (this.toDispose.disposed) { + throw new Error('The channel has been disposed.'); + } } - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; + handleMessage(message: WebSocketChannel.Message): void { + if (message.kind === 'ready') { + this.fireOpen(); + } else if (message.kind === 'data') { + this.fireMessage(message.content); + } else if (message.kind === 'close') { + this.fireClose(message.code, message.reason); + } } - protected toDispose = new DisposableCollection(); + open(path: string): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'open', + id: this.id, + path + })); + } - constructor(protected readonly socket: IWebSocket) { - this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); - socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); - socket.onClose(() => this.close()); - socket.onError(error => this.onErrorEmitter.fire(error)); - // eslint-disable-next-line arrow-body-style - socket.onMessage(data => this.onMessageEmitter.fire(() => { - // In the browser context socketIO receives binary messages as ArrayBuffers. - // So we have to convert them to a Uint8Array before delegating the message to the read buffer. - const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; - return new Uint8ArrayReadBuffer(buffer); + ready(): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'ready', + id: this.id })); } - getWriteBuffer(): WriteBuffer { - const result = new Uint8ArrayWriteBuffer(); + send(content: string): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'data', + id: this.id, + content + })); + } - result.onCommit(buffer => { - if (this.socket.isConnected()) { - this.socket.send(buffer); - } - }); + close(code: number = 1000, reason: string = ''): void { + if (this.closing) { + // Do not try to close the channel if it is already closing. + return; + } + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'close', + id: this.id, + code, + reason + })); + this.fireClose(code, reason); + } - return result; + tryClose(code: number = 1000, reason: string = ''): void { + if (this.closing || this.toDispose.disposed) { + // Do not try to close the channel if it is already closing or disposed. + return; + } + this.doSend(JSON.stringify({ + kind: 'close', + id: this.id, + code, + reason + })); + this.fireClose(code, reason); } - close(): void { - this.toDispose.dispose(); - this.socket.close(); + protected fireOpen: () => void = () => { }; + onOpen(cb: () => void): void { + this.checkNotDisposed(); + this.fireOpen = cb; + this.toDispose.push(Disposable.create(() => this.fireOpen = () => { })); } -} -/** - * An abstraction that enables reuse of the `{@link WebSocketChannel} class in the frontend and backend - * independent of the actual underlying socket implementation. - */ -export interface IWebSocket { - /** - * Sends the given message over the web socket in binary format. - * @param message The binary message. - */ - send(message: Uint8Array): void; - /** - * Closes the websocket from the local side. - */ - close(): void; - /** - * The connection state of the web socket. - */ - isConnected(): boolean; - /** - * Listener callback to handle incoming messages. - * @param cb The callback. - */ - onMessage(cb: (message: Uint8Array) => void): void; - /** - * Listener callback to handle socket errors. - * @param cb The callback. - */ - onError(cb: (reason: any) => void): void; - /** - * Listener callback to handle close events (Remote side). - * @param cb The callback. - */ - onClose(cb: (reason: string, code?: number) => void): void; -} + protected fireMessage: (data: any) => void = () => { }; + onMessage(cb: (data: any) => void): void { + this.checkNotDisposed(); + this.fireMessage = cb; + this.toDispose.push(Disposable.create(() => this.fireMessage = () => { })); + } + + fireError: (reason: any) => void = () => { }; + onError(cb: (reason: any) => void): void { + this.checkNotDisposed(); + this.fireError = cb; + this.toDispose.push(Disposable.create(() => this.fireError = () => { })); + } + + protected closing = false; + protected fireClose(code: number, reason: string): void { + if (this.closing) { + return; + } + this.closing = true; + try { + this.closeEmitter.fire([code, reason]); + } finally { + this.closing = false; + } + this.dispose(); + } + onClose(cb: (code: number, reason: string) => void): Disposable { + this.checkNotDisposed(); + return this.closeEmitter.event(([code, reason]) => cb(code, reason)); + } +} +export namespace WebSocketChannel { + export interface OpenMessage { + kind: 'open' + id: number + path: string + } + export interface ReadyMessage { + kind: 'ready' + id: number + } + export interface DataMessage { + kind: 'data' + id: number + content: string + } + export interface CloseMessage { + kind: 'close' + id: number + code: number + reason: string + } + export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage; +} diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 3cce9e75c3951..b3d8ce8ab9415 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -17,11 +17,9 @@ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; -import { Emitter, Event } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -38,27 +36,15 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider { + this.handleIncomingRawMessage(data); + }); } - protected createMainChannel(): Channel { - const onMessageEmitter = new Emitter(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + protected createChannel(id: number): WebSocketChannel { + return new WebSocketChannel(id, content => { + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); }); - return { - close: () => Event.None, - getWriteBuffer: () => { - const writer = new Uint8ArrayWriteBuffer(); - writer.onCommit(buffer => - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) - ); - return writer; - }, - onClose: Event.None, - onError: Event.None, - onMessage: onMessageEmitter.event - }; } } diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index f29776df9fa48..6f75ea31d0dae 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -15,9 +15,9 @@ // ***************************************************************************** import { injectable } from 'inversify'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider'; import { FrontendApplicationContribution } from '../../browser/frontend-application'; -import { Channel } from '../../common'; /** * Customized connection provider between the frontend and the backend in electron environment. @@ -34,13 +34,16 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv onStop(): void { this.stopping = true; - // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. + // Close the websocket connection `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 - // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - this.channelMultiPlexer?.onUnderlyingChannelClose({ reason: 'The frontend is "going away"', code: 1001 }); + for (const channel of [...this.channels.values()]) { + // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. + // But we cannot use `1001`: https://github.com/TypeFox/vscode-ws-jsonrpc/issues/15 + channel.close(1000, 'The frontend is "going away"...'); + } } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { if (!this.stopping) { super.openChannel(path, handler, options); } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index b2e4b00e2e1a3..071796c5cf0ca 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -16,23 +16,24 @@ import { IpcMainEvent, ipcMain, WebContents } from '@theia/electron/shared/electron'; import { inject, injectable, named, postConstruct } from 'inversify'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; import { ContributionProvider } from '../../common/contribution-provider'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { MessagingContribution } from '../../node/messaging/messaging-contribution'; +import { ConsoleLogger } from '../../node/messaging/logger'; import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; -import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; -import { Emitter, Event, WriteBuffer } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** * This component replicates the role filled by `MessagingContribution` but for Electron. * Unlike the WebSocket based implementation, we do not expect to receive * connection events. Instead, we'll create channels based on incoming `open` * events on the `ipcMain` channel. + * * This component allows communication between renderer process (frontend) and electron main process. */ - @injectable() export class ElectronMessagingContribution implements ElectronMainApplicationContribution, ElectronMessagingService { @@ -42,112 +43,89 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon @inject(ContributionProvider) @named(ElectronConnectionHandler) protected readonly connectionHandlers: ContributionProvider; - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); - /** - * Each electron window has a main chanel and its own multiplexer to route multiple client messages the same IPC connection. - */ - protected readonly windowChannelMultiplexer = new Map(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly windowChannels = new Map>(); @postConstruct() protected init(): void { - ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => { - this.handleIpcEvent(event, data); + ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: string) => { + this.handleIpcMessage(event, data); }); } - protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { - const sender = event.sender; - // Get the multiplexer for a given window id - try { - const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); - windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); - } catch (error) { - console.error('IPC: Failed to handle message', { error, data }); - } - } - - // Creates a new multiplexer for a given sender/window - protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } { - const mainChannel = this.createWindowMainChannel(sender); - const multiPlexer = new ChannelMultiplexer(mainChannel); - multiPlexer.onDidOpenChannel(openEvent => { - const { channel, id } = openEvent; - if (this.channelHandlers.route(id, channel)) { - console.debug(`Opening channel for service path '${id}'.`); - channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); - } - }); - - sender.once('did-navigate', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was refreshed' })); // When refreshing the browser window. - sender.once('destroyed', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was closed' })); // When closing the browser window. - const data = { channel: mainChannel, multiPlexer }; - this.windowChannelMultiplexer.set(sender.id, data); - return data; - } - - /** - * Creates the main channel to a window. - * @param sender The window that the channel should be established to. - */ - protected createWindowMainChannel(sender: WebContents): ElectronWebContentChannel { - return new ElectronWebContentChannel(sender); - } - onStart(): void { for (const contribution of this.messagingContributions.getContributions()) { contribution.configure(this); } for (const connectionHandler of this.connectionHandlers.getContributions()) { this.channelHandlers.push(connectionHandler.path, (params, channel) => { - connectionHandler.onConnection(channel); + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + connectionHandler.onConnection(connection); }); } } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ipcChannel(spec: string, callback: (params: any, channel: Channel) => void): void { - this.channelHandlers.push(spec, callback); - } -} - -/** - * Used to establish a connection between the ipcMain and the Electron frontend (window). - * Messages a transferred via electron IPC. - */ -export class ElectronWebContentChannel implements Channel { - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - // Make the message emitter public so that we can easily forward messages received from the ipcMain. - readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; + listen(spec: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void { + this.ipcChannel(spec, (params, channel) => { + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + callback(params, connection); + }); } - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ipcChannel(spec: string, callback: (params: any, channel: WebSocketChannel) => void): void { + this.channelHandlers.push(spec, callback); } - constructor(protected readonly sender: Electron.WebContents) { + protected handleIpcMessage(event: IpcMainEvent, data: string): void { + const sender = event.sender; + try { + // Get the channel map for a given window id + let channels = this.windowChannels.get(sender.id)!; + if (!channels) { + this.windowChannels.set(sender.id, channels = new Map()); + } + // Start parsing the message to extract the channel id and route + const message: WebSocketChannel.Message = JSON.parse(data.toString()); + // Someone wants to open a logical channel + if (message.kind === 'open') { + const { id, path } = message; + const channel = this.createChannel(id, sender); + if (this.channelHandlers.route(path, channel)) { + channel.ready(); + channels.set(id, channel); + channel.onClose(() => channels.delete(id)); + } else { + console.error('Cannot find a service for the path: ' + path); + } + } else { + const { id } = message; + const channel = channels.get(id); + if (channel) { + channel.handleMessage(message); + } else { + console.error('The ipc channel does not exist', id); + } + } + const close = () => { + for (const channel of Array.from(channels.values())) { + channel.close(undefined, 'webContent destroyed'); + } + channels.clear(); + }; + sender.once('did-navigate', close); // When refreshing the browser window. + sender.once('destroyed', close); // When closing the browser window. + } catch (error) { + console.error('IPC: Failed to handle message', { error, data }); + } } - getWriteBuffer(): WriteBuffer { - const writer = new Uint8ArrayWriteBuffer(); - - writer.onCommit(buffer => { - if (!this.sender.isDestroyed()) { - this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); + protected createChannel(id: number, sender: WebContents): WebSocketChannel { + return new WebSocketChannel(id, content => { + if (!sender.isDestroyed()) { + sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); } }); - - return writer; - } - close(): void { - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); } + } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts index 874d51237b4fd..dde3fdde1d181 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts @@ -14,14 +14,20 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import type { MessageConnection } from 'vscode-jsonrpc'; +import type { WebSocketChannel } from '../../common/messaging/web-socket-channel'; export interface ElectronMessagingService { + /** + * Accept a JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + listen(path: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void; /** * Accept an ipc channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: Channel) => void): void; + ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: WebSocketChannel) => void): void; } export namespace ElectronMessagingService { export interface PathParams { diff --git a/packages/core/src/node/messaging/binary-message-pipe.ts b/packages/core/src/node/messaging/binary-message-pipe.ts deleted file mode 100644 index 1143aef9cf4cc..0000000000000 --- a/packages/core/src/node/messaging/binary-message-pipe.ts +++ /dev/null @@ -1,168 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2022 STMicroelectronics and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -import { Duplex } from 'stream'; -import { Disposable, Emitter, Event } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; - -/** - * A `BinaryMessagePipe` is capable of sending and retrieving binary messages i.e. {@link Uint8Array}s over - * and underlying streamed process pipe/fd. The message length of individual messages is encoding at the beginning of - * a new message. This makes it possible to extract messages from the streamed data. - */ -export class BinaryMessagePipe implements Disposable { - static readonly MESSAGE_START_IDENTIFIER = ''; - protected dataHandler = (chunk: Uint8Array) => this.handleChunk(chunk); - - protected onMessageEmitter = new Emitter(); - protected cachedMessageData: StreamedMessageData = { - chunks: [], - missingBytes: 0 - }; - - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - constructor(protected readonly underlyingPipe: Duplex) { - underlyingPipe.on('data', this.dataHandler); - } - - send(message: Uint8Array): void { - this.underlyingPipe.write(this.encodeMessageStart(message)); - this.underlyingPipe.write(message); - } - - protected handleChunk(chunk: Uint8Array): void { - if (this.cachedMessageData.missingBytes === 0) { - // There is no currently streamed message => We expect that the beginning of the chunk is the message start for a new message - this.handleNewMessage(chunk); - } else { - // The chunk contains message data intended for the currently cached message - this.handleMessageContentChunk(chunk); - } - } - - protected handleNewMessage(chunk: Uint8Array): void { - if (chunk.byteLength < this.messageStartByteLength) { - // The chunk only contains a part of the encoded message start - this.cachedMessageData.partialMessageStart = chunk; - return; - } - - const messageLength = this.readMessageStart(chunk); - - if (chunk.length - this.messageStartByteLength > messageLength) { - // The initial chunk contains more than one binary message => Fire `onMessage` for first message and handle remaining content - const firstMessage = chunk.slice(this.messageStartByteLength, this.messageStartByteLength + messageLength); - this.onMessageEmitter.fire(firstMessage); - this.handleNewMessage(chunk.slice(this.messageStartByteLength + messageLength)); - - } else if (chunk.length - this.messageStartByteLength === messageLength) { - // The initial chunk contains exactly one complete message. => Directly fire the `onMessage` event. - this.onMessageEmitter.fire(chunk.slice(this.messageStartByteLength)); - } else { - // The initial chunk contains only part of the message content => Cache message data - this.cachedMessageData.chunks = [chunk.slice(this.messageStartByteLength)]; - this.cachedMessageData.missingBytes = messageLength - chunk.byteLength + this.messageStartByteLength; - } - } - - protected handleMessageContentChunk(chunk: Uint8Array): void { - if (this.cachedMessageData) { - if (chunk.byteLength < this.cachedMessageData.missingBytes) { - // The chunk only contains parts of the missing bytes for the cached message. - this.cachedMessageData.chunks.push(chunk); - this.cachedMessageData.missingBytes -= chunk.byteLength; - } else if (chunk.byteLength === this.cachedMessageData.missingBytes) { - // Chunk contains exactly the missing data for the cached message - this.cachedMessageData.chunks.push(chunk); - this.emitCachedMessage(); - } else { - // Chunk contains missing data for the cached message + data for the next message - const messageEnd = this.cachedMessageData.missingBytes; - const missingData = chunk.slice(0, messageEnd); - this.cachedMessageData.chunks.push(missingData); - this.emitCachedMessage(); - this.handleNewMessage(chunk.slice(messageEnd)); - } - - } - } - - protected emitCachedMessage(): void { - const message = Buffer.concat(this.cachedMessageData.chunks); - this.onMessageEmitter.fire(message); - this.cachedMessageData.chunks = []; - this.cachedMessageData.missingBytes = 0; - } - - /** - * Encodes the start of a new message into a {@link Uint8Array}. - * The message start consists of a identifier string and the length of the following message. - * @returns the buffer contains the encoded message start - */ - protected encodeMessageStart(message: Uint8Array): Uint8Array { - const writer = new Uint8ArrayWriteBuffer() - .writeString(BinaryMessagePipe.MESSAGE_START_IDENTIFIER) - .writeUint32(message.length); - const messageStart = writer.getCurrentContents(); - writer.dispose(); - return messageStart; - } - - protected get messageStartByteLength(): number { - // 4 bytes for length of id + id string length + 4 bytes for length of message - return 4 + BinaryMessagePipe.MESSAGE_START_IDENTIFIER.length + 4; - } - - /** - * Reads the start of a new message from a stream chunk (or cached message) received from the underlying pipe. - * The message start is expected to consist of an identifier string and the length of the message. - * @param chunk The stream chunk. - * @returns The length of the message content to read. - * @throws An error if the message start can not be read successfully. - */ - protected readMessageStart(chunk: Uint8Array): number { - const messageData = this.cachedMessageData.partialMessageStart ? Buffer.concat([this.cachedMessageData.partialMessageStart, chunk]) : chunk; - this.cachedMessageData.partialMessageStart = undefined; - - const reader = new Uint8ArrayReadBuffer(messageData); - const identifier = reader.readString(); - - if (identifier !== BinaryMessagePipe.MESSAGE_START_IDENTIFIER) { - throw new Error(`Could not read message start. The start identifier should be '${BinaryMessagePipe.MESSAGE_START_IDENTIFIER}' but was '${identifier}`); - } - const length = reader.readUint32(); - return length; - } - - dispose(): void { - this.underlyingPipe.removeListener('data', this.dataHandler); - this.underlyingPipe.end(); - this.onMessageEmitter.dispose(); - this.cachedMessageData = { - chunks: [], - missingBytes: 0 - }; - } -} - -interface StreamedMessageData { - chunks: Uint8Array[]; - missingBytes: number; - partialMessageStart?: Uint8Array; -} diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts index d46d63efa9071..0bac13bb163b8 100644 --- a/packages/core/src/node/messaging/ipc-bootstrap.ts +++ b/packages/core/src/node/messaging/ipc-bootstrap.ts @@ -16,12 +16,20 @@ import 'reflect-metadata'; import { dynamicRequire } from '../dynamic-require'; -import { IPCChannel } from './ipc-channel'; +import { ConsoleLogger } from 'vscode-ws-jsonrpc/lib/logger'; +import { createMessageConnection, IPCMessageReader, IPCMessageWriter, Trace } from 'vscode-ws-jsonrpc'; import { checkParentAlive, IPCEntryPoint } from './ipc-protocol'; checkParentAlive(); const entryPoint = IPCEntryPoint.getScriptFromEnv(); +const reader = new IPCMessageReader(process); +const writer = new IPCMessageWriter(process); +const logger = new ConsoleLogger(); +const connection = createMessageConnection(reader, writer, logger); +connection.trace(Trace.Off, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + log: (message: any, data?: string) => console.log(message, data) +}); -dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(new IPCChannel()); - +dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(connection); diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts index 06f57836df6c9..84c256997257a 100644 --- a/packages/core/src/node/messaging/ipc-connection-provider.ts +++ b/packages/core/src/node/messaging/ipc-connection-provider.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import * as cp from 'child_process'; -import { inject, injectable } from 'inversify'; import * as path from 'path'; -import { createInterface } from 'readline'; -import { Channel, ConnectionErrorHandler, Disposable, DisposableCollection, ILogger } from '../../common'; -import { IPCChannel } from './ipc-channel'; +import { injectable, inject } from 'inversify'; +import { Trace, Tracer, IPCMessageReader, IPCMessageWriter, createMessageConnection, MessageConnection, Message } from 'vscode-ws-jsonrpc'; +import { ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '../../common'; import { createIpcEnv } from './ipc-protocol'; export interface ResolvedIPCConnectionOptions { @@ -41,7 +40,7 @@ export class IPCConnectionProvider { @inject(ILogger) protected readonly logger: ILogger; - listen(options: IPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { + listen(options: IPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { return this.doListen({ logger: this.logger, args: [], @@ -49,21 +48,19 @@ export class IPCConnectionProvider { }, acceptor); } - protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { + protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { const childProcess = this.fork(options); - const channel = new IPCChannel(childProcess); + const connection = this.createConnection(childProcess, options); const toStop = new DisposableCollection(); const toCancelStop = toStop.push(Disposable.create(() => childProcess.kill())); const errorHandler = options.errorHandler; if (errorHandler) { - let errorCount = 0; - channel.onError((err: Error) => { - errorCount++; - if (errorHandler.shouldStop(err, errorCount)) { + connection.onError((e: [Error, Message | undefined, number | undefined]) => { + if (errorHandler.shouldStop(e[0], e[1], e[2])) { toStop.dispose(); } }); - channel.onClose(() => { + connection.onClose(() => { if (toStop.disposed) { return; } @@ -73,15 +70,36 @@ export class IPCConnectionProvider { } }); } - acceptor(channel); + acceptor(connection); return toStop; } + protected createConnection(childProcess: cp.ChildProcess, options: ResolvedIPCConnectionOptions): MessageConnection { + const reader = new IPCMessageReader(childProcess); + const writer = new IPCMessageWriter(childProcess); + const connection = createMessageConnection(reader, writer, { + error: (message: string) => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${message}`), + warn: (message: string) => this.logger.warn(`[${options.serverName}: ${childProcess.pid}] ${message}`), + info: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`), + log: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`) + }); + const tracer: Tracer = { + log: (message: unknown, data?: string) => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] ${message}` + (typeof data === 'string' ? ' ' + data : '')) + }; + connection.trace(Trace.Verbose, tracer); + this.logger.isDebug().then(isDebug => { + if (!isDebug) { + connection.trace(Trace.Off, tracer); + } + }); + return connection; + } + protected fork(options: ResolvedIPCConnectionOptions): cp.ChildProcess { const forkOptions: cp.ForkOptions = { + silent: true, env: createIpcEnv(options), - execArgv: [], - stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe'] + execArgv: [] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); @@ -90,9 +108,8 @@ export class IPCConnectionProvider { } const childProcess = cp.fork(path.join(__dirname, 'ipc-bootstrap'), options.args, forkOptions); - - createInterface(childProcess.stdout!).on('line', line => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${line}`)); - createInterface(childProcess.stderr!).on('line', line => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${line}`)); + childProcess.stdout!.on('data', data => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); + childProcess.stderr!.on('data', data => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC started`); childProcess.once('exit', () => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC exited`)); diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts index 03aa3944521c3..de9a77394b03e 100644 --- a/packages/core/src/node/messaging/ipc-protocol.ts +++ b/packages/core/src/node/messaging/ipc-protocol.ts @@ -15,14 +15,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; const THEIA_PARENT_PID = 'THEIA_PARENT_PID'; const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT'; export const ipcEntryPoint: string | undefined = process.env[THEIA_ENTRY_POINT]; -export type IPCEntryPoint = (connection: Channel) => void; +export type IPCEntryPoint = (connection: MessageConnection) => void; export namespace IPCEntryPoint { /** * Throws if `THEIA_ENTRY_POINT` is undefined or empty. diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/node/messaging/logger.ts similarity index 64% rename from packages/core/src/common/message-rpc/index.ts rename to packages/core/src/node/messaging/logger.ts index 671540a31c42b..45229f3f47423 100644 --- a/packages/core/src/common/message-rpc/index.ts +++ b/packages/core/src/node/messaging/logger.ts @@ -1,5 +1,5 @@ // ***************************************************************************** -// Copyright (C) 2022 STMicroelectronics and others. +// Copyright (C) 2017 TypeFox and others. // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License v. 2.0 which is available at @@ -13,6 +13,25 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; -export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; -export { ReadBuffer, WriteBuffer } from './message-buffer'; + +import { Logger } from 'vscode-ws-jsonrpc'; + +export class ConsoleLogger implements Logger { + + error(message: string): void { + console.log(message); + } + + warn(message: string): void { + console.log(message); + } + + info(message: string): void { + console.log(message); + } + + log(message: string): void { + console.log(message); + } + +} diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index 2f906da0b1189..70c667bdf48bf 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -18,15 +18,19 @@ import * as http from 'http'; import * as https from 'https'; import { Server, Socket } from 'socket.io'; import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; +import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; +import * as launch from 'vscode-ws-jsonrpc/lib/server/launch'; import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { BackendApplicationContribution } from '../backend-application'; -import { MessagingService } from './messaging-service'; +import { MessagingService, WebSocketChannelConnection } from './messaging-service'; +import { ConsoleLogger } from './logger'; import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; -import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -49,7 +53,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me protected readonly messagingListener: MessagingListener; protected readonly wsHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); @postConstruct() protected init(): void { @@ -59,7 +63,21 @@ export class MessagingContribution implements BackendApplicationContribution, Me } } - wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void { + listen(spec: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void { + this.wsChannel(spec, (params, channel) => { + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + callback(params, connection); + }); + } + + forward(spec: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void { + this.wsChannel(spec, (params, channel) => { + const connection = launch.createWebSocketConnection(channel); + callback(params, WebSocketChannelConnection.create(connection, channel)); + }); + } + + wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: WebSocketChannel) => void): void { this.channelHandlers.push(spec, (params, channel) => callback(params, channel)); } @@ -116,22 +134,17 @@ export class MessagingContribution implements BackendApplicationContribution, Me event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`)); } }); - } - - protected toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - socket.disconnect(); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', error => cb(error)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; + socket.on('error', err => { + for (const channel of channels.values()) { + channel.fireError(err); + } + }); + socket.on('disconnect', reason => { + for (const channel of channels.values()) { + channel.close(undefined, reason); + } + channels.clear(); + }); } protected createSocketContainer(socket: Socket): Container { @@ -140,7 +153,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me return connectionContainer; } - protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { + protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { const connectionContainer = this.createSocketContainer(socket); bindContributionProvider(connectionContainer, ConnectionHandler); connectionContainer.load(...this.connectionModules.getContributions()); @@ -148,12 +161,21 @@ export class MessagingContribution implements BackendApplicationContribution, Me const connectionHandlers = connectionContainer.getNamed>(ContributionProvider, ConnectionHandler); for (const connectionHandler of connectionHandlers.getContributions(true)) { connectionChannelHandlers.push(connectionHandler.path, (_, channel) => { - connectionHandler.onConnection(channel); + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + connectionHandler.onConnection(connection); }); } return connectionChannelHandlers; } + protected createChannel(id: number, socket: Socket): WebSocketChannel { + return new WebSocketChannel(id, content => { + if (socket.connected) { + socket.send(content); + } + }); + } + } export namespace MessagingContribution { diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts index 276b58734bcff..087f6d5850def 100644 --- a/packages/core/src/node/messaging/messaging-service.ts +++ b/packages/core/src/node/messaging/messaging-service.ts @@ -15,21 +15,33 @@ // ***************************************************************************** import { Socket } from 'socket.io'; -import { Channel } from '../../common/message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; export interface MessagingService { + /** + * Accept a JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + listen(path: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void; + /** + * Accept a raw JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + forward(path: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void; /** * Accept a web socket channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - wsChannel(path: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void; + wsChannel(path: string, callback: (params: MessagingService.PathParams, socket: WebSocketChannel) => void): void; /** * Accept a web socket connection on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. * * #### Important - * Prefer using web socket channels over establishing new web socket connection. Clients can handle only limited amount of web sockets - * and excessive amount can cause performance degradation. All web socket channels share a single web socket connection. + * Prefer JSON-RPC connections or web socket channels over web sockets. Clients can handle only limited amount of web sockets + * and excessive amount can cause performance degradation. All JSON-RPC connections and web socket channels share the single web socket connection. */ ws(path: string, callback: (params: MessagingService.PathParams, socket: Socket) => void): void; } @@ -44,3 +56,18 @@ export namespace MessagingService { configure(service: MessagingService): void; } } + +export interface WebSocketChannelConnection extends IConnection { + channel: WebSocketChannel; +} +export namespace WebSocketChannelConnection { + export function is(connection: IConnection): connection is WebSocketChannelConnection { + return (connection as WebSocketChannelConnection).channel instanceof WebSocketChannel; + } + + export function create(connection: IConnection, channel: WebSocketChannel): WebSocketChannelConnection { + const result = connection as WebSocketChannelConnection; + result.channel = channel; + return result; + } +} diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 0ef0c50186cee..2fbb17c9aa8ec 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -16,41 +16,32 @@ import * as http from 'http'; import * as https from 'https'; +import { WebSocketChannel } from '../../../common/messaging/web-socket-channel'; +import { Disposable } from '../../../common/disposable'; import { AddressInfo } from 'net'; -import { io, Socket } from 'socket.io-client'; -import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel'; -import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; +import { io } from 'socket.io-client'; -export class TestWebSocketChannelSetup { - public readonly multiplexer: ChannelMultiplexer; - public readonly channel: Channel; +export class TestWebSocketChannel extends WebSocketChannel { constructor({ server, path }: { server: http.Server | https.Server, path: string }) { + super(0, content => socket.send(content)); const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`); - this.channel = new WebSocketChannel(toIWebSocket(socket)); - this.multiplexer = new ChannelMultiplexer(this.channel); - socket.on('connect', () => { - this.multiplexer.open(path); + socket.on('error', error => + this.fireError(error) + ); + socket.on('disconnect', reason => + this.fireClose(0, reason) + ); + socket.on('message', data => { + this.handleMessage(JSON.parse(data.toString())); }); - socket.connect(); + socket.on('connect', () => + this.open(path) + ); + this.toDispose.push(Disposable.create(() => socket.close())); } -} -function toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - socket.close(); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; } diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index 63f789b9da2a3..95ee67c57d8de 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -846,7 +846,7 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p */ export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider { /** - * Read the contents of the given file as stream. + * Read the contents of the given file as stream. * @param resource The `URI` of the file. * * @return The `ReadableStreamEvents` for the readable stream of the given file. diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index f67e198db75f7..5edb5dbbad9e7 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -42,11 +42,11 @@ export interface RemoteFileSystemServer extends JsonRpcServer; open(resource: string, opts: FileOpenOptions): Promise; close(fd: number): Promise; - read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>; + read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }>; readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; - readFile(resource: string): Promise; - write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; - writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise; + readFile(resource: string): Promise; + write(fd: number, pos: number, data: number[], offset: number, length: number): Promise; + writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise; delete(resource: string, opts: FileDeleteOptions): Promise; mkdir(resource: string): Promise; readdir(resource: string): Promise<[string, FileType][]>; @@ -70,7 +70,7 @@ export interface RemoteFileSystemClient { notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void; notifyFileWatchError(): void; notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void; - onFileStreamData(handle: number, data: Uint8Array): void; + onFileStreamData(handle: number, data: number[]): void; onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void; } @@ -169,7 +169,7 @@ export class RemoteFileSystemProvider implements Required, D this.onFileWatchErrorEmitter.fire(); }, notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities), - onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, data]), + onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]), onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error]) }); const onInitialized = this.server.onDidOpenConnection(() => { @@ -224,7 +224,7 @@ export class RemoteFileSystemProvider implements Required, D async readFile(resource: URI): Promise { const bytes = await this.server.readFile(resource.toString()); - return bytes; + return Uint8Array.from(bytes); } readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { @@ -264,11 +264,11 @@ export class RemoteFileSystemProvider implements Required, D } write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { - return this.server.write(fd, pos, data, offset, length); + return this.server.write(fd, pos, [...data.values()], offset, length); } writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise { - return this.server.writeFile(resource.toString(), content, opts); + return this.server.writeFile(resource.toString(), [...content.values()], opts); } delete(resource: URI, opts: FileDeleteOptions): Promise { @@ -412,33 +412,34 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - async read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }> { + async read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }> { if (hasOpenReadWriteCloseCapability(this.provider)) { const buffer = BinaryBuffer.alloc(this.BUFFER_SIZE); const bytes = buffer.buffer; const bytesRead = await this.provider.read(fd, pos, bytes, 0, length); - return { bytes, bytesRead }; + return { bytes: [...bytes.values()], bytesRead }; } throw new Error('not supported'); } - write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { + write(fd: number, pos: number, data: number[], offset: number, length: number): Promise { if (hasOpenReadWriteCloseCapability(this.provider)) { - return this.provider.write(fd, pos, data, offset, length); + return this.provider.write(fd, pos, Uint8Array.from(data), offset, length); } throw new Error('not supported'); } - async readFile(resource: string): Promise { + async readFile(resource: string): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.readFile(new URI(resource)); + const buffer = await this.provider.readFile(new URI(resource)); + return [...buffer.values()]; } throw new Error('not supported'); } - writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise { + writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.writeFile(new URI(resource), content, opts); + return this.provider.writeFile(new URI(resource), Uint8Array.from(content), opts); } throw new Error('not supported'); } @@ -496,7 +497,7 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { if (hasFileReadStreamCapability(this.provider)) { const handle = this.readFileStreamSeq++; const stream = this.provider.readFileStream(new URI(resource), opts, token); - stream.on('data', data => this.client?.onFileStreamData(handle, data)); + stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()])); stream.on('error', error => { const code = error instanceof FileSystemProviderError ? error.code : undefined; const { name, message, stack } = error; diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts index 7e63ddd14927c..fbe968348d9d2 100644 --- a/packages/task/src/node/task-server.slow-spec.ts +++ b/packages/task/src/node/task-server.slow-spec.ts @@ -28,10 +28,9 @@ import { isWindows, isOSX } from '@theia/core/lib/common/os'; import { FileUri } from '@theia/core/lib/node'; import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol'; import { expectThrowsAsync } from '@theia/core/lib/common/test/expect'; -import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; import { expect } from 'chai'; import URI from '@theia/core/lib/common/uri'; -import { RpcProtocol } from '@theia/core'; // test scripts that we bundle with tasks const commandShortRunning = './task'; @@ -107,38 +106,26 @@ describe('Task server / back-end', function (): void { // hook-up to terminal's ws and confirm that it outputs expected tasks' output await new Promise((resolve, reject) => { - const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` }); - setup.multiplexer.onDidOpenChannel(event => { - const channel = event.channel; - const connection = new RpcProtocol(channel, async (method, args) => { - const error = new Error(`Received unexpected request: ${method} with args: ${args} `); - reject(error); - throw error; - }); - channel.onError(reject); - channel.onClose(() => reject(new Error('Channel has been closed'))); - connection.onNotification(not => { - // check output of task on terminal is what we expect - const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; - // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. - // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` - const currentMessage = not.args[0]; - messages.unshift(currentMessage); - if (currentMessage.indexOf(expected) !== -1) { - resolve(); - channel.close(); - return; - } - if (messages.length >= messagesToWaitFor) { - reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); - channel.close(); - } - }); - channel.onMessage(reader => { - - }); + const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); + channel.onError(reject); + channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); + channel.onMessage(msg => { + // check output of task on terminal is what we expect + const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; + // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. + // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` + const currentMessage = msg.toString(); + messages.unshift(currentMessage); + if (currentMessage.indexOf(expected) !== -1) { + resolve(); + channel.close(); + return; + } + if (messages.length >= messagesToWaitFor) { + reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); + channel.close(); + } }); - }); }); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 04228a13c6275..47af4261c8ada 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -17,7 +17,7 @@ import { Terminal, RendererType } from 'xterm'; import { FitAddon } from 'xterm-addon-fit'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core'; +import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection } from '@theia/core'; import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon } from '@theia/core/lib/browser'; import { isOSX } from '@theia/core/lib/common'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -26,6 +26,7 @@ import { terminalsPath } from '../common/terminal-protocol'; import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol'; import { TerminalWatcher } from '../common/terminal-watcher'; import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions, TerminalExitStatus } from './base/terminal-widget'; +import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences'; import { TerminalContribution } from './terminal-contribution'; @@ -60,7 +61,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected hoverMessage: HTMLDivElement; protected lastTouchEnd: TouchEvent | undefined; protected isAttachedCloseListener: boolean = false; @@ -506,23 +507,16 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { - const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); - - const rpc = new RpcProtocol(connection, requestHandler); - rpc.onNotification(event => { - if (event.method === 'onData') { - this.write(event.args[0]); - } - }); + connection.onNotification('onData', (data: string) => this.write(data)); // Excludes the device status code emitted by Xterm.js const sendData = (data?: string) => { if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) { - return rpc.sendRequest('write', [data]); + return connection.sendRequest('write', data); } }; @@ -530,10 +524,12 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget disposable.push(this.term.onData(sendData)); disposable.push(this.term.onBinary(sendData)); - connection.onClose(() => disposable.dispose()); + connection.onDispose(() => disposable.dispose()); + this.toDisposeOnConnect.push(connection); + connection.listen(); if (waitForConnection) { - waitForConnection.resolve(rpc); + waitForConnection.resolve(connection); } } }, { reconnecting: false }); @@ -583,7 +579,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget sendText(text: string): void { if (this.waitForConnection) { this.waitForConnection.promise.then(connection => - connection.sendRequest('write', [text]) + connection.sendRequest('write', text) ); } } diff --git a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts index aa4a54e72deff..6d39ddd973f20 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts @@ -20,7 +20,7 @@ import { IShellTerminalServer } from '../common/shell-terminal-protocol'; import * as http from 'http'; import * as https from 'https'; import { terminalsPath } from '../common/terminal-protocol'; -import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; describe('Terminal Backend Contribution', function (): void { @@ -45,19 +45,13 @@ describe('Terminal Backend Contribution', function (): void { it('is data received from the terminal ws server', async () => { const terminalId = await shellTerminalServer.create({}); await new Promise((resolve, reject) => { - const path = `${terminalsPath}/${terminalId}`; - const { channel, multiplexer } = new TestWebSocketChannelSetup({ server, path }); + const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); channel.onError(reject); - channel.onClose(event => reject(new Error(`channel is closed with '${event.code}' code and '${event.reason}' reason}`))); - - multiplexer.onDidOpenChannel(event => { - if (event.id === path) { - resolve(); - channel.close(); - } + channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); + channel.onOpen(() => { + resolve(); + channel.close(); }); - }); }); - }); diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index dea4504e0ffea..4675b7a32290c 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger, RequestHandler } from '@theia/core/lib/common'; +import { ILogger } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; -import { RpcProtocol } from '@theia/core/'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -31,27 +30,19 @@ export class TerminalBackendContribution implements MessagingService.Contributio protected readonly logger: ILogger; configure(service: MessagingService): void { - service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => { + service.listen(`${terminalsPath}/:id`, (params: { id: string }, connection) => { const id = parseInt(params.id, 10); const termProcess = this.processManager.get(id); if (termProcess instanceof TerminalProcess) { const output = termProcess.createOutputStream(); - // Create a RPC connection to the terminal process - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const requestHandler: RequestHandler = async (method: string, args: any[]) => { - if (method === 'write' && args[0]) { - termProcess.write(args[0]); - } else { - this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args }); - } - }; - - const rpc = new RpcProtocol(channel, requestHandler); - output.on('data', data => { - rpc.sendNotification('onData', [data]); - }); - channel.onClose(() => output.dispose()); + output.on('data', data => connection.sendNotification('onData', data.toString())); + connection.onRequest('write', (data: string) => termProcess.write(data)); + connection.onClose(() => output.dispose()); + connection.listen(); + } else { + connection.dispose(); } }); } + } diff --git a/yarn.lock b/yarn.lock index 128b96c0a50b0..6fbc6d10d83dc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2182,11 +2182,6 @@ resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.1.tgz#e2c6e73e0bdeb2521d00756d099218e9f5d90a04" integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ== -"@types/chai@4.3.0": - version "4.3.0" - resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.0.tgz#23509ebc1fa32f1b4d50d6a66c4032d5b8eaabdc" - integrity sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw== - "@types/component-emitter@^1.2.10": version "1.2.11" resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506" @@ -3865,28 +3860,11 @@ caseless@~0.12.0: resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc" integrity sha512-4tYFyifaFfGacoiObjJegolkwSU4xQNGbVgUiNYVUxbQ2x2lUsFvY4hVgVzGiIe6WLOPqycWXA40l+PWsxthUw== -chai-spies@1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/chai-spies/-/chai-spies-1.0.0.tgz#d16b39336fb316d03abf8c375feb23c0c8bb163d" - integrity sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg== - chai-string@^1.4.0: version "1.5.0" resolved "https://registry.yarnpkg.com/chai-string/-/chai-string-1.5.0.tgz#0bdb2d8a5f1dbe90bc78ec493c1c1c180dd4d3d2" integrity sha512-sydDC3S3pNAQMYwJrs6dQX0oBQ6KfIPuOZ78n7rocW0eJJlsHPh2t3kwW7xfwYA/1Bf6/arGtSUo16rxR2JFlw== -chai@4.3.4: - version "4.3.4" - resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.4.tgz#b55e655b31e1eac7099be4c08c21964fce2e6c49" - integrity sha512-yS5H68VYOCtN1cjfwumDSuzn/9c+yza4f3reKXlE5rUg7SFcCEy90gJvydNgOYtblyf4Zi6jIWRnXOgErta0KA== - dependencies: - assertion-error "^1.1.0" - check-error "^1.0.2" - deep-eql "^3.0.1" - get-func-name "^2.0.0" - pathval "^1.1.1" - type-detect "^4.0.5" - chai@^4.2.0: version "4.3.6" resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.6.tgz#ffe4ba2d9fa9d6680cc0b370adae709ec9011e9c" @@ -9671,6 +9649,11 @@ rechoir@^0.7.0: dependencies: resolve "^1.9.0" +reconnecting-websocket@^4.2.0: + version "4.4.0" + resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783" + integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng== + redent@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/redent/-/redent-3.0.0.tgz#e557b7998316bb53c9f1f56fa626352c6963059f" @@ -11451,7 +11434,7 @@ vscode-debugprotocol@^1.32.0: resolved "https://registry.yarnpkg.com/vscode-debugprotocol/-/vscode-debugprotocol-1.51.0.tgz#c03168dac778b6c24ce17b3511cb61e89c11b2df" integrity sha512-dzKWTMMyebIMPF1VYMuuQj7gGFq7guR8AFya0mKacu+ayptJfaRuM0mdHCqiOth4FnRP8mPhEroFPx6Ift8wHA== -vscode-jsonrpc@^5.0.1: +vscode-jsonrpc@^5.0.0, vscode-jsonrpc@^5.0.1: version "5.0.1" resolved "https://registry.yarnpkg.com/vscode-jsonrpc/-/vscode-jsonrpc-5.0.1.tgz#9bab9c330d89f43fc8c1e8702b5c36e058a01794" integrity sha512-JvONPptw3GAQGXlVV2utDcHx0BiY34FupW/kI6mZ5x06ER5DdPG/tXWMVHjTNULF5uKPOUUD0SaXg5QaubJL0A== @@ -11526,6 +11509,13 @@ vscode-windows-ca-certs@^0.3.0: dependencies: node-addon-api "^3.0.2" +vscode-ws-jsonrpc@^0.2.0: + version "0.2.0" + resolved "https://registry.yarnpkg.com/vscode-ws-jsonrpc/-/vscode-ws-jsonrpc-0.2.0.tgz#5e9c26e10da54a1a235da7d59e74508bbcb8edd9" + integrity sha512-NE9HNRgPjCaPyTJvIudcpyIWPImxwRDtuTX16yks7SAiZgSXigxAiZOvSvVBGmD1G/OMfrFo6BblOtjVR9DdVA== + dependencies: + vscode-jsonrpc "^5.0.0" + w3c-hr-time@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"