Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use ChannelMultiplexer in RPCProtocol #13980

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 0 additions & 143 deletions packages/plugin-ext/src/common/proxy-handler.ts

This file was deleted.

128 changes: 62 additions & 66 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@

/* eslint-disable @typescript-eslint/no-explicit-any */

import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, URI, WriteBuffer } from '@theia/core';
import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, RpcProtocol, URI, WriteBuffer } from '@theia/core';
import { Emitter, Event } from '@theia/core/lib/common/event';
import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
import { MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler';
import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager';
import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri';
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
import { Range, Position } from '../plugin/types-impl';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface MessageConnection {
send(msg: string): void;
Expand Down Expand Up @@ -79,21 +76,36 @@ export namespace ConnectionClosedError {
}

export class RPCProtocolImpl implements RPCProtocol {
private readonly locals = new Map<string, RpcInvocationHandler>();
private readonly locals = new Map<string, any>();
private readonly proxies = new Map<string, any>();
private readonly multiplexer: ChannelMultiplexer;
private readonly encoder = new MsgPackMessageEncoder();
private readonly decoder = new MsgPackMessageDecoder();
private readonly initCallback: ProxySynchronizer;
private readonly rpc: RpcProtocol;

private readonly toDispose = new DisposableCollection(
Disposable.create(() => { /* mark as no disposed */ })
);

constructor(channel: Channel) {
this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel)));
this.rpc = new RpcProtocol(new BatchingChannel(channel), (method, args) => this.handleRequest(method, args));
this.rpc.onNotification((evt: { method: string; args: any[]; }) => this.handleNotification(evt.method, evt.args));
this.toDispose.push(Disposable.create(() => this.proxies.clear()));
this.initCallback = new ProxySynchronizerImpl();
}

handleNotification(method: any, args: any[]): void {
const serviceId = args[0] as string;
const handler: any = this.locals.get(serviceId);
if (!handler) {
throw new Error(`no local service handler with id ${serviceId}`);
}
handler[method](...(args.slice(1)));
}

handleRequest(method: string, args: any[]): Promise<any> {
const serviceId = args[0] as string;
const handler: any = this.locals.get(serviceId);
if (!handler) {
throw new Error(`no local service handler with id ${serviceId}`);
}
return handler[method](...(args.slice(1)));
}

dispose(): void {
Expand All @@ -117,76 +129,60 @@ export class RPCProtocolImpl implements RPCProtocol {
}

protected createProxy<T>(proxyId: string): T {
const handler = new ClientProxyHandler({
id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback
});
const handler = {
get: (target: any, name: string, receiver: any): any => {
if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) {
// not a remote property
return target[name];
}
const isNotify = this.isNotification(name);
return async (...args: any[]) => {
const method = name.toString();
if (isNotify) {
this.rpc.sendNotification(method, [proxyId, ...args]);
} else {
return await this.rpc.sendRequest(method, [proxyId, ...args]) as Promise<any>;
}
};
}

};
return new Proxy(Object.create(null), handler);
}

/**
* Return whether the given property represents a notification. If true,
* the promise returned from the invocation will resolve immediately to `undefined`
*
* A property leads to a notification rather than a method call if its name
* begins with `notify` or `on`.
*
* @param p - The property being called on the proxy.
* @return Whether `p` represents a notification.
*/
protected isNotification(p: PropertyKey): boolean {
let propertyString = p.toString();
if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) {
propertyString = propertyString.substring(1);
}
return propertyString.startsWith('notify') || propertyString.startsWith('on');
}

set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
if (this.isDisposed) {
throw ConnectionClosedError.create();
}
const invocationHandler = this.locals.get(identifier.id);
if (!invocationHandler) {
const handler = new RpcInvocationHandler({ id: identifier.id, target: instance, encoder: this.encoder, decoder: this.decoder });

const channel = this.multiplexer.getOpenChannel(identifier.id);
if (channel) {
handler.listen(channel);
} else {
const channelOpenListener = this.multiplexer.onDidOpenChannel(event => {
if (event.id === identifier.id) {
handler.listen(event.channel);
channelOpenListener.dispose();
}
});
}

this.locals.set(identifier.id, handler);
if (!this.locals.has(identifier.id)) {
this.locals.set(identifier.id, instance);
if (Disposable.is(instance)) {
this.toDispose.push(instance);
}
this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id)));

}
return instance;
}
}

export class ProxySynchronizerImpl implements ProxySynchronizer {

private readonly runningInitializations = new Set<string>();

private _pendingProxyInitializations: Deferred<void>;

constructor() {
this._pendingProxyInitializations = new Deferred();
/* after creation no init is active */
this._pendingProxyInitializations.resolve();
}

startProxyInitialization(id: string, init: Promise<void>): void {
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations = new Deferred();
}
init.then(() => this.finishedProxyInitialization(id));
this.runningInitializations.add(id);
}

protected finishedProxyInitialization(id: string): void {
this.runningInitializations.delete(id);
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations.resolve();
}
}

pendingProxyInitializations(): Promise<void> {
return this._pendingProxyInitializations.promise;
}

}

/**
* Wraps and underlying channel to send/receive multiple messages in one go:
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.
Expand Down
Loading