From 8e37d84f197ea69df450fa432a034b38420d0648 Mon Sep 17 00:00:00 2001 From: Steven Gum <14935595+stevengum@users.noreply.github.com> Date: Tue, 4 Feb 2020 15:27:47 -0800 Subject: [PATCH] [Streaming] add generic socket closure handling (#1627) * add generic socket closure handling * add isConnected to WebSocketServer * expose new property via BotFrameworkAdapter * add check to verify socket connection is open in sendActivities * add optional isConnected to IStreamingTransportServer * add server.isConnected check to StreamingHttpClient * apply @christopheranderson PR feedback Co-authored-by: Christopher Anderson --- .../botbuilder/src/botFrameworkAdapter.ts | 10 +++++ .../src/streaming/streamingHttpClient.ts | 7 +++- .../botFrameworkAdapterStreaming.test.js | 21 +++++++++- .../streaming/streamingHttpClient.test.js | 40 +++++++++++++++++++ .../interfaces/IStreamingTransportServer.ts | 1 + .../src/namedPipe/namedPipeServer.ts | 11 +++++ .../src/webSocket/webSocketServer.ts | 13 ++++++ .../tests/NamedPipe.test.js | 17 ++++++++ .../tests/WebSocket.test.js | 9 +++++ 9 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 libraries/botbuilder/tests/streaming/streamingHttpClient.test.js diff --git a/libraries/botbuilder/src/botFrameworkAdapter.ts b/libraries/botbuilder/src/botFrameworkAdapter.ts index bb0f549e29..d44e000f64 100644 --- a/libraries/botbuilder/src/botFrameworkAdapter.ts +++ b/libraries/botbuilder/src/botFrameworkAdapter.ts @@ -224,6 +224,13 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide } + /** + * Used in streaming contexts to check if the streaming connection is still open for the bot to send activities. + */ + public get isStreamingConnectionOpen(): boolean { + return this.streamingServer.isConnected; + } + /** * Asynchronously resumes a conversation with a user, possibly after some time has gone by. * @@ -835,6 +842,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide throw new Error(`BotFrameworkAdapter.sendActivity(): missing conversation id.`); } if (activity && BotFrameworkAdapter.isStreamingServiceUrl(activity.serviceUrl)) { + if (!this.isStreamingConnectionOpen) { + throw new Error('BotFrameworkAdapter.sendActivities(): Unable to send activity as Streaming connection is closed.'); + } TokenResolver.checkForOAuthCards(this, context, activity as Activity); } const client = this.getOrCreateConnectorClient(context, activity.serviceUrl, this.credentials); diff --git a/libraries/botbuilder/src/streaming/streamingHttpClient.ts b/libraries/botbuilder/src/streaming/streamingHttpClient.ts index 6f799806e0..6a2748d600 100644 --- a/libraries/botbuilder/src/streaming/streamingHttpClient.ts +++ b/libraries/botbuilder/src/streaming/streamingHttpClient.ts @@ -34,8 +34,13 @@ export class StreamingHttpClient implements HttpClient { */ public async sendRequest(httpRequest: WebResource): Promise { if (!httpRequest) { - throw new Error('SendRequest invalid parameter: httpRequest should be provided'); + throw new Error('StreamingHttpClient.sendRequest(): missing "httpRequest" parameter'); } + if (!this.server.isConnected) { + throw new Error('StreamingHttpClient.sendRequest(): Streaming connection is disconnected, and the request could not be sent.'); + + } + const request = this.mapHttpRequestToProtocolRequest(httpRequest); request.path = request.path.substring(request.path.indexOf('/v3')); const res = await this.server.send(request); diff --git a/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js b/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js index a75afc2c7d..02f794b3b8 100644 --- a/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js +++ b/libraries/botbuilder/tests/streaming/botFrameworkAdapterStreaming.test.js @@ -2,7 +2,7 @@ const { Socket } = require('net'); const { expect } = require('chai'); const { spy } = require('sinon'); -const { ActivityHandler, ActivityTypes } = require('botbuilder-core'); +const { ActivityHandler, ActivityTypes, TurnContext } = require('botbuilder-core'); const { BotFrameworkAdapter, StatusCodes } = require('../../'); @@ -50,6 +50,25 @@ describe('BotFrameworkAdapter Streaming tests', () => { expect(adapter.streamingServer.disconnect()).to.not.throw; }); + it('sendActivities should throw an error if streaming connection is closed.', async () => { + const activity = { + serviceUrl: 'urn:botframework:WebSocket:wss://beep.com', + type: 'message' + }; + const reply = { + conversation: { id: 'convo1' }, + ...activity + }; + + const adapter = new BotFrameworkAdapter({}); + adapter.streamingServer = { isConnected: false }; + try { + await adapter.sendActivities(new TurnContext(adapter, activity), [reply]); + } catch (err) { + expect(err.message).contains('BotFrameworkAdapter.sendActivities(): Unable to send activity as Streaming connection is closed.'); + } + }); + it('starts and stops a websocket server', async () => { const bot = new ActivityHandler(); const adapter = new BotFrameworkAdapter(new TestAdapterSettings()); diff --git a/libraries/botbuilder/tests/streaming/streamingHttpClient.test.js b/libraries/botbuilder/tests/streaming/streamingHttpClient.test.js new file mode 100644 index 0000000000..bf58b47817 --- /dev/null +++ b/libraries/botbuilder/tests/streaming/streamingHttpClient.test.js @@ -0,0 +1,40 @@ +const { expect } = require('chai'); +const { StreamingHttpClient } = require('../../lib'); + +describe('StreamingHttpClient', function() { + this.timeout(3000); + + it('should construct when provided a server', () => { + const server = { isConnected: true }; + const client = new StreamingHttpClient(server); + expect(client.server).to.equal(server); + }); + + it('should throw an error if missing the "server" parameter', () => { + try { + new StreamingHttpClient(); + } catch (err) { + expect(err.message).to.contain('StreamingHttpClient: Expected server.'); + } + }); + + it('should throw an error on sendRequest if missing "httpRequest" parameter', async () => { + const client = new StreamingHttpClient({}); + try { + await client.sendRequest(); + } catch (err) { + expect(err).to.be.instanceOf(Error); + expect(err.message).to.contain('StreamingHttpClient.sendRequest(): missing "httpRequest" parameter'); + } + }); + + it('should throw an error on sendRequest if internal server is not connected', async () => { + const client = new StreamingHttpClient({}); + try { + await client.sendRequest({}); + } catch (err) { + expect(err).to.be.instanceOf(Error); + expect(err.message).to.contain('StreamingHttpClient.sendRequest(): Streaming connection is disconnected'); + } + }); +}); diff --git a/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts b/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts index 6427a0fe8c..8f3657882b 100644 --- a/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts +++ b/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts @@ -16,4 +16,5 @@ export interface IStreamingTransportServer { start(): Promise; disconnect(): void; send(request: StreamingRequest): Promise; + isConnected?: boolean; } diff --git a/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts b/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts index 243dd035f1..3d3a61b358 100644 --- a/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts +++ b/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts @@ -40,6 +40,10 @@ export class NamedPipeServer implements IStreamingTransportServer { * @param autoReconnect Optional setting to determine if the client sould attempt to reconnect automatically on disconnection events. Defaults to true. */ public constructor(baseName: string, requestHandler?: RequestHandler, autoReconnect: boolean = true) { + if (!baseName) { + throw new TypeError('NamedPipeServer: Missing baseName parameter'); + } + this._baseName = baseName; this._requestHandler = requestHandler; this._autoReconnect = autoReconnect; @@ -51,6 +55,13 @@ export class NamedPipeServer implements IStreamingTransportServer { this._receiver.disconnected = this.onConnectionDisconnected.bind(this); } + /** + * Returns true if currently connected. + */ + public get isConnected(): boolean { + return !!(this._receiver.isConnected && this._sender.isConnected); + } + /** * Used to establish the connection used by this server and begin listening for incoming messages. * diff --git a/libraries/botframework-streaming/src/webSocket/webSocketServer.ts b/libraries/botframework-streaming/src/webSocket/webSocketServer.ts index a12e545309..04d1dc634d 100644 --- a/libraries/botframework-streaming/src/webSocket/webSocketServer.ts +++ b/libraries/botframework-streaming/src/webSocket/webSocketServer.ts @@ -30,6 +30,7 @@ export class WebSocketServer implements IStreamingTransportServer { private readonly _protocolAdapter: ProtocolAdapter; private readonly _webSocketTransport: WebSocketTransport; private _closedSignal; + private _socket: ISocket; /** * Creates a new instance of the [WebSocketServer](xref:botframework-streaming.WebSocketServer) class. @@ -38,6 +39,11 @@ export class WebSocketServer implements IStreamingTransportServer { * @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this server. */ public constructor(socket: ISocket, requestHandler?: RequestHandler) { + if (!socket) { + throw new TypeError('WebSocketServer: Missing socket parameter'); + } + + this._socket = socket; this._webSocketTransport = new WebSocketTransport(socket); this._requestHandler = requestHandler; @@ -53,6 +59,13 @@ export class WebSocketServer implements IStreamingTransportServer { this._closedSignal = (x: string): string => { return x; }; } + /** + * Examines the stored ISocket and returns true if the socket connection is open. + */ + public get isConnected(): boolean { + return this._socket.isConnected; + } + /** * Used to establish the connection used by this server and begin listening for incoming messages. * diff --git a/libraries/botframework-streaming/tests/NamedPipe.test.js b/libraries/botframework-streaming/tests/NamedPipe.test.js index a66a4c9c22..9c05c3f975 100644 --- a/libraries/botframework-streaming/tests/NamedPipe.test.js +++ b/libraries/botframework-streaming/tests/NamedPipe.test.js @@ -347,6 +347,15 @@ describe('Streaming Extensions NamedPipe Library Tests', () => { expect(server.disconnect()).to.not.throw; }); + it('throws a TypeError during construction if missing the "baseName" parameter', () => { + try { + new np.NamedPipeServer(); + } catch (err) { + expect(err).to.be.instanceOf(TypeError); + expect(err.message).to.contain('NamedPipeServer: Missing baseName parameter'); + } + }); + it('starts the server without throwing', () => { let server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), false); expect(server).to.be.instanceOf(np.NamedPipeServer); @@ -362,6 +371,14 @@ describe('Streaming Extensions NamedPipe Library Tests', () => { expect(server.disconnect()).to.not.throw; }); + it('returns true if isConnected === true on _receiver & _sender', () => { + const server = new np.NamedPipeServer('pipeisConnected', new protocol.RequestHandler(), false); + + expect(server.isConnected).to.be.false; + server._receiver = { isConnected: true }; + server._sender = { isConnected: true }; + expect(server.isConnected).to.be.true; + }); it('sends without throwing', (done) => { let server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), false); diff --git a/libraries/botframework-streaming/tests/WebSocket.test.js b/libraries/botframework-streaming/tests/WebSocket.test.js index 1b49e8153e..450512fd8b 100644 --- a/libraries/botframework-streaming/tests/WebSocket.test.js +++ b/libraries/botframework-streaming/tests/WebSocket.test.js @@ -195,6 +195,15 @@ describe('Streaming Extensions WebSocket Library Tests', () => { expect(server.disconnect()).to.not.throw; }); + it('throws a TypeError during construction if missing the "socket" parameter', () => { + try { + new ws.WebSocketServer(); + } catch (err) { + expect(err).to.be.instanceOf(TypeError); + expect(err.message).to.contain('WebSocketServer: Missing socket parameter'); + } + }); + it('connects', (done) => { let server = new ws.WebSocketServer(new FauxSock, new protocol.RequestHandler()); expect(server).to.be.instanceOf(ws.WebSocketServer);