Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(repeatWhen): Ensure teardown happens between repeat subscriptions #5625

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { repeatWhen, map, mergeMap, takeUntil } from 'rxjs/operators';
import { repeatWhen, map, mergeMap, takeUntil, takeWhile } from 'rxjs/operators';
import { of, EMPTY, Observable, Subscriber } from 'rxjs';

/** @test {repeatWhen} */
Expand Down Expand Up @@ -385,4 +385,25 @@ describe('repeatWhen operator', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should always teardown before starting the next cycle, even when synchronous', () => {
const results: any[] = [];
const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return () => {
results.push('teardown');
}
});
const subscription = source.pipe(repeatWhen(completions$ => completions$.pipe(
takeWhile((_, i) => i < 3)
))).subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
});

expect(subscription.closed).to.be.true;
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown'])
});
});
178 changes: 88 additions & 90 deletions src/internal/operators/repeatWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,98 +38,96 @@ import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '..
* @name repeatWhen
*/
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => lift(source, new RepeatWhenOperator(notifier));
}

class RepeatWhenOperator<T> implements Operator<T, T> {
constructor(protected notifier: (notifications: Observable<void>) => Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RepeatWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {

private notifications: Subject<void> | null = null;
private retries: Observable<any> | null = null;
private retriesSubscription: Subscription | null | undefined = null;
private sourceIsBeingSubscribedTo: boolean = true;

constructor(destination: Subscriber<R>,
private notifier: (notifications: Observable<void>) => Observable<any>,
private source: Observable<T>) {
super(destination);
}

notifyNext(): void {
this.sourceIsBeingSubscribedTo = true;
this.source.subscribe(this);
}

notifyComplete(): void {
if (this.sourceIsBeingSubscribedTo === false) {
return super.complete();
}
}

complete() {
this.sourceIsBeingSubscribedTo = false;

if (!this.isStopped) {
if (!this.retries) {
this.subscribeToRetries();
return (source: Observable<T>) => lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null;
let syncResub = false;
let completions$: Subject<void>;
let isNotifierComplete = false;
let isMainComplete = false;

/**
* Gets the subject to send errors through. If it doesn't exist,
* we know we need to setup the notifier.
*/
const getCompletionSubject = () => {
if (!completions$) {
completions$ = new Subject();
let notifier$: Observable<any>;
// The notifier is a user-provided function, so we need to do
// some error handling.
try {
notifier$ = notifier(completions$);
} catch (err) {
subscriber.error(err);
// Returning null here will cause the code below to
// notice there's been a problem and skip error notification.
return null;
}
subscription.add(
notifier$.subscribe({
next: () => {
if (innerSub) {
subscribeForRepeatWhen();
} else {
// If we don't have an innerSub yet, that's because the inner subscription
// call hasn't even returned yet. We've arrived here synchronously.
// So we flag that we want to resub, such that we can ensure teardown
// happens before we resubscribe.
syncResub = true;
}
},
error: (err) => subscriber.error(err),
complete: () => {
isNotifierComplete = true;
if (isMainComplete) {
subscriber.complete();
}
},
})
);
}
if (!this.retriesSubscription || this.retriesSubscription.closed) {
return super.complete();
return completions$;
};

const subscribeForRepeatWhen = () => {
isMainComplete = false;
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => subscriber.error(err),
complete: () => {
isMainComplete = true;
if (isNotifierComplete) {
subscriber.complete();
} else {
const completions$ = getCompletionSubject();
if (completions$) {
// We have set up the notifier without error.
completions$.next();
}
}
},
});
if (syncResub) {
// Ensure that the inner subscription is torn down before
// moving on to the next subscription in the synchronous case.
// If we don't do this here, all inner subscriptions will not be
// torn down until the entire observable is done.
innerSub.unsubscribe();
innerSub = null;
// We may need to do this multiple times, so reset the flags.
syncResub = false;
// Resubscribe
subscribeForRepeatWhen();
} else {
subscription.add(innerSub);
}
};

this._unsubscribeAndRecycle();
this.notifications!.next();
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
const { notifications, retriesSubscription } = this;
if (notifications) {
notifications.unsubscribe();
this.notifications = null;
}
if (retriesSubscription) {
retriesSubscription.unsubscribe();
this.retriesSubscription = null;
}
this.retries = null;
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeAndRecycle(): Subscriber<T> {
const { _unsubscribe } = this;

this._unsubscribe = null!;
super._unsubscribeAndRecycle();
this._unsubscribe = _unsubscribe;

return this;
}
// Start the subscription
subscribeForRepeatWhen();

private subscribeToRetries() {
this.notifications = new Subject();
let retries;
try {
const { notifier } = this;
retries = notifier(this.notifications);
} catch (e) {
return super.complete();
}
this.retries = retries;
this.retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
}
return subscription;
});
}