Skip to content

Commit

Permalink
fix(takeUntil): takeUntil should subscribe to the source if notifier …
Browse files Browse the repository at this point in the history
…sync completes without emitting (#4039)

This bug was introduced in #3504, which intended to not subscribe to the source if the notifier synchronously emits a value, but instead it only skipped subscribe if it completed
  • Loading branch information
jayphelps authored and benlesh committed Aug 27, 2018
1 parent afee5d8 commit 21fd0b4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
14 changes: 12 additions & 2 deletions spec/operators/takeUntil-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { takeUntil, mergeMap } from 'rxjs/operators';
import { of } from 'rxjs';
import { of, EMPTY } from 'rxjs';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -56,13 +56,23 @@ describe('takeUntil operator', () => {

it('should complete without subscribing to the source when notifier synchronously emits', () => {
const e1 = hot('----a--|');
const e2 = of(0);
const e2 = of(1, 2, 3);
const expected = '(|) ';

expectObservable(e1.pipe(takeUntil(e2))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe([]);
});

it('should subscribe to the source when notifier synchronously completes without emitting', () => {
const e1 = hot('----a--|');
const e1subs = '^ !';
const e2 = EMPTY;
const expected = '----a--|';

expectObservable(e1.pipe(takeUntil(e2))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', () => {
const e1 = hot('--a--b--c--d--e--f--g--|');
const e1subs = '^ ! ';
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class TakeUntilOperator<T> implements Operator<T, T> {
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
const takeUntilSubscriber = new TakeUntilSubscriber(subscriber);
const notifierSubscription = subscribeToResult(takeUntilSubscriber, this.notifier);
if (notifierSubscription && !notifierSubscription.closed) {
if (notifierSubscription && !takeUntilSubscriber.seenValue) {
takeUntilSubscriber.add(notifierSubscription);
return source.subscribe(takeUntilSubscriber);
}
Expand All @@ -70,6 +70,7 @@ class TakeUntilOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
seenValue = false;

constructor(destination: Subscriber<any>, ) {
super(destination);
Expand All @@ -78,6 +79,7 @@ class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.seenValue = true;
this.complete();
}

Expand Down

0 comments on commit 21fd0b4

Please sign in to comment.