diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 9f00589303..5f46fa9115 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -198,7 +198,7 @@ export class Observable<T> implements Subscribable<T> { if (operator) { operator.call(sink, this.source); } else { - sink.add( + sink._addParentTeardownLogic( this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ? this._subscribe(sink) : this._trySubscribe(sink) diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index 6c87098664..11d643faf7 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -1,6 +1,6 @@ import { isFunction } from './util/isFunction'; import { empty as emptyObserver } from './Observer'; -import { Observer, PartialObserver } from './types'; +import { Observer, PartialObserver, TeardownLogic } from './types'; import { Subscription } from './Subscription'; import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber'; import { config } from './config'; @@ -47,6 +47,8 @@ export class Subscriber<T> extends Subscription implements Observer<T> { protected isStopped: boolean = false; protected destination: PartialObserver<any>; // this `any` is the escape hatch to erase extra type param (e.g. R) + private _parentSubscription: Subscription | null = null; + /** * @param {Observer|function(value: T): void} [destinationOrNext] A partially * defined Observer or a `next` callback function. @@ -76,7 +78,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> { const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>; this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable; this.destination = trustedSubscriber; - trustedSubscriber.add(this); + trustedSubscriber._addParentTeardownLogic(this); } else { this.syncErrorThrowable = true; this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext); @@ -114,6 +116,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> { if (!this.isStopped) { this.isStopped = true; this._error(err); + this._unsubscribeParentSubscription(); } } @@ -127,6 +130,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> { if (!this.isStopped) { this.isStopped = true; this._complete(); + this._unsubscribeParentSubscription(); } } @@ -152,6 +156,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> { this.unsubscribe(); } + /** @deprecated This is an internal implementation detail, do not use. */ + _addParentTeardownLogic(parentTeardownLogic: TeardownLogic) { + this._parentSubscription = this.add(parentTeardownLogic); + } + + /** @deprecated This is an internal implementation detail, do not use. */ + _unsubscribeParentSubscription() { + if (this._parentSubscription !== null) { + this._parentSubscription.unsubscribe(); + } + } + /** @deprecated This is an internal implementation detail, do not use. */ _unsubscribeAndRecycle(): Subscriber<T> { const { _parent, _parents } = this; @@ -162,6 +178,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> { this.isStopped = false; this._parent = _parent; this._parents = _parents; + this._parentSubscription = null; return this; } } diff --git a/src/internal/observable/dom/WebSocketSubject.ts b/src/internal/observable/dom/WebSocketSubject.ts index e19e80724d..3e33400c45 100644 --- a/src/internal/observable/dom/WebSocketSubject.ts +++ b/src/internal/observable/dom/WebSocketSubject.ts @@ -276,9 +276,8 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> { if (!this._socket) { this._connectSocket(); } - let subscription = new Subscription(); - subscription.add(this._output.subscribe(subscriber)); - subscription.add(() => { + this._output.subscribe(subscriber); + subscriber.add(() => { const { _socket } = this; if (this._output.observers.length === 0) { if (_socket && _socket.readyState === 1) { @@ -287,7 +286,7 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> { this._resetState(); } }); - return subscription; + return subscriber; } unsubscribe() { diff --git a/src/internal/testing/ColdObservable.ts b/src/internal/testing/ColdObservable.ts index c185c293ab..29f620e288 100644 --- a/src/internal/testing/ColdObservable.ts +++ b/src/internal/testing/ColdObservable.ts @@ -23,11 +23,12 @@ export class ColdObservable<T> extends Observable<T> implements SubscriptionLogg super(function (this: Observable<T>, subscriber: Subscriber<any>) { const observable: ColdObservable<T> = this as any; const index = observable.logSubscribedFrame(); - subscriber.add(new Subscription(() => { + const subscription = new Subscription(); + subscription.add(new Subscription(() => { observable.logUnsubscribedFrame(index); })); observable.scheduleMessages(subscriber); - return subscriber; + return subscription; }); this.scheduler = scheduler; } diff --git a/src/internal/testing/HotObservable.ts b/src/internal/testing/HotObservable.ts index 28a94a4e95..34dcb60256 100644 --- a/src/internal/testing/HotObservable.ts +++ b/src/internal/testing/HotObservable.ts @@ -28,10 +28,12 @@ export class HotObservable<T> extends Subject<T> implements SubscriptionLoggable _subscribe(subscriber: Subscriber<any>): Subscription { const subject: HotObservable<T> = this; const index = subject.logSubscribedFrame(); - subscriber.add(new Subscription(() => { + const subscription = new Subscription(); + subscription.add(new Subscription(() => { subject.logUnsubscribedFrame(index); })); - return super._subscribe(subscriber); + subscription.add(super._subscribe(subscriber)); + return subscription; } setup() {