From 7c61e57ebb80bba401d69216364bb5720ac8b01f Mon Sep 17 00:00:00 2001 From: PSanetra Date: Fri, 21 Dec 2018 15:49:10 +0100 Subject: [PATCH] fix(Observable): Fix Observable.subscribe to add operator TeardownLogic to returned Subscription. --- spec/Observable-spec.ts | 21 +++++++++++++++++++++ src/internal/Observable.ts | 2 +- src/internal/Subscription.ts | 29 ++++++++++++++++++----------- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 3627c37f15..872a57d852 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -5,6 +5,9 @@ import { Observer, TeardownLogic } from '../src/internal/types'; import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing'; import { map } from '../src/internal/operators/map'; import { noop } from '../src/internal/util/noop'; +import { NEVER } from '../src/internal/observable/never'; +import { Subscriber } from '../src/internal/Subscriber'; +import { Operator } from '../src/internal/Operator'; declare const asDiagram: any, rxTestScheduler: any; const Observable = Rx.Observable; @@ -697,6 +700,24 @@ describe('Observable.lift', () => { } } + it('should return Observable which calls TeardownLogic of operator on unsubscription', (done) => { + + const myOperator: Operator = { + call: (subscriber: Subscriber, source: any) => { + const subscription = source.subscribe((x: any) => subscriber.next(x)); + return () => { + subscription.unsubscribe(); + done(); + }; + } + }; + + NEVER.lift(myOperator) + .subscribe() + .unsubscribe(); + + }); + it('should be overrideable in a custom Observable type that composes', (done) => { const result = new MyCustomObservable((observer) => { observer.next(1); diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index b233618cbb..f841bdeb87 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -204,7 +204,7 @@ export class Observable implements Subscribable { const sink = toSubscriber(observerOrNext, error, complete); if (operator) { - operator.call(sink, this.source); + sink.add(operator.call(sink, this.source)); } else { sink.add( this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ? diff --git a/src/internal/Subscription.ts b/src/internal/Subscription.ts index 29ce9f8533..f97588a887 100644 --- a/src/internal/Subscription.ts +++ b/src/internal/Subscription.ts @@ -168,14 +168,15 @@ export class Subscription implements SubscriptionLike { } } - // Optimize for the common case when adding the first subscription. - const subscriptions = this._subscriptions; - if (subscriptions) { - subscriptions.push(subscription); - } else { - this._subscriptions = [subscription]; + if (subscription._addParent(this)) { + // Optimize for the common case when adding the first subscription. + const subscriptions = this._subscriptions; + if (subscriptions) { + subscriptions.push(subscription); + } else { + this._subscriptions = [subscription]; + } } - subscription._addParent(this); return subscription; } @@ -197,20 +198,26 @@ export class Subscription implements SubscriptionLike { } /** @internal */ - private _addParent(parent: Subscription) { + private _addParent(parent: Subscription): boolean { let { _parent, _parents } = this; - if (!_parent || _parent === parent) { - // If we don't have a parent, or the new parent is the same as the - // current parent, then set this._parent to the new parent. + if (_parent === parent) { + // If the new parent is the same as the current parent, then do nothing. + return false; + } else if (!_parent) { + // If we don't have a parent, then set this._parent to the new parent. this._parent = parent; + return true; } else if (!_parents) { // If there's already one parent, but not multiple, allocate an Array to // store the rest of the parent Subscriptions. this._parents = [parent]; + return true; } else if (_parents.indexOf(parent) === -1) { // Only add the new parent to the _parents list if it's not already there. _parents.push(parent); + return true; } + return false; } }