Skip to content

Commit

Permalink
fix(repeatWhen): Ensure teardown happens between repeat subscriptions (
Browse files Browse the repository at this point in the history
…#5625)

* fix(repeatWhen): Ensure teardown happens between repeat subscriptions

Resolves an issue where teardowns would wait until the resulting observable ended before being executed.

* chore: address comments
  • Loading branch information
benlesh authored Aug 5, 2020
1 parent 6752af7 commit 98356f4
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 91 deletions.
23 changes: 22 additions & 1 deletion spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { repeatWhen, map, mergeMap, takeUntil } from 'rxjs/operators';
import { repeatWhen, map, mergeMap, takeUntil, takeWhile } from 'rxjs/operators';
import { of, EMPTY, Observable, Subscriber } from 'rxjs';

/** @test {repeatWhen} */
Expand Down Expand Up @@ -385,4 +385,25 @@ describe('repeatWhen operator', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should always teardown before starting the next cycle, even when synchronous', () => {
const results: any[] = [];
const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return () => {
results.push('teardown');
}
});
const subscription = source.pipe(repeatWhen(completions$ => completions$.pipe(
takeWhile((_, i) => i < 3)
))).subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
});

expect(subscription.closed).to.be.true;
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown'])
});
});
178 changes: 88 additions & 90 deletions src/internal/operators/repeatWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,98 +38,96 @@ import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '..
* @name repeatWhen
*/
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => lift(source, new RepeatWhenOperator(notifier));
}

class RepeatWhenOperator<T> implements Operator<T, T> {
constructor(protected notifier: (notifications: Observable<void>) => Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RepeatWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {

private notifications: Subject<void> | null = null;
private retries: Observable<any> | null = null;
private retriesSubscription: Subscription | null | undefined = null;
private sourceIsBeingSubscribedTo: boolean = true;

constructor(destination: Subscriber<R>,
private notifier: (notifications: Observable<void>) => Observable<any>,
private source: Observable<T>) {
super(destination);
}

notifyNext(): void {
this.sourceIsBeingSubscribedTo = true;
this.source.subscribe(this);
}

notifyComplete(): void {
if (this.sourceIsBeingSubscribedTo === false) {
return super.complete();
}
}

complete() {
this.sourceIsBeingSubscribedTo = false;

if (!this.isStopped) {
if (!this.retries) {
this.subscribeToRetries();
return (source: Observable<T>) => lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null;
let syncResub = false;
let completions$: Subject<void>;
let isNotifierComplete = false;
let isMainComplete = false;

/**
* Gets the subject to send errors through. If it doesn't exist,
* we know we need to setup the notifier.
*/
const getCompletionSubject = () => {
if (!completions$) {
completions$ = new Subject();
let notifier$: Observable<any>;
// The notifier is a user-provided function, so we need to do
// some error handling.
try {
notifier$ = notifier(completions$);
} catch (err) {
subscriber.error(err);
// Returning null here will cause the code below to
// notice there's been a problem and skip error notification.
return null;
}
subscription.add(
notifier$.subscribe({
next: () => {
if (innerSub) {
subscribeForRepeatWhen();
} else {
// If we don't have an innerSub yet, that's because the inner subscription
// call hasn't even returned yet. We've arrived here synchronously.
// So we flag that we want to resub, such that we can ensure teardown
// happens before we resubscribe.
syncResub = true;
}
},
error: (err) => subscriber.error(err),
complete: () => {
isNotifierComplete = true;
if (isMainComplete) {
subscriber.complete();
}
},
})
);
}
if (!this.retriesSubscription || this.retriesSubscription.closed) {
return super.complete();
return completions$;
};

const subscribeForRepeatWhen = () => {
isMainComplete = false;
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => subscriber.error(err),
complete: () => {
isMainComplete = true;
if (isNotifierComplete) {
subscriber.complete();
} else {
const completions$ = getCompletionSubject();
if (completions$) {
// We have set up the notifier without error.
completions$.next();
}
}
},
});
if (syncResub) {
// Ensure that the inner subscription is torn down before
// moving on to the next subscription in the synchronous case.
// If we don't do this here, all inner subscriptions will not be
// torn down until the entire observable is done.
innerSub.unsubscribe();
innerSub = null;
// We may need to do this multiple times, so reset the flags.
syncResub = false;
// Resubscribe
subscribeForRepeatWhen();
} else {
subscription.add(innerSub);
}
};

this._unsubscribeAndRecycle();
this.notifications!.next();
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
const { notifications, retriesSubscription } = this;
if (notifications) {
notifications.unsubscribe();
this.notifications = null;
}
if (retriesSubscription) {
retriesSubscription.unsubscribe();
this.retriesSubscription = null;
}
this.retries = null;
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeAndRecycle(): Subscriber<T> {
const { _unsubscribe } = this;

this._unsubscribe = null!;
super._unsubscribeAndRecycle();
this._unsubscribe = _unsubscribe;

return this;
}
// Start the subscription
subscribeForRepeatWhen();

private subscribeToRetries() {
this.notifications = new Subject();
let retries;
try {
const { notifier } = this;
retries = notifier(this.notifications);
} catch (e) {
return super.complete();
}
this.retries = retries;
this.retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
}
return subscription;
});
}

0 comments on commit 98356f4

Please sign in to comment.