Skip to content

Commit

Permalink
support persistent websocket
Browse files Browse the repository at this point in the history
Signed-off-by: bob <bob170731@gmail.com>
  • Loading branch information
DanboDuan committed Oct 29, 2022
1 parent 0a67da2 commit 52d6807
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 53 deletions.
38 changes: 15 additions & 23 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../.
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';
import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { IWebSocket, PersistentWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { v4 as uuid } from 'uuid';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand Down Expand Up @@ -55,20 +56,21 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
this.socket = this.createWebSocket(url);
this.socket.on('connect', () => {
this.initializeMultiplexer();
if (this.reconnectChannelOpeners.length > 0) {
this.reconnectChannelOpeners.forEach(opener => opener());
this.reconnectChannelOpeners = [];
}
this.socket.on('disconnect', () => this.fireSocketDidClose());
this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined));
this.fireSocketDidOpen();
});
this.socket.on('disconnect', () => this.fireSocketDidClose());
this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined));
this.initializeMultiplexer();
this.socket.connect();
}

protected createMainChannel(): Channel {
return new WebSocketChannel(this.toIWebSocket(this.socket));
const websocket = new PersistentWebSocket(this.toIWebSocket(this.socket));
return new WebSocketChannel(websocket);
}

protected toIWebSocket(socket: Socket): IWebSocket {
Expand All @@ -82,33 +84,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
onClose: cb => socket.on('disconnect', reason => cb(reason)),
onError: cb => socket.on('error', reason => cb(reason)),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
send: message => socket.emit('message', message),
onConnect: cb => socket.on('connect', cb),
};
}

override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
this.openChannel(path, handler, options);
};
this.socket.on('connect', openChannel);
}
}

/**
* @param path The handler to reach in the backend.
*/
protected createWebSocketUrl(path: string): string {
// Since we are using Socket.io, the path should look like the following:
// proto://domain.com/{path}
return new Endpoint().getWebSocketUrl().withPath(path).toString();
}

protected createHttpWebSocketUrl(path: string): string {
return new Endpoint({ path }).getRestUrl().toString();
const url = new Endpoint()
.getWebSocketUrl()
.withPath(path)
// add reconnect key for persistent websocket
.withQuery(`${PersistentWebSocket.ReconnectionKey}=${uuid()}`)
.toString();
}

/**
Expand Down
43 changes: 17 additions & 26 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ export enum MessageTypes {
* messages and always in one go.
*/
export class ChannelMultiplexer implements Disposable {
protected pendingOpen: Map<string, (channel: ForwardingChannel) => void> = new Map();
protected openChannels: Map<string, ForwardingChannel> = new Map();

protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>();
Expand Down Expand Up @@ -168,7 +167,6 @@ export class ChannelMultiplexer implements Disposable {
onUnderlyingChannelClose(event?: ChannelCloseEvent): void {
if (!this.toDispose.disposed) {
this.toDispose.push(Disposable.create(() => {
this.pendingOpen.clear();
this.openChannels.forEach(channel => {
channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
});
Expand All @@ -185,7 +183,7 @@ export class ChannelMultiplexer implements Disposable {
const id = buffer.readString();
switch (type) {
case MessageTypes.AckOpen: {
return this.handleAckOpen(id);
break;
}
case MessageTypes.Open: {
return this.handleOpen(id);
Expand All @@ -199,28 +197,10 @@ export class ChannelMultiplexer implements Disposable {
}
}

protected handleAckOpen(id: string): void {
// edge case: both side try to open a channel at the same time.
const resolve = this.pendingOpen.get(id);
if (resolve) {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}

protected handleOpen(id: string): void {
if (!this.openChannels.has(id)) {
const channel = this.createChannel(id);
this.openChannels.set(id, channel);
const resolve = this.pendingOpen.get(id);
if (resolve) {
// edge case: both side try to open a channel at the same time.
resolve(channel);
}
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -237,6 +217,13 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
} else {
const newChannel = this.createChannel(id);
this.openChannels.set(id, newChannel);
this.onOpenChannelEmitter.fire({ id, channel: newChannel });
setTimeout(() => {
newChannel.onMessageEmitter.fire(() => data);
});
}
}

Expand All @@ -263,11 +250,15 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
return result;
const channel = this.openChannels.get(id) || this.createChannel(id);
this.openChannels.set(id, channel);
this.underlyingChannel
.getWriteBuffer()
.writeUint8(MessageTypes.Open)
.writeString(id)
.commit();
this.onOpenChannelEmitter.fire({ id, channel });
return Promise.resolve(channel);
}

getOpenChannel(id: string): Channel | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ export abstract class AbstractConnectionProvider<AbstractOptions extends object>
protected reconnectChannelOpeners: Array<() => Promise<void>> = [];

protected initializeMultiplexer(): void {
if (this.channelMultiplexer) {
return;
}
const mainChannel = this.createMainChannel();
mainChannel.onMessage(() => this.onIncomingMessageActivityEmitter.fire());
this.channelMultiplexer = new ChannelMultiplexer(mainChannel);
Expand Down
127 changes: 127 additions & 0 deletions packages/core/src/common/messaging/web-socket-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

/* eslint-disable @typescript-eslint/no-explicit-any */

import { Emitter, Event } from '../event';
import { WriteBuffer } from '../message-rpc';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer';
import { AbstractChannel } from '../message-rpc/channel';
import { Disposable } from '../disposable';
import { ProcessTimeRunOnceScheduler } from '../scheduler';

/**
* A channel that manages the main websocket connection between frontend and backend. All service channels
Expand Down Expand Up @@ -89,5 +91,130 @@ export interface IWebSocket {
* @param cb The callback.
*/
onClose(cb: (reason: string, code?: number) => void): void;
/**
* Listener callback to handle connect events (Remote side).
* @param cb The callback.
*/
onConnect(cb: () => void): void;
}

/**
* A persistant WebSocket implementation based on the `{@link IWebSocket}.
* For the client side, it will always try to reconnect to the remote.
* For the sever side, it will close itself after the `ReconnectionGraceTime` time.
*/
export class PersistentWebSocket implements IWebSocket {

/**
* Maximal grace time between the first and the last reconnection...
*/
static ReconnectionGraceTime: number = 10 * 60 * 1000; // 10 min
static ReconnectionKey: string = 'persistent_key';

public underlyingSocketConnected = true;

private readonly _onCloseEmitter = new Emitter<string>();
private readonly _onClose: Event<string> = this._onCloseEmitter.event;
private _onCloseReason: string;
private readonly _onSocketDisconnect = new Emitter<void>();
public readonly onSocketDisconnect: Event<void> = this._onSocketDisconnect.event;

private readonly _onMessageEmitter = new Emitter<Uint8Array>();
private readonly _onMessage: Event<Uint8Array> = this._onMessageEmitter.event;

private readonly _onErrorEmitter = new Emitter<any>();
private readonly _onError: Event<any> = this._onErrorEmitter.event;

private readonly _onConnectEmitter = new Emitter<void>();
private readonly _onConnect: Event<void> = this._onConnectEmitter.event;

private readonly _reconnectionShortGraceTime: number;
private _disconnectRunner: ProcessTimeRunOnceScheduler;

private pendingData: Uint8Array[] = [];

constructor(protected socket: IWebSocket) {
this._reconnectionShortGraceTime = PersistentWebSocket.ReconnectionGraceTime;
this._disconnectRunner = new ProcessTimeRunOnceScheduler(() => {
this.fireClose();
}, this._reconnectionShortGraceTime);

socket.onClose(reason => {
this.handleSocketDisconnect(reason);
});
socket.onConnect(() => {
this.handleReconnect();
});

socket.onMessage(message => this._onMessageEmitter.fire(message));
socket.onError(message => this._onErrorEmitter.fire(message));
}

send(message: Uint8Array): void {
if (this.socket.isConnected()) {
this.socket.send(message);
} else {
this.pendingData.push(message);
}
}

protected handleSocketDisconnect(reason: string): void {
this.underlyingSocketConnected = false;
this._onCloseReason = reason;
// The socket has closed, let's give the renderer a certain amount of time to reconnect
if (!this._disconnectRunner.isScheduled()) {
this._disconnectRunner.schedule();
}
this._onSocketDisconnect.fire();
}

protected handleReconnect(): void {
this.underlyingSocketConnected = true;
if (this._disconnectRunner.isScheduled()) {
this._disconnectRunner.cancel();
}
const pending = this.pendingData;
this.pendingData = [];
for (const content of pending) {
this.send(content);
}
this._onConnectEmitter.fire();
}

close(): void {
this.socket.close();
}

isConnected(): boolean {
return true;
}

onMessage(cb: (message: Uint8Array) => void): void {
this._onMessage(cb);
}

onError(cb: (reason: any) => void): void {
this._onError(cb);
}

onClose(cb: (reason: string, code?: number | undefined) => void): void {
this._onClose(cb);
}

onConnect(cb: () => void): void {
this._onConnect(cb);
}

public fireClose(): void {
this._onCloseEmitter.fire(this._onCloseReason);
}

public acceptReconnection(socket: IWebSocket): void {
this.socket.close();
this.socket = socket;
socket.onClose(reason => this.handleSocketDisconnect(reason));
socket.onMessage(message => this._onMessageEmitter.fire(message));
socket.onError(message => this._onErrorEmitter.fire(message));
this.handleReconnect();
}
}
Loading

0 comments on commit 52d6807

Please sign in to comment.