Skip to content

Commit

Permalink
Ignore "responses" that don't have 'result' or 'error' keys (#4712)
Browse files Browse the repository at this point in the history
Speculative change to address
#4663.

As noted in that issue, this is mostly a workaround for an issue that
exists in the Python kernel wherein comm messages emitted always have
the parent ID of the most recent message to the shell (even if they are
not a reply to that message). This can cause problems in Positron since
it assumes that, after a message is sent, the next message with a
matching `parent_id` is the reply to that message.

The fix is to check a message for `result` or `error` keys before
treating it as an RPC response. We can't do this for _all_ messages
since many codepaths/comms do not use the JSON-RPC structure, but we do
it for all formally defined Positron comms.

This is admittedly a little janky. Some things that could make this
better (but are higher risk than this change):
- a more robust way of identifying Positron's JSON-RPC comm messages
- having Positron supply its own ID with each request that must be
returned in the reply body in order to deterministically pair requests
and replies (i.e. don't rely on `parent_id` since it can be hard to
control)
- moving Positron comms off the shell socket (@lionel- has suggested
e.g. moving them to the control socket to allow them to operate
independently of the current shell command)

### QA Notes

This change is small but it's a hot codepath -- almost every comm
message goes through here. Sanity test the data explorer, and
ipywidgets. They use this codepath but specifically do not want the new
behavior added here.
  • Loading branch information
jmcphers committed Sep 18, 2024
1 parent 6af852f commit 50ff11f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
61 changes: 45 additions & 16 deletions src/vs/workbench/api/browser/positron/mainThreadLanguageRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,19 @@ class ExtHostLanguageRuntimeSessionAdapter implements ILanguageRuntimeSession {
static clientCounter = 0;
}

/**
* Represents a pending RPC call that has been sent to the server side of a
* comm.
*/
class PendingRpc<T> {
public readonly promise: DeferredPromise<IRuntimeClientOutput<T>>;
constructor(
public readonly responseKeys: Array<string>
) {
this.promise = new DeferredPromise<IRuntimeClientOutput<T>>();
}
}

/**
* Represents the front-end instance of a client widget inside a language runtime.
*
Expand All @@ -913,7 +926,7 @@ class ExtHostRuntimeClientInstance<Input, Output>

private readonly _dataEmitter = new Emitter<IRuntimeClientOutput<Output>>();

private readonly _pendingRpcs = new Map<string, DeferredPromise<any>>();
private readonly _pendingRpcs = new Map<string, PendingRpc<any>>();

/**
* An observable value that tracks the number of messages sent and received
Expand Down Expand Up @@ -945,16 +958,20 @@ class ExtHostRuntimeClientInstance<Input, Output>
* Performs an RPC call to the server side of the comm.
*
* @param request The request to send to the server.
* @param timeout Timeout in milliseconds after which to error if the server does not respond.
* @param timeout Timeout in milliseconds after which to error if the server
* does not respond.
* @param responseKeys Optional list of keys; if specified, at least one
* must be present in the response. This helps distinguish RPC responses
* from other messages that have the request as the parent ID.
* @returns A promise that will be resolved with the response from the server.
*/
performRpcWithBuffers<T>(request: Input, timeout: number): Promise<IRuntimeClientOutput<T>> {
performRpcWithBuffers<T>(request: Input, timeout: number, responseKeys: Array<string> = []): Promise<IRuntimeClientOutput<T>> {
// Generate a unique ID for this message.
const messageId = generateUuid();

// Add the promise to the list of pending RPCs.
const promise = new DeferredPromise<IRuntimeClientOutput<T>>();
this._pendingRpcs.set(messageId, promise);
const pending = new PendingRpc<T>(responseKeys);
this._pendingRpcs.set(messageId, pending);

// Send the message to the server side.
this._proxy.$sendClientMessage(this._handle, this._id, messageId, request);
Expand All @@ -965,18 +982,18 @@ class ExtHostRuntimeClientInstance<Input, Output>
// Start a timeout to reject the promise if the server doesn't respond.
setTimeout(() => {
// If the promise has already been resolved, do nothing.
if (promise.isSettled) {
if (pending.promise.isSettled) {
return;
}

// Otherwise, reject the promise and remove it from the list of pending RPCs.
const timeoutSeconds = Math.round(timeout / 100) / 10; // round to 1 decimal place
promise.error(new Error(`RPC timed out after ${timeoutSeconds} seconds: ${JSON.stringify(request)}`));
pending.promise.error(new Error(`RPC timed out after ${timeoutSeconds} seconds: ${JSON.stringify(request)}`));
this._pendingRpcs.delete(messageId);
}, timeout);

// Return a promise that will be resolved when the server responds.
return promise.p;
return pending.promise.p;
}

/**
Expand All @@ -985,8 +1002,8 @@ class ExtHostRuntimeClientInstance<Input, Output>
* This method is a convenience wrapper around {@link performRpcWithBuffers} that returns
* only the data portion of the RPC response.
*/
async performRpc<T>(request: Input, timeout: number): Promise<T> {
return (await this.performRpcWithBuffers<T>(request, timeout)).data;
async performRpc<T>(request: Input, timeout: number, responseKeys: Array<string> = []): Promise<T> {
return (await this.performRpcWithBuffers<T>(request, timeout, responseKeys)).data;
}

/**
Expand Down Expand Up @@ -1017,10 +1034,22 @@ class ExtHostRuntimeClientInstance<Input, Output>
this.messageCounter.set(this.messageCounter.get() + 1, undefined);

if (message.parent_id && this._pendingRpcs.has(message.parent_id)) {
// This is a response to an RPC call; resolve the deferred promise.
const promise = this._pendingRpcs.get(message.parent_id);
promise?.complete(message);
this._pendingRpcs.delete(message.parent_id);
// This may be a response to an RPC call.
const pending = this._pendingRpcs.get(message.parent_id)!;

// Read the keys from the response and the pending RPC.
const responseKeys = Object.keys(message.data);


// Are any of the response keys present in the message?
if (pending.responseKeys.length === 0 || pending.responseKeys.some((key: string) => responseKeys.includes(key))) {
// This is a response to an RPC call; resolve the promise.
pending.promise.complete(message);
this._pendingRpcs.delete(message.parent_id);
} else {
// This is a regular message or event; emit it to the client as-is
this._dataEmitter.fire({ data: message.data as Output, buffers: message.buffers });
}
} else {
// This is a regular message; emit it to the client as an event.
this._dataEmitter.fire({ data: message.data as Output, buffers: message.buffers });
Expand Down Expand Up @@ -1048,8 +1077,8 @@ class ExtHostRuntimeClientInstance<Input, Output>

public override dispose(): void {
// Cancel any pending RPCs
for (const promise of this._pendingRpcs.values()) {
promise.error('The language runtime exited before the RPC completed.');
for (const pending of this._pendingRpcs.values()) {
pending.promise.error('The language runtime exited before the RPC completed.');
}

// If we aren't currently closed, clean up before completing disposal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export interface IRuntimeClientInstance<Input, Output> extends Disposable {
getClientId(): string;
getClientType(): RuntimeClientType;
performRpcWithBuffers(request: Input, timeout: number): Promise<IRuntimeClientOutput<Output>>;
performRpc(request: Input, timeout: number): Promise<Output>;
performRpc(request: Input, timeout: number, responseKeys: Array<string>): Promise<Output>;
sendMessage(message: any): void;
messageCounter: ISettableObservable<number>;
clientState: ISettableObservable<RuntimeClientState>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ export class PositronBaseComm extends Disposable {
let response = {} as any;
try {
const timeout = this.options?.[rpcName]?.timeout ?? 5000;
response = await this.clientInstance.performRpc(request, timeout);
// Wait for a response to this message that includes a 'result' or
// 'error' field.
response = await this.clientInstance.performRpc(request, timeout, ['result', 'error']);
} catch (err) {
// Convert the error to a runtime method error. This handles errors
// that occur while performing the RPC; if the RPC is successfully
Expand Down

0 comments on commit 50ff11f

Please sign in to comment.