diff --git a/package-lock.json b/package-lock.json index 5220adffff..29c1c7cfdb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,14 @@ "source-map": "0.5.7", "typescript": "2.7.2", "webpack-sources": "1.1.0" + }, + "dependencies": { + "typescript": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-2.7.2.tgz", + "integrity": "sha512-p5TCYZDAO0m4G344hD+wx/LATebLWZNkkh2asWUFqSsD2OrDNhbAHuSjobrmsUmdzjJjEeZVU9g1h3O6vpstnw==", + "dev": true + } } }, "@sinonjs/formatio": { @@ -12282,9 +12290,9 @@ "dev": true }, "typescript": { - "version": "2.7.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-2.7.2.tgz", - "integrity": "sha512-p5TCYZDAO0m4G344hD+wx/LATebLWZNkkh2asWUFqSsD2OrDNhbAHuSjobrmsUmdzjJjEeZVU9g1h3O6vpstnw==", + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-2.8.1.tgz", + "integrity": "sha512-Ao/f6d/4EPLq0YwzsQz8iXflezpTkQzqAyenTiw4kCUGr1uPiFLC3+fZ+gMZz6eeI/qdRUqvC+HxIJzUAzEFdg==", "dev": true }, "uglify-js": { diff --git a/package.json b/package.json index 4b716bb986..7458ecfe1f 100644 --- a/package.json +++ b/package.json @@ -117,7 +117,7 @@ "prepublish": "shx rm -rf ./typings && npm run build_all", "publish_docs": "./publish_docs.sh", "test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html", - "test": "cross-env TS_NODE_FAST=true mocha --require ts-node/register --opts spec-build/support/coverage.opts \"spec-build/**/*-spec.ts\"", + "test": "mocha --require ts-node/register --opts spec-build/support/coverage.opts \"spec-build/**/*-spec.ts\"", "test:cover": "cross-env TS_NODE_FAST=true nyc npm test", "test:circular": "dependency-cruise --validate .dependency-cruiser.json -x \"^node_modules\" src", "test:systemjs": "node integration/systemjs/systemjs-compatibility-spec.js", diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 515fd7dafa..9221fee91f 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -1,17 +1,15 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import * as Rx from 'rxjs/Rx'; +import { websocket } from 'rxjs/websocket'; +import { map, retry, take, repeat } from 'rxjs/operators'; declare const __root__: any; -const Observable = Rx.Observable; - /** @test {webSocket} */ -describe('Observable.webSocket', () => { +describe('websocket', () => { let __ws: any; function setupMockWebSocket() { - MockWebSocket.clearSockets(); __ws = __root__.WebSocket; __root__.WebSocket = MockWebSocket; } @@ -21,216 +19,204 @@ describe('Observable.webSocket', () => { MockWebSocket.clearSockets(); } - beforeEach(() => { - setupMockWebSocket(); - }); + describe('basic behavior', () => { + beforeEach(() => { + setupMockWebSocket(); + }); - afterEach(() => { - teardownMockWebSocket(); - }); + afterEach(() => { + teardownMockWebSocket(); + }); - it('should send and receive messages', () => { - let messageReceived = false; - const subject = Observable.webSocket('ws://mysocket'); + it('should send and receive messages', () => { + let messageReceived = false; + const subject = websocket('ws://mysocket'); - subject.next('ping'); + subject.next('ping'); - subject.subscribe((x: string) => { - expect(x).to.equal('pong'); - messageReceived = true; - }); + subject.subscribe(x => { + expect(x).to.equal('pong'); + messageReceived = true; + }); - const socket = MockWebSocket.lastSocket; - expect(socket.url).to.equal('ws://mysocket'); + const socket = MockWebSocket.lastSocket; + expect(socket.url).to.equal('ws://mysocket'); - socket.open(); - expect(socket.lastMessageSent).to.equal('ping'); + socket.open(); + expect(socket.lastMessageSent).to.equal(JSON.stringify('ping')); - socket.triggerMessage(JSON.stringify('pong')); - expect(messageReceived).to.be.true; + socket.triggerMessage(JSON.stringify('pong')); + expect(messageReceived).to.be.true; - subject.unsubscribe(); - }); + subject.unsubscribe(); + }); - it('should allow the user to chain operators', () => { - let messageReceived = false; - const subject = Observable.webSocket('ws://mysocket'); + it('receive multiple messages', () => { + const expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?']; + const results: string[] = []; + const subject = websocket('ws://mysocket'); - subject - .map(x => x + '?') - .map(x => x + '!') - .map(x => x + '!') - .subscribe((x: string) => { - expect(x).to.equal('pong?!!'); - messageReceived = true; + subject.subscribe(x => { + results.push(x); }); - const socket = MockWebSocket.lastSocket; - - socket.open(); + const socket = MockWebSocket.lastSocket; - socket.triggerMessage(JSON.stringify('pong')); - expect(messageReceived).to.be.true; + socket.open(); - subject.unsubscribe(); - }); + expected.forEach(x => { + socket.triggerMessage(JSON.stringify(x)); + }); - it('receive multiple messages', () => { - const expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?']; - const results = []; - const subject = Observable.webSocket('ws://mysocket'); + expect(results).to.deep.equal(expected); - subject.subscribe((x: string) => { - results.push(x); + subject.unsubscribe(); }); - const socket = MockWebSocket.lastSocket; + it('should queue messages prior to subscription', () => { + const expected = ['make', 'him', 'walk', 'the', 'plank']; + const subject = websocket('ws://mysocket'); - socket.open(); + expected.forEach(x => { + subject.next(x); + }); - expected.forEach((x: string) => { - socket.triggerMessage(JSON.stringify(x)); - }); + let socket = MockWebSocket.lastSocket; + expect(socket).not.exist; - expect(results).to.deep.equal(expected); + subject.subscribe(); - subject.unsubscribe(); - }); + socket = MockWebSocket.lastSocket; + expect(socket.sent.length).to.equal(0); - it('should queue messages prior to subscription', () => { - const expected = ['make', 'him', 'walk', 'the', 'plank']; - const subject = Observable.webSocket('ws://mysocket'); + socket.open(); + expect(socket.sent.length).to.equal(expected.length); - expected.forEach((x: string) => { - subject.next(x); + subject.unsubscribe(); }); - let socket = MockWebSocket.lastSocket; - expect(socket).not.exist; + it('should send messages immediately if already open', () => { + const subject = websocket('ws://mysocket'); + subject.subscribe(); + const socket = MockWebSocket.lastSocket; + socket.open(); - subject.subscribe(); + subject.next('avast!'); + expect(socket.lastMessageSent).to.equal(JSON.stringify('avast!')); + subject.next('ye swab!'); + expect(socket.lastMessageSent).to.equal(JSON.stringify('ye swab!')); - socket = MockWebSocket.lastSocket; - expect(socket.sent.length).to.equal(0); + subject.unsubscribe(); + }); - socket.open(); - expect(socket.sent.length).to.equal(expected.length); + it('should close the socket when completed', () => { + const subject = websocket('ws://mysocket'); + subject.subscribe(); + const socket = MockWebSocket.lastSocket; + socket.open(); - subject.unsubscribe(); - }); + expect(socket.readyState).to.equal(1); // open - it('should send messages immediately if already open', () => { - const subject = Observable.webSocket('ws://mysocket'); - subject.subscribe(); - const socket = MockWebSocket.lastSocket; - socket.open(); + sinon.spy(socket, 'close'); - subject.next('avast!'); - expect(socket.lastMessageSent).to.equal('avast!'); - subject.next('ye swab!'); - expect(socket.lastMessageSent).to.equal('ye swab!'); + expect(socket.close).not.have.been.called; - subject.unsubscribe(); - }); + subject.complete(); + expect(socket.close).have.been.called; + expect(socket.readyState).to.equal(3); // closed - it('should close the socket when completed', () => { - const subject = Observable.webSocket('ws://mysocket'); - subject.subscribe(); - const socket = MockWebSocket.lastSocket; - socket.open(); + subject.unsubscribe(); + (socket.close).restore(); + }); - expect(socket.readyState).to.equal(1); // open + it('should close the socket with a code and a reason when errored', () => { + const subject = websocket('ws://mysocket'); + subject.subscribe(); + const socket = MockWebSocket.lastSocket; + socket.open(); - sinon.spy(socket, 'close'); + sinon.spy(socket, 'close'); + expect(socket.close).not.have.been.called; - expect(socket.close).not.have.been.called; + subject.error({ code: 1337, reason: 'Too bad, so sad :('}); + expect(socket.close).have.been.calledWith(1337, 'Too bad, so sad :('); - subject.complete(); - expect(socket.close).have.been.called; - expect(socket.readyState).to.equal(3); // closed + subject.unsubscribe(); + (socket.close).restore(); + }); - subject.unsubscribe(); - (socket.close).restore(); - }); + it('should allow resubscription after closure via complete', () => { + const subject = websocket('ws://mysocket'); + subject.subscribe(); + const socket1 = MockWebSocket.lastSocket; + socket1.open(); + subject.complete(); - it('should close the socket with a code and a reason when errored', () => { - const subject = Observable.webSocket('ws://mysocket'); - subject.subscribe(); - const socket = MockWebSocket.lastSocket; - socket.open(); + subject.next('a mariner yer not. yarrr.'); + subject.subscribe(); + const socket2 = MockWebSocket.lastSocket; + socket2.open(); - sinon.spy(socket, 'close'); - expect(socket.close).not.have.been.called; + expect(socket2).not.to.equal(socket1); + expect(socket2.lastMessageSent).to.equal(JSON.stringify('a mariner yer not. yarrr.')); - subject.error({ code: 1337, reason: 'Too bad, so sad :('}); - expect(socket.close).have.been.calledWith(1337, 'Too bad, so sad :('); + subject.unsubscribe(); + }); - subject.unsubscribe(); - (socket.close).restore(); - }); + it('should allow resubscription after closure via error', () => { + const subject = websocket('ws://mysocket'); + subject.subscribe(); + const socket1 = MockWebSocket.lastSocket; + socket1.open(); + subject.error({ code: 1337 }); - it('should allow resubscription after closure via complete', () => { - const subject = Observable.webSocket('ws://mysocket'); - subject.subscribe(); - const socket1 = MockWebSocket.lastSocket; - socket1.open(); - subject.complete(); + subject.next('yo-ho! yo-ho!'); + subject.subscribe(); + const socket2 = MockWebSocket.lastSocket; + socket2.open(); - subject.next('a mariner yer not. yarrr.'); - subject.subscribe(); - const socket2 = MockWebSocket.lastSocket; - socket2.open(); + expect(socket2).not.to.equal(socket1); + expect(socket2.lastMessageSent).to.equal(JSON.stringify('yo-ho! yo-ho!')); - expect(socket2).not.to.equal(socket1); - expect(socket2.lastMessageSent).to.equal('a mariner yer not. yarrr.'); + subject.unsubscribe(); + }); - subject.unsubscribe(); - }); + it('should have a default resultSelector that parses message data as JSON', () => { + let result; + const expected = { mork: 'shazbot!' }; + const subject = websocket('ws://mysocket'); - it('should allow resubscription after closure via error', () => { - const subject = Observable.webSocket('ws://mysocket'); - subject.subscribe(); - const socket1 = MockWebSocket.lastSocket; - socket1.open(); - subject.error({ code: 1337 }); + subject.subscribe((x: any) => { + result = x; + }); - subject.next('yo-ho! yo-ho!'); - subject.subscribe(); - const socket2 = MockWebSocket.lastSocket; - socket2.open(); + const socket = MockWebSocket.lastSocket; + socket.open(); + socket.triggerMessage(JSON.stringify(expected)); - expect(socket2).not.to.equal(socket1); - expect(socket2.lastMessageSent).to.equal('yo-ho! yo-ho!'); + expect(result).to.deep.equal(expected); - subject.unsubscribe(); + subject.unsubscribe(); + }); }); - it('should have a default resultSelector that parses message data as JSON', () => { - let result; - const expected = { mork: 'shazbot!' }; - const subject = Observable.webSocket('ws://mysocket'); + describe('with a config object', () => { - subject.subscribe((x: any) => { - result = x; + beforeEach(() => { + setupMockWebSocket(); }); - const socket = MockWebSocket.lastSocket; - socket.open(); - socket.triggerMessage(JSON.stringify(expected)); - - expect(result).to.deep.equal(expected); - - subject.unsubscribe(); - }); + afterEach(() => { + teardownMockWebSocket(); + }); - describe('with a config object', () => { it('should send and receive messages', () => { let messageReceived = false; - const subject = Observable.webSocket({ url: 'ws://mysocket' }); + const subject = websocket({ url: 'ws://mysocket' }); subject.next('ping'); - subject.subscribe((x: string) => { + subject.subscribe(x => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -239,7 +225,7 @@ describe('Observable.webSocket', () => { expect(socket.url).to.equal('ws://mysocket'); socket.open(); - expect(socket.lastMessageSent).to.equal('ping'); + expect(socket.lastMessageSent).to.equal(JSON.stringify('ping')); socket.triggerMessage(JSON.stringify('pong')); expect(messageReceived).to.be.true; @@ -248,7 +234,7 @@ describe('Observable.webSocket', () => { }); it('should take a protocol and set it properly on the web socket', () => { - const subject = Observable.webSocket({ + const subject = websocket({ url: 'ws://mysocket', protocol: 'someprotocol' }); @@ -262,7 +248,7 @@ describe('Observable.webSocket', () => { }); it('should take a binaryType and set it properly on the web socket', () => { - const subject = Observable.webSocket({ + const subject = websocket({ url: 'ws://mysocket', binaryType: 'blob' }); @@ -275,12 +261,12 @@ describe('Observable.webSocket', () => { subject.unsubscribe(); }); - it('should take a resultSelector', () => { - const results = []; + it('should take a deserializer', () => { + const results = [] as string[]; - const subject = Observable.webSocket({ + const subject = websocket({ url: 'ws://mysocket', - resultSelector: (e: any) => { + deserializer: (e: any) => { return e.data + '!'; } }); @@ -300,10 +286,10 @@ describe('Observable.webSocket', () => { subject.unsubscribe(); }); - it('if the resultSelector fails it should go down the error path', () => { - const subject = Observable.webSocket({ + it('if the deserializer fails it should go down the error path', () => { + const subject = websocket({ url: 'ws://mysocket', - resultSelector: (e: any) => { + deserializer: (e: any) => { throw new Error('I am a bad error'); } }); @@ -323,10 +309,10 @@ describe('Observable.webSocket', () => { it('should accept a closingObserver', () => { let calls = 0; - const subject = Observable.webSocket({ + const subject = websocket({ url: 'ws://mysocket', closingObserver: { - next: function (x) { + next(x: any) { calls++; expect(x).to.be.an('undefined'); } @@ -354,11 +340,11 @@ describe('Observable.webSocket', () => { it('should accept a closeObserver', () => { const expected = [{ wasClean: true }, { wasClean: false }]; - const closes = []; - const subject = Observable.webSocket({ + const closes = [] as any[]; + const subject = websocket({ url: 'ws://mysocket', closeObserver: { - next: function (e) { + next(e: any) { closes.push(e); } } @@ -390,7 +376,7 @@ describe('Observable.webSocket', () => { }); it('should handle constructor errors', () => { - const subject = Observable.webSocket({ + const subject = websocket({ url: 'bad_url', WebSocketCtor: (url: string, protocol?: string | string[]): WebSocket => { throw new Error(`connection refused`); @@ -408,36 +394,43 @@ describe('Observable.webSocket', () => { }); describe('multiplex', () => { + + beforeEach(() => { + setupMockWebSocket(); + }); + + afterEach(() => { + teardownMockWebSocket(); + }); + it('should be retryable', () => { - const results = []; - const subject = Observable.webSocket('ws://websocket'); - const source = subject.multiplex(() => { - return { sub: 'foo'}; - }, () => { - return { unsub: 'foo' }; - }, function (value: any) { - return value.name === 'foo'; + const results = [] as string[]; + const subject = websocket<{ name: string, value: string }>('ws://websocket'); + const source = subject.multiplex( + () => ({ sub: 'foo' }), + () => ({ unsub: 'foo' }), + value => value.name === 'foo' + ); + + source.pipe( + retry(1), + map(x => x.value), + take(2), + ).subscribe(x => { + results.push(x); }); - source - .retry(1) - .map((x: any) => x.value) - .take(2) - .subscribe((x: any) => { - results.push(x); - }); - const socket = MockWebSocket.lastSocket; socket.open(); - expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }); + expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); socket.triggerClose({ wasClean: false }); // Bad connection const socket2 = MockWebSocket.lastSocket; expect(socket2).not.to.equal(socket); socket2.open(); - expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' }); + expect(socket2.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); @@ -446,27 +439,27 @@ describe('Observable.webSocket', () => { }); it('should be repeatable', () => { - const results = []; - const subject = Observable.webSocket('ws://websocket'); - const source = subject.multiplex(() => { - return { sub: 'foo'}; - }, () => { - return { unsub: 'foo' }; - }, function (value: any) { - return value.name === 'foo'; - }); + const results = [] as string[]; + const subject = websocket<{ name: string, value: string }>('ws://websocket'); + const source = subject.multiplex( + () => ({ sub: 'foo' }), + () => ({ unsub: 'foo' }), + value => value.name === 'foo' + ); source - .repeat(2) - .map((x: any) => x.value) - .subscribe((x: any) => { + .pipe( + repeat(2), + map(x => x.value) + ) + .subscribe(x => { results.push(x); }); const socket = MockWebSocket.lastSocket; socket.open(); - expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'first multiplexed sub'); + expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' }), 'first multiplexed sub'); socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); socket.triggerClose({ wasClean: true }); @@ -475,7 +468,7 @@ describe('Observable.webSocket', () => { expect(socket2).not.to.equal(socket, 'a new socket was not created'); socket2.open(); - expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'second multiplexed sub'); + expect(socket2.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' }), 'second multiplexed sub'); socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); socket2.triggerClose({ wasClean: true }); @@ -484,15 +477,13 @@ describe('Observable.webSocket', () => { }); it('should multiplex over the websocket', () => { - const results = []; - const subject = Observable.webSocket('ws://websocket'); - const source = subject.multiplex(() => { - return { sub: 'foo'}; - }, () => { - return { unsub: 'foo' }; - }, function (value: any) { - return value.name === 'foo'; - }); + const results = [] as Array<{ value: number, name: string }>; + const subject = websocket<{ value: number, name: string }>('ws://websocket'); + const source = subject.multiplex( + () => ({ sub: 'foo'}), + () => ({ unsub: 'foo' }), + value => value.name === 'foo' + ); const sub = source.subscribe(function (x: any) { results.push(x.value); @@ -500,7 +491,7 @@ describe('Observable.webSocket', () => { const socket = MockWebSocket.lastSocket; socket.open(); - expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }); + expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); [1, 2, 3, 4, 5].map((x: number) => { return { @@ -515,15 +506,15 @@ describe('Observable.webSocket', () => { sinon.spy(socket, 'close'); sub.unsubscribe(); - expect(socket.lastMessageSent).to.deep.equal({ unsub: 'foo' }); + expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ unsub: 'foo' })); expect(socket.close).have.been.called; (socket.close).restore(); }); it('should keep the same socket for multiple multiplex subscriptions', () => { - const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); - const results = []; + const socketSubject = websocket({url: 'ws://mysocket'}); + const results = [] as string[]; const socketMessages = [ {id: 'A'}, {id: 'B'}, @@ -559,7 +550,7 @@ describe('Observable.webSocket', () => { socketMessages.forEach((msg, i) => { if (i === 1) { sub1.unsubscribe(); - expect(socketSubject.socket).to.equal(socket); + expect((socketSubject as any)._socket).to.equal(socket); } socket.triggerMessage(JSON.stringify(msg)); }); @@ -577,8 +568,8 @@ describe('Observable.webSocket', () => { }); it('should not close the socket until all subscriptions complete', () => { - const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); - const results = []; + const socketSubject = websocket<{ id: string, complete: boolean }>({url: 'ws://mysocket'}); + const results = [] as string[]; const socketMessages = [ {id: 'A'}, {id: 'B'}, @@ -590,8 +581,8 @@ describe('Observable.webSocket', () => { socketSubject.multiplex( () => 'no-op', () => results.push('A unsub'), - (req: any) => req.id === 'A') - .takeWhile((req: any) => !req.complete) + req => req.id === 'A') + .takeWhile(req => !req.complete) .subscribe( () => results.push('A next'), (e) => results.push('A error ' + e), @@ -601,8 +592,8 @@ describe('Observable.webSocket', () => { socketSubject.multiplex( () => 'no-op', () => results.push('B unsub'), - (req: any) => req.id === 'B') - .takeWhile((req: any) => !req.complete) + req => req.id === 'B') + .takeWhile(req => !req.complete) .subscribe( () => results.push('B next'), (e) => results.push('B error ' + e), @@ -641,7 +632,7 @@ class MockWebSocket { MockWebSocket.sockets.length = 0; } - sent: Array = []; + sent: string[] = []; handlers: any = {}; readyState: number = 0; closeCode: any; @@ -652,11 +643,11 @@ class MockWebSocket { MockWebSocket.sockets.push(this); } - send(data: any): void { + send(data: string): void { this.sent.push(data); } - get lastMessageSent(): any { + get lastMessageSent(): string { const sent = this.sent; const length = sent.length; @@ -672,7 +663,7 @@ class MockWebSocket { const messageEvent = { data: data, origin: 'mockorigin', - ports: undefined, + ports: undefined as any, source: __root__, }; diff --git a/src/internal/observable/dom/WebSocketSubject.ts b/src/internal/observable/dom/WebSocketSubject.ts index 240a64d352..76a015dd17 100644 --- a/src/internal/observable/dom/WebSocketSubject.ts +++ b/src/internal/observable/dom/WebSocketSubject.ts @@ -8,17 +8,57 @@ import { Observer, NextObserver } from '../../types'; import { tryCatch } from '../../util/tryCatch'; import { errorObject } from '../../util/errorObject'; -export interface WebSocketSubjectConfig { +export interface WebSocketSubjectConfig { + /** The url of the socket server to connect to */ url: string; + /** The protocol to use to connect */ protocol?: string | Array; - resultSelector?: (e: MessageEvent) => T; + /** @deprecated use {@link deserializer} */ + resultSelector?: (e: MessageEvent) => T; + /** + * A serializer used for messages arriving on the over the socket from the + * server. Defaults to JSON.parse. + */ + serializer?: (value: T) => WebSocketMessage; + /** + * A deserializer used to create messages from passed values before the + * messages are sent to the server. Defaults to JSON.stringify + */ + deserializer?: (e: MessageEvent) => T; + /** + * An Observer that watches when open events occur on the underlying web socket. + */ openObserver?: NextObserver; + /** + * An Observer than watches when close events occur on the underlying websocket + */ closeObserver?: NextObserver; + /** + * An Observer that watches when a close is about to occur due to + * unsubscription. + */ closingObserver?: NextObserver; - WebSocketCtor?: { new(url: string, protocol?: string|Array): WebSocket }; + /** + * A WebSocket constructor to use. This is useful for situations like using a + * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket + * for testing purposes + */ + WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket }; + /** Sets the `binaryType` property of the underlying WebSocket. */ binaryType?: 'blob' | 'arraybuffer'; } +const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig = { + url: '', + deserializer: (e: MessageEvent) => JSON.parse(e.data), + serializer: (value: any) => JSON.stringify(value), +}; + +const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = + 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }'; + +export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; + /** * We need this JSDoc comment for affecting ESDoc. * @extends {Ignored} @@ -26,81 +66,31 @@ export interface WebSocketSubjectConfig { */ export class WebSocketSubject extends AnonymousSubject { - url: string; - protocol: string|Array; - socket: WebSocket; - openObserver: NextObserver; - closeObserver: NextObserver; - closingObserver: NextObserver; - WebSocketCtor: { new(url: string, protocol?: string|Array): WebSocket }; - binaryType?: 'blob' | 'arraybuffer'; + private _config: WebSocketSubjectConfig; - private _output: Subject; - - resultSelector(e: MessageEvent) { - return JSON.parse(e.data); - } + protected _output: Subject; - /** - * Wrapper around the w3c-compatible WebSocket object provided by the browser. - * - * @example Wraps browser WebSocket - * - * let socket$ = Observable.webSocket('ws://localhost:8081'); - * - * socket$.subscribe( - * (msg) => console.log('message received: ' + msg), - * (err) => console.log(err), - * () => console.log('complete') - * ); - * - * socket$.next(JSON.stringify({ op: 'hello' })); - * - * @example Wraps WebSocket from nodejs-websocket (using node.js) - * - * import { w3cwebsocket } from 'websocket'; - * - * let socket$ = Observable.webSocket({ - * url: 'ws://localhost:8081', - * WebSocketCtor: w3cwebsocket - * }); - * - * socket$.subscribe( - * (msg) => console.log('message received: ' + msg), - * (err) => console.log(err), - * () => console.log('complete') - * ); - * - * socket$.next(JSON.stringify({ op: 'hello' })); - * - * @param {string | WebSocketSubjectConfig} urlConfigOrSource the source of the websocket as an url or a structure defining the websocket object - * @return {WebSocketSubject} - * @static true - * @name webSocket - * @owner Observable - */ - static create(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { - return new WebSocketSubject(urlConfigOrSource); - } + private _socket: WebSocket; - constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + super(); if (urlConfigOrSource instanceof Observable) { - super(destination, > urlConfigOrSource); + this.destination = destination; + this.source = urlConfigOrSource as Observable; } else { - super(); - this.WebSocketCtor = WebSocket; + const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG }; + config.WebSocketCtor = WebSocket; this._output = new Subject(); if (typeof urlConfigOrSource === 'string') { - this.url = urlConfigOrSource; + config.url = urlConfigOrSource; } else { - // WARNING: config object could override important members here. for (let key in urlConfigOrSource) { if (urlConfigOrSource.hasOwnProperty(key)) { - this[key] = urlConfigOrSource[key]; + config[key] = urlConfigOrSource[key]; } } } - if (!this.WebSocketCtor) { + if (!config.WebSocketCtor) { throw new Error('no WebSocket constructor can be found'); } this.destination = new ReplaySubject(); @@ -108,20 +98,37 @@ export class WebSocketSubject extends AnonymousSubject { } lift(operator: Operator): WebSocketSubject { - const sock = new WebSocketSubject(this, this.destination); + const sock = new WebSocketSubject(this._config as WebSocketSubjectConfig, this.destination); sock.operator = operator; return sock; } private _resetState() { - this.socket = null; + this._socket = null; if (!this.source) { this.destination = new ReplaySubject(); } this._output = new Subject(); } - // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures + /** + * Creates an {@link Observable}, that when subscribed to, sends a message, + * defined be the `subMsg` function, to the server over the socket to begin a + * subscription to data over that socket. Once data arrives, the + * `messageFilter` argument will be used to select the appropriate data for + * the resulting Observable. When teardown occurs, either due to + * unsubscription, completion or error, a message defined by the `unsubMsg` + * argument will be send to the server over the WebSocketSubject. + * + * @param subMsg A function to generate the subscription message to be sent to + * the server. This will still be processed by the serializer in the + * WebSocketSubject's config. (Which defaults to JSON serialization) + * @param unsubMsg A function to generate the unsubscription message to be + * sent to the server at teardown. This will still be processed by the + * serializer in the WebSocketSubject's config. + * @param messageFilter A predicate for selecting the appropriate messages + * from the server for the output stream. + */ multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { const self = this; return new Observable((observer: Observer) => { @@ -156,17 +163,17 @@ export class WebSocketSubject extends AnonymousSubject { } private _connectSocket() { - const { WebSocketCtor } = this; + const { WebSocketCtor, protocol, url, binaryType } = this._config; const observer = this._output; let socket: WebSocket = null; try { - socket = this.protocol ? - new WebSocketCtor(this.url, this.protocol) : - new WebSocketCtor(this.url); - this.socket = socket; - if (this.binaryType) { - this.socket.binaryType = this.binaryType; + socket = protocol ? + new WebSocketCtor(url, protocol) : + new WebSocketCtor(url); + this._socket = socket; + if (binaryType) { + this._socket.binaryType = binaryType; } } catch (e) { observer.error(e); @@ -174,44 +181,53 @@ export class WebSocketSubject extends AnonymousSubject { } const subscription = new Subscription(() => { - this.socket = null; + this._socket = null; if (socket && socket.readyState === 1) { socket.close(); } }); socket.onopen = (e: Event) => { - const openObserver = this.openObserver; + const { openObserver } = this._config; if (openObserver) { openObserver.next(e); } const queue = this.destination; - this.destination = Subscriber.create( - (x) => socket.readyState === 1 && socket.send(x), + this.destination = Subscriber.create( + (x) => { + if (socket.readyState === 1) { + const { serializer } = this._config; + const msg = tryCatch(serializer)(x); + if (msg === errorObject) { + this.destination.error(errorObject.e); + return; + } + socket.send(msg); + } + }, (e) => { - const closingObserver = this.closingObserver; + const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); } if (e && e.code) { socket.close(e.code, e.reason); } else { - observer.error(new TypeError('WebSocketSubject.error must be called with an object with an error code, ' + - 'and an optional reason: { code: number, reason: string }')); + observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); } this._resetState(); }, - ( ) => { - const closingObserver = this.closingObserver; + () => { + const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); } socket.close(); this._resetState(); } - ); + ) as Subscriber; if (queue && queue instanceof ReplaySubject) { subscription.add((>queue).subscribe(this.destination)); @@ -225,7 +241,7 @@ export class WebSocketSubject extends AnonymousSubject { socket.onclose = (e: CloseEvent) => { this._resetState(); - const closeObserver = this.closeObserver; + const { closeObserver } = this._config; if (closeObserver) { closeObserver.next(e); } @@ -237,7 +253,8 @@ export class WebSocketSubject extends AnonymousSubject { }; socket.onmessage = (e: MessageEvent) => { - const result = tryCatch(this.resultSelector)(e); + const { deserializer } = this._config; + const result = tryCatch(deserializer)(e); if (result === errorObject) { observer.error(errorObject.e); } else { @@ -251,16 +268,16 @@ export class WebSocketSubject extends AnonymousSubject { if (source) { return source.subscribe(subscriber); } - if (!this.socket) { + if (!this._socket) { this._connectSocket(); } let subscription = new Subscription(); subscription.add(this._output.subscribe(subscriber)); subscription.add(() => { - const { socket } = this; + const { _socket } = this; if (this._output.observers.length === 0) { - if (socket && socket.readyState === 1) { - socket.close(); + if (_socket && _socket.readyState === 1) { + _socket.close(); } this._resetState(); } @@ -269,9 +286,9 @@ export class WebSocketSubject extends AnonymousSubject { } unsubscribe() { - const { source, socket } = this; - if (socket && socket.readyState === 1) { - socket.close(); + const { source, _socket } = this; + if (_socket && _socket.readyState === 1) { + _socket.close(); this._resetState(); } super.unsubscribe(); diff --git a/src/internal/observable/dom/webSocket.ts b/src/internal/observable/dom/webSocket.ts index 4680fa8f23..6b991e7b27 100644 --- a/src/internal/observable/dom/webSocket.ts +++ b/src/internal/observable/dom/webSocket.ts @@ -1,3 +1,43 @@ -import { WebSocketSubject } from './WebSocketSubject'; +import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject'; -export const webSocket = WebSocketSubject.create; \ No newline at end of file +/** + * Wrapper around the w3c-compatible WebSocket object provided by the browser. + * + * @example Wraps browser WebSocket + * + * import { webSocket } from 'rxjs/websocket'; + * + * let socket$ = webSocket('ws://localhost:8081'); + * + * socket$.subscribe( + * (msg) => console.log('message received: ' + msg), + * (err) => console.log(err), + * () => console.log('complete') + * ); + * + * socket$.next(JSON.stringify({ op: 'hello' })); + * + * @example Wraps WebSocket from nodejs-websocket (using node.js) + * + * import { webSocket } from 'rxjs/websocket'; + * import { w3cwebsocket } from 'websocket'; + * + * let socket$ = webSocket({ + * url: 'ws://localhost:8081', + * WebSocketCtor: w3cwebsocket + * }); + * + * socket$.subscribe( + * (msg) => console.log('message received: ' + msg), + * (err) => console.log(err), + * () => console.log('complete') + * ); + * + * socket$.next(JSON.stringify({ op: 'hello' })); + * + * @param {string | WebSocketSubjectConfig} urlConfigOrSource the source of the websocket as an url or a structure defining the websocket object + * @return {WebSocketSubject} + */ +export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); +}