diff --git a/packages/rpc-tcp/src/client.ts b/packages/rpc-tcp/src/client.ts index 3e3213a5c..41e9c485a 100644 --- a/packages/rpc-tcp/src/client.ts +++ b/packages/rpc-tcp/src/client.ts @@ -30,6 +30,7 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter { }); socket.on('error', (error: any) => { + error = error instanceof Error ? error : new Error(String(error)); connection.onError(error); }); diff --git a/packages/rpc/src/client/client.ts b/packages/rpc/src/client/client.ts index d7d662a5a..7ef0ba9c3 100644 --- a/packages/rpc/src/client/client.ts +++ b/packages/rpc/src/client/client.ts @@ -71,7 +71,7 @@ export interface TransportConnectionHooks { onData(buffer: Uint8Array, bytes?: number): void; - onError(error: any): void; + onError(error: Error): void; } export interface ClientTransportAdapter { @@ -88,7 +88,7 @@ export interface WritableClient { connectionId?: number, peerId?: string, timeout?: number - } + }, ): RpcMessageSubject; } @@ -122,19 +122,34 @@ export class RpcClientTransporter { public id?: Uint8Array; /** - * true when the connection fully established (after authentication) + * When the connection is established (including handshake and authentication). */ public readonly connection = new BehaviorSubject(false); + + /** + * When the connection was reconnected. This is not called for the very first connection. + */ public readonly reconnected = new Subject(); + + /** + * When the connection was disconnected (due to error or close). + * This increases the connectionId by one. + */ public readonly disconnected = new Subject(); + /** + * Triggered for any onError call from the transporter. + * Right after this event, onDisconnect is called (and thus connection.next(false) and disconnected.next()). + */ + public readonly errored = new Subject<{ connectionId: number, error: Error }>(); + public reader = new RpcMessageReader( (v) => this.onMessage(v), (id) => { if (this.writer) { this.writer.write(createRpcMessage(id, RpcTypes.ChunkAck)); } - } + }, ); public constructor( @@ -159,7 +174,14 @@ export class RpcClientTransporter { return this.connected; } - protected onError() { + protected onError(error: Error) { + if (this.connected) { + // We do not want to call errored if we are not yet connected, + // since errors thrown while in connection process are forwarded + // to the connection promise (and thus are thrown in connect() + // or in any rpc action). + this.errored.next({ connectionId: this.connectionId, error }); + } this.onDisconnect(); } @@ -256,8 +278,8 @@ export class RpcClientTransporter { resolve(undefined); }, - onError: (error: Event) => { - this.onError(); + onError: (error: Error) => { + this.onError(error); reject(new OfflineError(`Could not connect: ${formatError(error)}`)); }, @@ -315,7 +337,10 @@ export class RpcClientPeer { } - public controller(nameOrDefinition: string | ControllerDefinition, options: { timeout?: number, dontWaitForConnection?: true } = {}): RemoteController { + public controller(nameOrDefinition: string | ControllerDefinition, options: { + timeout?: number, + dontWaitForConnection?: true + } = {}): RemoteController { const controller = new RpcControllerState('string' === typeof nameOrDefinition ? nameOrDefinition : nameOrDefinition.path); controller.peerId = this.peerId; @@ -324,7 +349,7 @@ export class RpcClientPeer { return (...args: any[]) => { return this.actionClient.action(controller, propertyName as string, args, options); }; - } + }, }) as any as RemoteController; } @@ -335,8 +360,12 @@ export class RpcClientPeer { export type RpcEventMessage = { id: number, date: Date, type: number, body: any }; -export type RpcClientEventIncomingMessage = { event: 'incoming', composite: boolean, messages: RpcEventMessage[] } & RpcEventMessage; -export type RpcClientEventOutgoingMessage = { event: 'outgoing', composite: boolean, messages: RpcEventMessage[] } & RpcEventMessage; +export type RpcClientEventIncomingMessage = + { event: 'incoming', composite: boolean, messages: RpcEventMessage[] } + & RpcEventMessage; +export type RpcClientEventOutgoingMessage = + { event: 'outgoing', composite: boolean, messages: RpcEventMessage[] } + & RpcEventMessage; export type RpcClientEvent = RpcClientEventIncomingMessage | RpcClientEventOutgoingMessage; @@ -355,7 +384,7 @@ export class RpcBaseClient implements WritableClient { public events = new Subject(); constructor( - protected transport: ClientTransportAdapter + protected transport: ClientTransportAdapter, ) { this.transporter = new RpcClientTransporter(this.transport); this.transporter.onMessage = this.onMessage.bind(this); @@ -443,7 +472,7 @@ export class RpcBaseClient implements WritableClient { connectionId?: number, peerId?: string, timeout?: number - } = {} + } = {}, ): RpcMessageSubject { const resolvedSchema = schema ? resolveReceiveType(schema) : undefined; if (body && !schema) throw new Error('Body given, but not type'); @@ -452,7 +481,7 @@ export class RpcBaseClient implements WritableClient { const dontWaitForConnection = !!options.dontWaitForConnection; // const timeout = options && options.timeout ? options.timeout : 0; - const continuation = (type: number, body?: T, schema?: ReceiveType,) => { + const continuation = (type: number, body?: T, schema?: ReceiveType) => { if (connectionId === this.transporter.connectionId) { //send a message with the same id. Don't use sendMessage() again as this would lead to a memory leak // and a new id generated. We want to use the same id. @@ -460,7 +489,7 @@ export class RpcBaseClient implements WritableClient { this.events.next({ event: 'outgoing', date: new Date, - id, type, body, messages: [], composite: false + id, type, body, messages: [], composite: false, }); } const message = createRpcMessage(id, type, body, undefined, schema); @@ -488,7 +517,7 @@ export class RpcBaseClient implements WritableClient { this.events.next({ event: 'outgoing', date: new Date, - id, type, body, messages: [], composite: false + id, type, body, messages: [], composite: false, }); } @@ -505,14 +534,14 @@ export class RpcBaseClient implements WritableClient { this.events.next({ event: 'outgoing', date: new Date, - id, type, body, messages: [], composite: false + id, type, body, messages: [], composite: false, }); } this.transporter.send(message, progress?.upload); }, (e) => { subject.next(new ErroredRpcMessage(id, e)); - } + }, ); } @@ -586,7 +615,7 @@ export class RpcClient extends RpcBaseClient { write: (answer: Uint8Array) => { //should we modify the package? this.transporter.send(answer); - } + }, }; //todo: set up timeout for idle detection. Make the timeout configurable @@ -614,7 +643,7 @@ export class RpcClient extends RpcBaseClient { }, bufferedAmount: () => { return this.transporter.bufferedAmount(); - } + }, }); // Important to disable since transporter.send chunks already, // otherwise data is chunked twice and protocol breaks. @@ -667,7 +696,7 @@ export class RpcClient extends RpcBaseClient { deregister: async () => { await this.sendMessage(RpcTypes.PeerDeregister, { id }).firstThenClose(RpcTypes.Ack); this.registeredAsPeer = undefined; - } + }, }; } @@ -689,7 +718,11 @@ export class RpcClient extends RpcBaseClient { return peer; } - public controller(nameOrDefinition: string | ControllerDefinition, options: { timeout?: number, dontWaitForConnection?: true, typeReuseDisabled?: boolean } = {}): RemoteController { + public controller(nameOrDefinition: string | ControllerDefinition, options: { + timeout?: number, + dontWaitForConnection?: true, + typeReuseDisabled?: boolean + } = {}): RemoteController { const controller = new RpcControllerState('string' === typeof nameOrDefinition ? nameOrDefinition : nameOrDefinition.path); options = options || {}; @@ -702,7 +735,7 @@ export class RpcClient extends RpcBaseClient { return (...args: any[]) => { return this.actionClient.action(controller, propertyName as string, args, options); }; - } + }, }) as any as RemoteController; } diff --git a/packages/rpc/tests/connection.spec.ts b/packages/rpc/tests/connection.spec.ts new file mode 100644 index 000000000..f0ff855bc --- /dev/null +++ b/packages/rpc/tests/connection.spec.ts @@ -0,0 +1,50 @@ +import { expect, test } from '@jest/globals'; +import { RpcKernel } from '../src/server/kernel.js'; +import { RpcClient, TransportConnectionHooks } from '../src/client/client.js'; + +test('connect', async () => { + const kernel = new RpcKernel(); + + const connections: TransportConnectionHooks[] = []; + + const client = new RpcClient({ + connect(connection: TransportConnectionHooks) { + const kernelConnection = kernel.createConnection({ + write: (buffer) => connection.onData(buffer), + close: () => { + connection.onClose(); + }, + }); + + connections.push(connection); + + connection.onConnected({ + clientAddress: () => { + return 'direct'; + }, + bufferedAmount(): number { + return 0; + }, + close() { + kernelConnection.close(); + }, + send(buffer) { + kernelConnection.feed(buffer); + }, + }); + }, + }); + + const errors: Error[] = []; + client.transporter.errored.subscribe((error) => { + errors.push(error.error); + }); + + await client.connect(); + expect(client.transporter.isConnected()).toBe(true); + expect(connections).toHaveLength(1); + + connections[0].onError(new Error('test')); + expect(errors[0].message).toEqual('test'); + expect(client.transporter.isConnected()).toBe(false); +}); diff --git a/packages/rpc/tests/custom-message.spec.ts b/packages/rpc/tests/custom-message.spec.ts index c0a601e3e..10f5b9f47 100644 --- a/packages/rpc/tests/custom-message.spec.ts +++ b/packages/rpc/tests/custom-message.spec.ts @@ -56,13 +56,13 @@ test('back controller', async () => { const client = new DirectClient(kernel); - // This wait for the server's Ack + // This waits for the server's Ack const answer = await client .sendMessage(MyTypes.QueryAndAnswer) .firstThenClose<{ v: string }>(MyTypes.Answer); expect(answer.v).toBe('42 is the answer'); - // This wait for the server's Ack + // This waits for the server's Ack await client .sendMessage<{v: string}>(MyTypes.BroadcastWithAck, {v: 'Hi1'}) .ackThenClose();