Skip to content

Commit

Permalink
Fix critical bugs
Browse files Browse the repository at this point in the history
- GH-11196 Remove dev/debug logoutput from IPCChannel
- GH-11199 Refactor connection provider and channel multiplexer to properly handle a reconnecting backend
   -  Remove console log in websocket-channel if the socket is not connected. Otherwise we end up in an endless loop.
   -  Ensure that channels & RpcProtocol instances proplery dispose all resources if the backend disconnects.
   -  Ensure that all previously open channels and RpcProtocol instances are properly restored once the backend reconnects.

- #11203 Ensure that debugging is handled gracefully (implicitly fixed with the fix for #11199)

- Remove dependency to `reconnecting-websocket` which is no longer needed since the swap to socket.io

Fixes #11196
Fixes #11199

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed May 31, 2022
1 parent b62e17e commit 53262af
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 83 deletions.
23 changes: 12 additions & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

- [Previous Changelogs](https://github.com/eclipse-theia/theia/tree/master/doc/changelogs/)

## v1.27.0 - 6/30/2022

<a name="breaking_changes_1.27.0">[Breaking Changes:](#breaking_changes_1.27.0)</a>

- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
- Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
- `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
- `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
- `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
[#11011](https://github.com/eclipse-theia/theia/pull/11228) - Contributed on behalf of STMicroelectronics.

## v1.26.0 - 5/26/2022

- [application-package] introduce application config prop `validatePreferencesSchema` to control whether to validate preferences on start [#11189](https://github.com/eclipse-theia/theia/pull/11189)
Expand Down Expand Up @@ -55,17 +67,6 @@
- `FileSystem`, `FileMoveOptions`, `FileDeleteOptions`, `FileStat`, `FileSystemError`
- [filesystem] updated `FileStatNodeData.fileStat` to use the non-deprecated `FileStat` from `@theia/core/lib/common/files` [#11176](https://github.com/eclipse-theia/theia/pull/1176)

<a name="breaking_changes_1.26.0">[Breaking Changes:](#breaking_changes_1.26.0)</a>

- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
- Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
- `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
- `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
- `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
[#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics.


## v1.25.0 - 4/28/2022

[1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35)
Expand Down
1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"react-dom": "^16.8.0",
"react-tooltip": "^4.2.21",
"react-virtualized": "^9.20.0",
"reconnecting-websocket": "^4.2.0",
"reflect-metadata": "^0.1.10",
"route-parser": "^0.0.5",
"safer-buffer": "^2.1.2",
Expand Down
28 changes: 17 additions & 11 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -50,18 +48,27 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected createMainChannel(): Channel {
protected readonly socket: Socket;

constructor() {
super();
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new WebSocketChannel(this.toIWebSocket(socket));
socket.on('connect', () => {
this.socket = this.createWebSocket(url);
this.socket.on('connect', () => {
this.initializeMultiplexer();
if (this.reconnectChannelOpeners.length > 0) {
this.reconnectChannelOpeners.forEach(opener => opener());
this.reconnectChannelOpeners = [];
}
this.socket.on('disconnect', () => this.fireSocketDidClose());
this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined));
this.fireSocketDidOpen();
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;
this.socket.connect();
}

return channel;
protected createMainChannel(): Channel {
return new WebSocketChannel(this.toIWebSocket(this.socket));
}

protected toIWebSocket(socket: Socket): IWebSocket {
Expand All @@ -70,7 +77,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
socket.close();
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
Expand Down
46 changes: 29 additions & 17 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

Expand Down Expand Up @@ -72,7 +73,9 @@ export type MessageProvider = () => ReadBuffer;
*/
export class ForwardingChannel implements Channel {

protected toDispose = new DisposableCollection();
constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
Expand All @@ -94,14 +97,9 @@ export class ForwardingChannel implements Channel {
return this.writeBufferSource();
}

send(message: Uint8Array): void {
const writeBuffer = this.getWriteBuffer();
writeBuffer.writeBytes(message);
writeBuffer.commit();
}

close(): void {
this.closeHandler();
this.toDispose.dispose();
}
}

Expand All @@ -120,7 +118,7 @@ export enum MessageTypes {
* channel, so we rely on writers to the multiplexed channels to always commit their
* messages and always in one go.
*/
export class ChannelMultiplexer {
export class ChannelMultiplexer implements Disposable {
protected pendingOpen: Map<string, (channel: ForwardingChannel) => void> = new Map();
protected openChannels: Map<string, ForwardingChannel> = new Map();

Expand All @@ -129,10 +127,15 @@ export class ChannelMultiplexer {
return this.onOpenChannelEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(protected readonly underlyingChannel: Channel) {
this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer()));
this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event));
this.underlyingChannel.onError(error => this.handleError(error));
this.toDispose.pushAll([
this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())),
this.underlyingChannel.onClose(event => this.onUnderlyingChannelClose(event)),
this.underlyingChannel.onError(error => this.handleError(error)),
this.onOpenChannelEmitter
]);
}

protected handleError(error: unknown): void {
Expand All @@ -141,14 +144,19 @@ export class ChannelMultiplexer {
});
}

closeUnderlyingChannel(event?: ChannelCloseEvent): void {

this.pendingOpen.clear();
this.openChannels.forEach(channel => {
channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
});
onUnderlyingChannelClose(event?: ChannelCloseEvent): void {
if (!this.toDispose.disposed) {
this.toDispose.push(Disposable.create(() => {
this.pendingOpen.clear();
this.openChannels.forEach(channel => {
channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
});

this.openChannels.clear();
}));
this.dispose();
}

this.openChannels.clear();
}

protected handleMessage(buffer: ReadBuffer): void {
Expand Down Expand Up @@ -244,5 +252,9 @@ export class ChannelMultiplexer {
getOpenChannel(id: string): Channel | undefined {
return this.openChannels.get(id);
}

dispose(): void {
this.toDispose.dispose();
}
}

19 changes: 12 additions & 7 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import { CancellationToken, CancellationTokenSource } from '../cancellation';
import { DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { Deferred } from '../promise-util';
import { Channel } from './channel';
Expand All @@ -40,6 +41,7 @@ export interface RpcProtocolOptions {
*/
decoder?: RpcMessageDecoder
}

/**
* Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
* sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
Expand All @@ -64,12 +66,14 @@ export class RpcProtocol {
return this.onNotificationEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
this.encoder = options.encoder ?? new RpcMessageEncoder();
this.decoder = options.decoder ?? new RpcMessageDecoder();
const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())));
channel.onClose(() => registration.dispose());

this.toDispose.push(this.onNotificationEmitter);
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
channel.onClose(() => this.toDispose.dispose());
}

handleMessage(message: RpcMessage): void {
Expand Down Expand Up @@ -103,7 +107,7 @@ export class RpcProtocol {
this.pendingRequests.delete(id);
replyHandler.resolve(value);
} else {
console.warn(`reply: no handler for message: ${id}`);
throw new Error(`No reply handler for reply with id: ${id}`);
}
}

Expand All @@ -114,15 +118,14 @@ export class RpcProtocol {
this.pendingRequests.delete(id);
replyHandler.reject(error);
} else {
console.warn(`error: no handler for message: ${id}`);
throw new Error(`No reply handler for error reply with id: ${id}`);
}
} catch (err) {
throw err;
}
}

sendRequest<T>(method: string, args: any[]): Promise<T> {

const id = this.nextMessageId++;
const reply = new Deferred<T>();

Expand Down Expand Up @@ -176,7 +179,6 @@ export class RpcProtocol {
}

protected async handleRequest(id: number, method: string, args: any[]): Promise<void> {

const output = this.channel.getWriteBuffer();

// Check if the last argument of the received args is the key for indicating that a cancellation token should be used
Expand Down Expand Up @@ -207,6 +209,9 @@ export class RpcProtocol {
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
if (this.toDispose.disposed) {
return;
}
this.onNotificationEmitter.fire({ method, args });
}
}
22 changes: 14 additions & 8 deletions packages/core/src/common/messaging/abstract-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ export abstract class AbstractConnectionProvider<AbstractOptions extends object>
return factory.createProxy();
}

protected channelMultiPlexer: ChannelMultiplexer;
protected channelMultiPlexer?: ChannelMultiplexer;

constructor() {
this.channelMultiPlexer = this.createMultiplexer();
}
// A set of channel opening functions that are executed if the backend reconnects to restore the
// the channels that were open before the disconnect occurred.
protected reconnectChannelOpeners: Array<() => Promise<void>> = [];

protected createMultiplexer(): ChannelMultiplexer {
return new ChannelMultiplexer(this.createMainChannel());
protected initializeMultiplexer(): void {
const mainChannel = this.createMainChannel();
mainChannel.onMessage(() => this.onIncomingMessageActivityEmitter.fire());
this.channelMultiPlexer = new ChannelMultiplexer(mainChannel);
}

/**
Expand All @@ -91,18 +93,22 @@ export abstract class AbstractConnectionProvider<AbstractOptions extends object>
}

async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise<void> {
if (!this.channelMultiPlexer) {
throw new Error('The channel multiplexer has not been initialized yet!');
}
const newChannel = await this.channelMultiPlexer.open(path);
newChannel.onClose(() => {
const { reconnecting } = { reconnecting: true, ...options };
if (reconnecting) {
this.openChannel(path, handler, options);
this.reconnectChannelOpeners.push(() => this.openChannel(path, handler, options));
}
});

handler(newChannel);
}

/**
* Create the main connection that is used for multiplexing all channels.
* Create the main connection that is used for multiplexing all service channels.
*/
protected abstract createMainChannel(): Channel;

Expand Down
8 changes: 5 additions & 3 deletions packages/core/src/common/messaging/proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
connection.channel.onClose(() =>
this.onDidCloseConnectionEmitter.fire(undefined)
);
connection.channel.onClose(() => {
this.onDidCloseConnectionEmitter.fire(undefined);
// Wait for connection in case the backend reconnects
this.waitForConnection();
});
this.onDidOpenConnectionEmitter.fire(undefined);
});
}
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/common/messaging/web-socket-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Emitter, Event } from '../event';
import { WriteBuffer } from '../message-rpc';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer';
import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel';
import { DisposableCollection } from '../disposable';

/**
* A channel that manages the main websocket connection between frontend and backend. All service channels
Expand All @@ -44,8 +45,12 @@ export class WebSocketChannel implements Channel {
return this.onErrorEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(protected readonly socket: IWebSocket) {
this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]);
socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code }));
socket.onClose(() => this.close());
socket.onError(error => this.onErrorEmitter.fire(error));
// eslint-disable-next-line arrow-body-style
socket.onMessage(data => this.onMessageEmitter.fire(() => {
Expand All @@ -62,19 +67,15 @@ export class WebSocketChannel implements Channel {
result.onCommit(buffer => {
if (this.socket.isConnected()) {
this.socket.send(buffer);
} else {
console.warn('Could not send message. Websocket is not connected');
}
});

return result;
}

close(): void {
this.toDispose.dispose();
this.socket.close();
this.onCloseEmitter.dispose();
this.onMessageEmitter.dispose();
this.onErrorEmitter.dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider<El
return container.get(ElectronIpcConnectionProvider).createProxy<T>(path, arg);
}

constructor() {
super();
this.initializeMultiplexer();
}

protected createMainChannel(): Channel {
const onMessageEmitter = new Emitter<MessageProvider>();
ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => {
Expand All @@ -46,7 +51,6 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider<El
getWriteBuffer: () => {
const writer = new Uint8ArrayWriteBuffer();
writer.onCommit(buffer =>
// The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array.
ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer)
);
return writer;
Expand Down
Loading

0 comments on commit 53262af

Please sign in to comment.