Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: fix(onErrorResumeNext): no longer hangs onto subscriptions longer tha… #3178

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions spec/observables/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ describe('Observable.onErrorResumeNext', () => {
const next1 = cold( '--c--d--#');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should continue array of observables', () => {
const source = hot('--a--b--#');
const source = hot( '--a--b--#');
const next = [ source,
cold( '--c--d--#'),
cold( '--e--#'),
cold( '--f--g--|')];
const subs = '^ !';
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(next)).toBe(expected);
Expand All @@ -42,4 +42,4 @@ describe('Observable.onErrorResumeNext', () => {
expectObservable(Observable.onErrorResumeNext(source)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
264 changes: 133 additions & 131 deletions src/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,94 +1,90 @@
import { Observable, ObservableInput } from '../Observable';
import { FromObservable } from '../observable/FromObservable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { isArray } from '../util/isArray';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../interfaces';
import { catchError } from '../operators/catchError';
import { from } from '../observable/from';
import { empty } from '../observable/empty';

/* tslint:disable:max-line-length */
export function onErrorResumeNext<T, R>(v: ObservableInput<R>): OperatorFunction<T, R>;
export function onErrorResumeNext<T, T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): OperatorFunction<T, R>;
export function onErrorResumeNext<T, T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): OperatorFunction<T, R>;
export function onErrorResumeNext<T, T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): OperatorFunction<T, R>;
export function onErrorResumeNext<T, T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): OperatorFunction<T, R> ;
export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>;
export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, R>(v: ObservableInput<R>): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): OperatorFunction<T, R> ;
// export function onErrorResumeNext<T, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>;
// export function onErrorResumeNext<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

/**
* When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
* that was passed.
*
* <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
*
* <img src="./img/onErrorResumeNext.png" width="100%">
*
* `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
* arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
* as the source.
*
* `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
* When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
* will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
* its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
* Observable in provided series, no matter if previous Observable completed or ended with an error. This will
* be happening until there is no more Observables left in the series, at which point returned Observable will
* complete - even if the last subscribed stream ended with an error.
*
* `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
* when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
* in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
* an error.
*
* Note that you do not get any access to errors emitted by the Observables. In particular do not
* expect these errors to appear in error callback passed to {@link subscribe}. If you want to take
* specific actions based on what error was emitted by an Observable, you should try out {@link catch} instead.
*
*
* @example <caption>Subscribe to the next Observable after map fails</caption>
* Rx.Observable.of(1, 2, 3, 0)
* .map(x => {
* if (x === 0) { throw Error(); }
return 10 / x;
* })
* .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
* .subscribe(
* val => console.log(val),
* err => console.log(err), // Will never be called.
* () => console.log('that\'s it!')
* );
*
* // Logs:
* // 10
* // 5
* // 3.3333333333333335
* // 1
* // 2
* // 3
* // "that's it!"
*
* @see {@link concat}
* @see {@link catch}
*
* @param {...ObservableInput} observables Observables passed either directly or as an array.
* @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
* to the next passed Observable and so on, until it completes or runs out of Observables.
* @method onErrorResumeNext
* @owner Observable
*/

export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): OperatorFunction<T, R> {
if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<Observable<any>>>nextSources[0];
}

return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}
// /**
// * When any of the provided Observable emits an complete or error notification, it immediately subscribes to the next one
// * that was passed.
// *
// * <span class="informal">Execute series of Observables no matter what, even if it means swallowing errors.</span>
// *
// * <img src="./img/onErrorResumeNext.png" width="100%">
// *
// * `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
// * arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
// * as the source.
// *
// * `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
// * When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
// * will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
// * its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
// * Observable in provided series, no matter if previous Observable completed or ended with an error. This will
// * be happening until there is no more Observables left in the series, at which point returned Observable will
// * complete - even if the last subscribed stream ended with an error.
// *
// * `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
// * when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
// * in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
// * an error.
// *
// * Note that you do not get any access to errors emitted by the Observables. In particular do not
// * expect these errors to appear in error callback passed to {@link subscribe}. If you want to take
// * specific actions based on what error was emitted by an Observable, you should try out {@link catch} instead.
// *
// *
// * @example <caption>Subscribe to the next Observable after map fails</caption>
// * Rx.Observable.of(1, 2, 3, 0)
// * .map(x => {
// * if (x === 0) { throw Error(); }
// return 10 / x;
// * })
// * .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
// * .subscribe(
// * val => console.log(val),
// * err => console.log(err), // Will never be called.
// * () => console.log('that\'s it!')
// * );
// *
// * // Logs:
// * // 10
// * // 5
// * // 3.3333333333333335
// * // 1
// * // 2
// * // 3
// * // "that's it!"
// *
// * @see {@link concat}
// * @see {@link catch}
// *
// * @param {...ObservableInput} observables Observables passed either directly or as an array.
// * @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
// * to the next passed Observable and so on, until it completes or runs out of Observables.
// * @method onErrorResumeNext
// * @owner Observable
// */

// export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
// Array<ObservableInput<any>> |
// ((...values: Array<any>) => R)>): OperatorFunction<T, R> {
// if (nextSources.length === 1 && isArray(nextSources[0])) {
// nextSources = <Array<Observable<any>>>nextSources[0];
// }

// return (source: Observable<T>) => source.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
// }

/* tslint:disable:max-line-length */
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
Expand All @@ -101,56 +97,62 @@ export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
/* tslint:enable:max-line-length */

export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let source: ObservableInput<any> = null;

export function onErrorResumeNextStatic<T, R>(
...nextSources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<ObservableInput<any>>>nextSources[0];
nextSources = nextSources[0] as Array<ObservableInput<any>>;
}
source = nextSources.shift();

return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}

class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
constructor(private nextSources: Array<ObservableInput<any>>) {
if (nextSources.length === 0) {
return empty();
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
}
}

class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(protected destination: Subscriber<T>,
private nextSources: Array<ObservableInput<any>>) {
super(destination);
}
const [source, ...rest] = nextSources;

notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}

notifyComplete(innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}

protected _error(err: any): void {
this.subscribeToNextSource();
}

protected _complete(): void {
this.subscribeToNextSource();
}

private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
} else {
this.destination.complete();
}
}
return from(source).pipe(
catchError(err => onErrorResumeNextStatic(...rest))
);
}

// class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
// constructor(private nextSources: Array<ObservableInput<any>>) {
// }

// call(subscriber: Subscriber<R>, source: any): any {
// return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
// }
// }

// class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
// constructor(protected destination: Subscriber<T>,
// private nextSources: Array<ObservableInput<any>>) {
// super(destination);
// }

// notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
// this.subscribeToNextSource();
// }

// notifyComplete(innerSub: InnerSubscriber<T, any>): void {
// this.subscribeToNextSource();
// }

// protected _error(err: any): void {
// this.subscribeToNextSource();
// }

// protected _complete(): void {
// this.subscribeToNextSource();
// }

// private subscribeToNextSource(): void {
// const next = this.nextSources.shift();
// if (next) {
// this.add(subscribeToResult(this, next));
// } else {
// this.destination.complete();
// }
// }
// }