diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 8280955dcf..92df102bc6 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -67,7 +67,7 @@ export interface CoreOperators { switchMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; - takeUntil?: (observable: Observable) => Observable; + takeUntil?: (notifier: Observable) => Observable; throttle?: (delay: number, scheduler?: Scheduler) => Observable; timeout?: (due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith?: (due: number|Date, withObservable: Observable, scheduler?: Scheduler) => Observable; diff --git a/src/operators/takeUntil.ts b/src/operators/takeUntil.ts index 2430bad565..1e84320632 100644 --- a/src/operators/takeUntil.ts +++ b/src/operators/takeUntil.ts @@ -2,41 +2,48 @@ import Operator from '../Operator'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; -export default function takeUntil(observable: Observable) { - return this.lift(new TakeUntilOperator(observable)); +export default function takeUntil(notifier: Observable) { + return this.lift(new TakeUntilOperator(notifier)); } class TakeUntilOperator implements Operator { - - observable: Observable; - - constructor(observable: Observable) { - this.observable = observable; + constructor(private notifier: Observable) { } call(subscriber: Subscriber): Subscriber { - return new TakeUntilSubscriber(subscriber, this.observable); + return new TakeUntilSubscriber(subscriber, this.notifier); } } class TakeUntilSubscriber extends Subscriber { + private notificationSubscriber: TakeUntilInnerSubscriber = null; + constructor(destination: Subscriber, - observable: Observable) { + private notifier: Observable) { super(destination); - this.add(observable._subscribe(new TakeUntilInnerSubscriber(destination))); + this.notificationSubscriber = new TakeUntilInnerSubscriber(destination); + this.add(notifier.subscribe(this.notificationSubscriber)); + } + + _complete() { + this.destination.complete(); + this.notificationSubscriber.unsubscribe(); } } class TakeUntilInnerSubscriber extends Subscriber { - constructor(destination: Subscriber) { - super(destination); + constructor(protected destination: Subscriber) { + super(null); } + _next() { this.destination.complete(); } + _error(e) { this.destination.error(e); } + _complete() { } } \ No newline at end of file