diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 7b0448aab6266..d2a57acd81bdc 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -17,7 +17,7 @@ import { Terminal, RendererType } from 'xterm'; import { FitAddon } from 'xterm-addon-fit'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core'; +import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, Channel } from '@theia/core'; import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon, ExtractableWidget } from '@theia/core/lib/browser'; import { isOSX } from '@theia/core/lib/common'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -66,7 +66,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected linkHover: HTMLDivElement; protected linkHoverButton: HTMLAnchorElement; protected lastTouchEnd: TouchEvent | undefined; @@ -563,23 +563,18 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { - const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); - - const rpc = new RpcProtocol(connection, requestHandler); - rpc.onNotification(event => { - if (event.method === 'onData') { - this.write(event.args[0]); - } + connection.onMessage(e => { + this.write(e().readString()); }); // Excludes the device status code emitted by Xterm.js const sendData = (data?: string) => { if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) { - return rpc.sendRequest('write', [data]); + connection.getWriteBuffer().writeString(data).commit(); } }; @@ -590,7 +585,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget connection.onClose(() => disposable.dispose()); if (waitForConnection) { - waitForConnection.resolve(rpc); + waitForConnection.resolve(connection); } } }, { reconnecting: false }); @@ -664,7 +659,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget sendText(text: string): void { if (this.waitForConnection) { this.waitForConnection.promise.then(connection => - connection.sendRequest('write', [text]) + connection.getWriteBuffer().writeString(text).commit() ); } } diff --git a/packages/terminal/src/node/buffering-stream.spec.ts b/packages/terminal/src/node/buffering-stream.spec.ts index 8332143490b17..6d54dfbd8f324 100644 --- a/packages/terminal/src/node/buffering-stream.spec.ts +++ b/packages/terminal/src/node/buffering-stream.spec.ts @@ -16,12 +16,12 @@ import { wait } from '@theia/core/lib/common/promise-util'; import { expect } from 'chai'; -import { BufferingStream } from './buffering-stream'; +import { BufferBufferingStream } from './buffering-stream'; describe('BufferringStream', () => { it('should emit whatever data was buffered before the timeout', async () => { - const buffer = new BufferingStream({ emitInterval: 1000 }); + const buffer = new BufferBufferingStream({ emitInterval: 1000 }); const chunkPromise = waitForChunk(buffer); buffer.push(Buffer.from([0])); await wait(100); @@ -33,14 +33,14 @@ describe('BufferringStream', () => { }); it('should not emit chunks bigger than maxChunkSize', async () => { - const buffer = new BufferingStream({ maxChunkSize: 2 }); + const buffer = new BufferBufferingStream({ maxChunkSize: 2 }); buffer.push(Buffer.from([0, 1, 2, 3, 4, 5])); expect(await waitForChunk(buffer)).deep.eq(Buffer.from([0, 1])); expect(await waitForChunk(buffer)).deep.eq(Buffer.from([2, 3])); expect(await waitForChunk(buffer)).deep.eq(Buffer.from([4, 5])); }); - function waitForChunk(buffer: BufferingStream): Promise { + function waitForChunk(buffer: BufferBufferingStream): Promise { return new Promise(resolve => buffer.onData(resolve)); } }); diff --git a/packages/terminal/src/node/buffering-stream.ts b/packages/terminal/src/node/buffering-stream.ts index 0a9d1c30688ac..46bec4bb9a260 100644 --- a/packages/terminal/src/node/buffering-stream.ts +++ b/packages/terminal/src/node/buffering-stream.ts @@ -33,27 +33,32 @@ export interface BufferingStreamOptions { * every {@link BufferingStreamOptions.emitInterval}. It will also ensure that * the emitted chunks never exceed {@link BufferingStreamOptions.maxChunkSize}. */ -export class BufferingStream { - - protected buffer?: Buffer; +export class BufferingStream { + protected buffer?: T; protected timeout?: NodeJS.Timeout; protected maxChunkSize: number; protected emitInterval: number; - protected onDataEmitter = new Emitter(); + protected onDataEmitter = new Emitter(); + protected readonly concat: (left: T, right: T) => T; + protected readonly slice: (what: T, start?: number, end?: number) => T; + protected readonly length: (what: T) => number; - constructor(options?: BufferingStreamOptions) { - this.emitInterval = options?.emitInterval ?? 16; // ms - this.maxChunkSize = options?.maxChunkSize ?? (256 * 1024); // bytes + constructor(options: BufferingStreamOptions = {}, concat: (left: T, right: T) => T, slice: (what: T, start?: number, end?: number) => T, length: (what: T) => number) { + this.emitInterval = options.emitInterval ?? 16; // ms + this.maxChunkSize = options.maxChunkSize ?? (256 * 1024); // bytes + this.concat = concat; + this.slice = slice; + this.length = length; } - get onData(): Event { + get onData(): Event { return this.onDataEmitter.event; } - push(chunk: Buffer): void { + push(chunk: T): void { if (this.buffer) { - this.buffer = Buffer.concat([this.buffer, chunk]); + this.buffer = this.concat(this.buffer, chunk); } else { this.buffer = chunk; this.timeout = setTimeout(() => this.emitBufferedChunk(), this.emitInterval); @@ -67,12 +72,24 @@ export class BufferingStream { } protected emitBufferedChunk(): void { - this.onDataEmitter.fire(this.buffer!.slice(0, this.maxChunkSize)); - if (this.buffer!.byteLength <= this.maxChunkSize) { + this.onDataEmitter.fire(this.slice(this.buffer!, 0, this.maxChunkSize)); + if (this.length(this.buffer!) <= this.maxChunkSize) { this.buffer = undefined; } else { - this.buffer = this.buffer!.slice(this.maxChunkSize); + this.buffer = this.slice(this.buffer!, this.maxChunkSize); this.timeout = setTimeout(() => this.emitBufferedChunk(), this.emitInterval); } } } + +export class StringBufferingStream extends BufferingStream { + constructor(options: BufferingStreamOptions = {}) { + super(options, (left, right) => left.concat(right), (what, start, end) => what.slice(start, end), what => what.length); + } +} + +export class BufferBufferingStream extends BufferingStream { + constructor(options: BufferingStreamOptions = {}) { + super(options, (left, right) => Buffer.concat([left, right]), (what, start, end) => what.slice(start, end), what => what.length); + } +} diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index fb2c0fe0ddce5..d5d45ed9e8d8a 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -15,15 +15,15 @@ // ***************************************************************************** import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger, RequestHandler } from '@theia/core/lib/common'; +import { ILogger } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; -import { RpcProtocol } from '@theia/core/'; -import { BufferingStream } from './buffering-stream'; +import { StringBufferingStream } from './buffering-stream'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { + protected readonly decoder = new TextDecoder('utf-8'); @inject(ProcessManager) protected readonly processManager: ProcessManager; @@ -39,18 +39,17 @@ export class TerminalBackendContribution implements MessagingService.Contributio const output = termProcess.createOutputStream(); // Create a RPC connection to the terminal process // eslint-disable-next-line @typescript-eslint/no-explicit-any - const requestHandler: RequestHandler = async (method: string, args: any[]) => { - if (method === 'write' && args[0]) { - termProcess.write(args[0]); - } else { - this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args }); - } - }; + channel.onMessage(e => { + termProcess.write(e().readString()); + }); - const rpc = new RpcProtocol(channel, requestHandler); - const buffer = new BufferingStream(); - buffer.onData(chunk => rpc.sendNotification('onData', [chunk.toString('utf8')])); - output.on('data', chunk => buffer.push(Buffer.from(chunk, 'utf8'))); + const buffer = new StringBufferingStream(); + buffer.onData(chunk => { + channel.getWriteBuffer().writeString(chunk).commit(); + }); + output.on('data', chunk => { + buffer.push(chunk); + }); channel.onClose(() => { buffer.dispose(); output.dispose();