Skip to content

Commit

Permalink
fix(takeUntil): unsubscribe notifier when it completes
Browse files Browse the repository at this point in the history
Fix operator takeUntil to automatically unsubscribe the notifier when it
completes. This is to conform RxJS Next with RxJS 4.

Somewhat related to issue #577.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 27, 2015
1 parent 25151a7 commit 9415196
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export interface CoreOperators<T> {
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (observable: Observable<any>) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
Expand Down
31 changes: 19 additions & 12 deletions src/operators/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,48 @@ import Operator from '../Operator';
import Observable from '../Observable';
import Subscriber from '../Subscriber';

export default function takeUntil<T>(observable: Observable<any>) {
return this.lift(new TakeUntilOperator(observable));
export default function takeUntil<T>(notifier: Observable<any>) {
return this.lift(new TakeUntilOperator(notifier));
}

class TakeUntilOperator<T, R> implements Operator<T, R> {

observable: Observable<any>;

constructor(observable: Observable<any>) {
this.observable = observable;
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new TakeUntilSubscriber(subscriber, this.observable);
return new TakeUntilSubscriber(subscriber, this.notifier);
}
}

class TakeUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: TakeUntilInnerSubscriber<any> = null;

constructor(destination: Subscriber<T>,
observable: Observable<any>) {
private notifier: Observable<any>) {
super(destination);
this.add(observable._subscribe(new TakeUntilInnerSubscriber(destination)));
this.notificationSubscriber = new TakeUntilInnerSubscriber(destination);
this.add(notifier.subscribe(this.notificationSubscriber));
}

_complete() {
this.destination.complete();
this.notificationSubscriber.unsubscribe();
}
}

class TakeUntilInnerSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>) {
super(destination);
constructor(protected destination: Subscriber<T>) {
super(null);
}

_next() {
this.destination.complete();
}

_error(e) {
this.destination.error(e);
}

_complete() {
}
}

5 comments on commit 9415196

@masimplo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test for this commit? TakeUntil does not do what is supposed to when notifier completes, but does so when notifier emits on version 5.0.0-beta.12 which is greater than this commit which was included 5.0.0-alpha-7

@staltz
Copy link
Member

@staltz staltz commented on 9415196 Jan 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@masimplo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But these tests in the commits you are referencing are all for skipUntil not takeUntil.

I think a test like the following should be passing as well:

  it('should take values until notifier completes', () => {
    const e1 =     hot('--a--b--c--d--e--f--g--|');
    const e1subs =     '^            !          ';
    const e2 =     hot('-------------|          ');
    const e2subs =     '^            !          ';
    const expected =   '--a--b--c--d-|          ';

    expectObservable(e1.takeUntil(e2)).toBe(expected);
    expectSubscriptions(e1.subscriptions).toBe(e1subs);
    expectSubscriptions(e2.subscriptions).toBe(e2subs);
  });

but it is not. TakeUntil states that

If the notifier emits a value or a complete notification, the output Observable stops mirroring the source Observable and completes.

But there is no test for what happens when the notifier completes, which is what I am seeing not working in some sample code I am running trying out takeUntil.

Here is a jsbin demonstrating my case:
http://jsbin.com/lagoquz/1/edit?js,console

@staltz
Copy link
Member

@staltz staltz commented on 9415196 Jan 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please open an issue to discuss this? Not in a commit message

@masimplo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure here you go #2304

Please sign in to comment.