From cd8cdd0cbad37a91364e406229f9205d636b46e7 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Thu, 30 Jun 2016 14:59:37 -0700 Subject: [PATCH] fix(WebSocketSubject): respect WebSockeCtor, support source/destination arguments in constructor. (#1790) Fixes #1745 and #1784 --- src/observable/dom/WebSocketSubject.ts | 52 ++++++++++++++++++-------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/src/observable/dom/WebSocketSubject.ts b/src/observable/dom/WebSocketSubject.ts index 293b5b4681..819f6274d9 100644 --- a/src/observable/dom/WebSocketSubject.ts +++ b/src/observable/dom/WebSocketSubject.ts @@ -2,6 +2,7 @@ import {Subject, AnonymousSubject} from '../../Subject'; import {Subscriber} from '../../Subscriber'; import {Observable} from '../../Observable'; import {Subscription} from '../../Subscription'; +import {Operator} from '../../Operator'; import {root} from '../../util/root'; import {ReplaySubject} from '../../ReplaySubject'; import {Observer, NextObserver} from '../../Observer'; @@ -25,6 +26,7 @@ export interface WebSocketSubjectConfig { * @hide true */ export class WebSocketSubject extends AnonymousSubject { + url: string; protocol: string|Array; socket: WebSocket; @@ -32,7 +34,8 @@ export class WebSocketSubject extends AnonymousSubject { closeObserver: NextObserver; closingObserver: NextObserver; WebSocketCtor: { new(url: string, protocol?: string|Array): WebSocket }; - private _output: Subject = new Subject(); + + private _output: Subject; resultSelector(e: MessageEvent) { return JSON.parse(e.data); @@ -50,21 +53,29 @@ export class WebSocketSubject extends AnonymousSubject { } constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { - super(); - this.WebSocketCtor = root.WebSocket; - - if (typeof urlConfigOrSource === 'string') { - this.url = urlConfigOrSource; + if (urlConfigOrSource instanceof Observable) { + super(destination, > urlConfigOrSource); } else { - // WARNING: config object could override important members here. - assign(this, urlConfigOrSource); - } - - if (!this.WebSocketCtor) { - throw new Error('no WebSocket constructor can be found'); + super(); + this.WebSocketCtor = root.WebSocket; + this._output = new Subject(); + if (typeof urlConfigOrSource === 'string') { + this.url = urlConfigOrSource; + } else { + // WARNING: config object could override important members here. + assign(this, urlConfigOrSource); + } + if (!this.WebSocketCtor) { + throw new Error('no WebSocket constructor can be found'); + } + this.destination = new ReplaySubject(); } + } - this.destination = new ReplaySubject(); + lift(operator: Operator): WebSocketSubject { + const sock = new WebSocketSubject(this, this.destination); + sock.operator = operator; + return sock; } // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures @@ -102,7 +113,10 @@ export class WebSocketSubject extends AnonymousSubject { } private _connectSocket() { - const socket = this.protocol ? new WebSocket(this.url, this.protocol) : new WebSocket(this.url); + const { WebSocketCtor } = this; + const socket = this.protocol ? + new WebSocketCtor(this.url, this.protocol) : + new WebSocketCtor(this.url); this.socket = socket; const subscription = new Subscription(() => { this.socket = null; @@ -178,6 +192,10 @@ export class WebSocketSubject extends AnonymousSubject { } protected _subscribe(subscriber: Subscriber): Subscription { + const { source } = this; + if (source) { + return source.subscribe(subscriber); + } if (!this.socket) { this._connectSocket(); } @@ -194,12 +212,14 @@ export class WebSocketSubject extends AnonymousSubject { } unsubscribe() { - const { socket } = this; + const { source, socket } = this; if (socket && socket.readyState === 1) { socket.close(); this.socket = null; } super.unsubscribe(); - this.destination = new ReplaySubject(); + if (!source) { + this.destination = new ReplaySubject(); + } } }