Skip to content

Commit

Permalink
fix(takeUntil): If the notifier supplied to takeUntil synchronously e…
Browse files Browse the repository at this point in the history
…mits a value, the source should

1360
  • Loading branch information
trxcllnt committed Dec 15, 2016
1 parent 5f93f81 commit a1b18ec
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
9 changes: 9 additions & 0 deletions spec/operators/takeUntil-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ describe('Observable.prototype.takeUntil', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should complete without subscribing to the source when notifier synchronously emits', () => {
const e1 = hot('----a--|');
const e2 = Observable.of(0);
const expected = '(|) ';

expectObservable(e1.takeUntil(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe([]);
});

it('should raise error if source raises error before notifier emits', () => {
const e1 = hot('--a--b--c--d--# ');
const e1subs = '^ ! ';
Expand Down
12 changes: 8 additions & 4 deletions src/operator/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ class TakeUntilOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier));
const takeUntilSubscriber = new TakeUntilSubscriber(subscriber);
const notifierSubscription = subscribeToResult(takeUntilSubscriber, this.notifier);
if (notifierSubscription && !notifierSubscription.closed) {
takeUntilSubscriber.add(notifierSubscription);
return source.subscribe(takeUntilSubscriber);
}
return takeUntilSubscriber;
}
}

Expand All @@ -60,10 +66,8 @@ class TakeUntilOperator<T> implements Operator<T, T> {
*/
class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {

constructor(destination: Subscriber<any>,
private notifier: Observable<any>) {
constructor(destination: Subscriber<any>) {
super(destination);
this.add(subscribeToResult(this, notifier));
}

notifyNext(outerValue: T, innerValue: R,
Expand Down

0 comments on commit a1b18ec

Please sign in to comment.