diff --git a/package-lock.json b/package-lock.json index 7ee1e9c271..a51c1b9f75 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@reactivex/rxjs", - "version": "6.0.0-turbo-rc.4", + "version": "6.0.0-uncanny-rc.7", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 9221fee91f..c588d1a95b 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { websocket } from 'rxjs/websocket'; -import { map, retry, take, repeat } from 'rxjs/operators'; +import { map, retry, take, repeat, takeWhile } from 'rxjs/operators'; declare const __root__: any; @@ -51,6 +51,20 @@ describe('websocket', () => { subject.unsubscribe(); }); + it('should allow use of operators and subscribe', () => { + const subject = websocket('ws://mysocket'); + const results: any[] = []; + + subject.pipe( + map(x => x + '!'), + ) + .subscribe(x => results.push(x)); + + MockWebSocket.lastSocket.triggerMessage(JSON.stringify('ngconf 2018 bug')); + + expect(results).to.deep.equal(['ngconf 2018 bug!']); + }); + it('receive multiple messages', () => { const expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?']; const results: string[] = []; @@ -526,13 +540,15 @@ describe('websocket', () => { const sub1 = socketSubject.multiplex( () => 'no-op', () => results.push('A unsub'), - (req: any) => req.id === 'A') - .takeWhile((req: any) => !req.complete) - .subscribe( - () => results.push('A next'), - (e) => results.push('A error ' + e), - () => results.push('A complete') - ); + (req: any) => req.id === 'A' + ).pipe( + takeWhile((req: any) => !req.complete) + ) + .subscribe( + () => results.push('A next'), + (e) => results.push('A error ' + e), + () => results.push('A complete') + ); socketSubject.multiplex( () => 'no-op', @@ -581,24 +597,26 @@ describe('websocket', () => { socketSubject.multiplex( () => 'no-op', () => results.push('A unsub'), - req => req.id === 'A') - .takeWhile(req => !req.complete) - .subscribe( - () => results.push('A next'), - (e) => results.push('A error ' + e), - () => results.push('A complete') - ); + req => req.id === 'A' + ).pipe( + takeWhile(req => !req.complete) + ).subscribe( + () => results.push('A next'), + (e) => results.push('A error ' + e), + () => results.push('A complete') + ); socketSubject.multiplex( () => 'no-op', () => results.push('B unsub'), - req => req.id === 'B') - .takeWhile(req => !req.complete) - .subscribe( - () => results.push('B next'), - (e) => results.push('B error ' + e), - () => results.push('B complete') - ); + req => req.id === 'B' + ).pipe( + takeWhile(req => !req.complete) + ).subscribe( + () => results.push('B next'), + (e) => results.push('B error ' + e), + () => results.push('B complete') + ); // Setup socket and send messages let socket = MockWebSocket.lastSocket; diff --git a/src/internal/observable/dom/WebSocketSubject.ts b/src/internal/observable/dom/WebSocketSubject.ts index ee68790e17..2fe5cb0c67 100644 --- a/src/internal/observable/dom/WebSocketSubject.ts +++ b/src/internal/observable/dom/WebSocketSubject.ts @@ -101,6 +101,7 @@ export class WebSocketSubject extends AnonymousSubject { lift(operator: Operator): WebSocketSubject { const sock = new WebSocketSubject(this._config as WebSocketSubjectConfig, this.destination); sock.operator = operator; + sock.source = this; return sock; }