Skip to content

Commit

Permalink
Merge pull request #3289 from benlesh/onErrorResumeNext-fn
Browse files Browse the repository at this point in the history
fix(onErrorResumeNext): no longer holds onto subscriptions too long
  • Loading branch information
benlesh authored Feb 2, 2018
2 parents d487d6b + b781e27 commit c4756c3
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 108 deletions.
57 changes: 33 additions & 24 deletions spec/observables/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,54 @@
import * as Rx from '../../src/Rx';
import { onErrorResumeNext } from '../../src/create';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

const Observable = Rx.Observable;

describe('Observable.onErrorResumeNext', () => {
describe('onErrorResumeNext', () => {
it('should continue with observables', () => {
const source = hot('--a--b--#');
const next1 = cold( '--c--d--#');
const next2 = cold( '--e--#');
const next3 = cold( '--f--g--|');
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
const s1 = hot('--a--b--#');
const s2 = cold( '--c--d--#');
const s3 = cold( '--e--#');
const s4 = cold( '--f--g--|');
const subs1 = '^ !';
const subs2 = ' ^ !';
const subs3 = ' ^ !';
const subs4 = ' ^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(onErrorResumeNext(s1, s2, s3, s4)).toBe(expected);
expectSubscriptions(s1.subscriptions).toBe(subs1);
expectSubscriptions(s2.subscriptions).toBe(subs2);
expectSubscriptions(s3.subscriptions).toBe(subs3);
expectSubscriptions(s4.subscriptions).toBe(subs4);
});

it('should continue array of observables', () => {
const source = hot('--a--b--#');
const next = [ source,
cold( '--c--d--#'),
cold( '--e--#'),
cold( '--f--g--|')];
const subs = '^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(Observable.onErrorResumeNext(next)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
const s1 = hot('--a--b--#');
const s2 = cold( '--c--d--#');
const s3 = cold( '--e--#');
const s4 = cold( '--f--g--|');
const subs1 = '^ !';
const subs2 = ' ^ !';
const subs3 = ' ^ !';
const subs4 = ' ^ !';
const expected = '--a--b----c--d----e----f--g--|';

expectObservable(onErrorResumeNext([s1, s2, s3, s4])).toBe(expected);
expectSubscriptions(s1.subscriptions).toBe(subs1);
expectSubscriptions(s2.subscriptions).toBe(subs2);
expectSubscriptions(s3.subscriptions).toBe(subs3);
expectSubscriptions(s4.subscriptions).toBe(subs4);
});

it('should complete single observable throws', () => {
const source = hot('#');
const subs = '(^!)';
const expected = '|';

expectObservable(Observable.onErrorResumeNext(source)).toBe(expected);
expectObservable(onErrorResumeNext(source)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
132 changes: 48 additions & 84 deletions src/internal/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { Observable, ObservableInput } from '../Observable';
import { from } from './from';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { isArray } from '..//util/isArray';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '..//util/subscribeToResult';
import { isArray } from '../util/isArray';
import { EMPTY } from './empty';

/* tslint:disable:max-line-length */
export function onErrorResumeNext<R>(v: ObservableInput<R>): Observable<R>;
Expand All @@ -26,40 +22,37 @@ export function onErrorResumeNext<R>(array: ObservableInput<any>[]): Observable<
*
* <img src="./img/onErrorResumeNext.png" width="100%">
*
* `onErrorResumeNext` is an operator that accepts a series of Observables, provided either directly as
* arguments or as an array. If no single Observable is provided, returned Observable will simply behave the same
* as the source.
* `onErrorResumeNext` Will subscribe to each observable source it is provided, in order.
* If the source it's subscribed to emits an error or completes, it will move to the next source
* without error.
*
* `onErrorResumeNext` returns an Observable that starts by subscribing and re-emitting values from the source Observable.
* When its stream of values ends - no matter if Observable completed or emitted an error - `onErrorResumeNext`
* will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting
* its values as well and - again - when that stream ends, `onErrorResumeNext` will proceed to subscribing yet another
* Observable in provided series, no matter if previous Observable completed or ended with an error. This will
* be happening until there is no more Observables left in the series, at which point returned Observable will
* complete - even if the last subscribed stream ended with an error.
* If `onErrorResumeNext` is provided no arguments, or a single, empty array, it will return {@link EMPTY}.
*
* `onErrorResumeNext` can be therefore thought of as version of {@link concat} operator, which is more permissive
* when it comes to the errors emitted by its input Observables. While `concat` subscribes to the next Observable
* in series only if previous one successfully completed, `onErrorResumeNext` subscribes even if it ended with
* an error.
*
* Note that you do not get any access to errors emitted by the Observables. In particular do not
* expect these errors to appear in error callback passed to {@link subscribe}. If you want to take
* specific actions based on what error was emitted by an Observable, you should try out {@link catch} instead.
* `onErrorResumeNext` is basically {@link concat}, only it will continue, even if one of its
* sources emits an error.
*
* Note that there is no way to handle any errors thrown by sources via the resuult of
* `onErrorResumeNext`. If you want to handle errors thrown in any given source, you can
* always use the {@link catchError} operator on them before passing them into `onErrorResumeNext`.
*
* @example <caption>Subscribe to the next Observable after map fails</caption>
* Rx.Observable.of(1, 2, 3, 0)
* .map(x => {
* if (x === 0) { throw Error(); }
return 10 / x;
* })
* .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
* .subscribe(
* val => console.log(val),
* err => console.log(err), // Will never be called.
* () => console.log('that\'s it!')
* );
* import { onErrorResumeNext, of } from 'rxjs/create';
* import { map } from 'rxjs/operators';
*
* onErrorResumeNext(
* of(1, 2, 3, 0).pipe(
* map(x => {
* if (x === 0) throw Error();
* return 10 / x;
* })
* ),
* of(1, 2, 3),
* )
* .subscribe(
* val => console.log(val),
* err => console.log(err), // Will never be called.
* () => console.log('done')
* );
*
* // Logs:
* // 10
Expand All @@ -68,67 +61,38 @@ export function onErrorResumeNext<R>(array: ObservableInput<any>[]): Observable<
* // 1
* // 2
* // 3
* // "that's it!"
* // "done"
*
* @see {@link concat}
* @see {@link catch}
*
* @param {...ObservableInput} observables Observables passed either directly or as an array.
* @return {Observable} An Observable that emits values from source Observable, but - if it errors - subscribes
* to the next passed Observable and so on, until it completes or runs out of Observables.
* @method onErrorResumeNext
* @owner Observable
* @param {...ObservableInput} sources Observables (or anything that *is* observable) passed either directly or as an array.
* @return {Observable} An Observable that concatenates all sources, one after the other,
* ignoring all errors, such that any error causes it to move on to the next source.
*/
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
export function onErrorResumeNext<T, R>(...sources: Array<ObservableInput<any> |
Array<ObservableInput<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let source: ObservableInput<any> = null;

if (nextSources.length === 1 && isArray(nextSources[0])) {
nextSources = <Array<ObservableInput<any>>>nextSources[0];
}
source = nextSources.shift();

return from(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
}

class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
constructor(private nextSources: Array<ObservableInput<any>>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
}
}

class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(protected destination: Subscriber<T>,
private nextSources: Array<ObservableInput<any>>) {
super(destination);
}

notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
if (sources.length === 0) {
return EMPTY;
}

notifyComplete(innerSub: InnerSubscriber<T, any>): void {
this.subscribeToNextSource();
}
const [ first, ...remainder ] = sources;

protected _error(err: any): void {
this.subscribeToNextSource();
if (sources.length === 1 && isArray(first)) {
return onErrorResumeNext(...first);
}

protected _complete(): void {
this.subscribeToNextSource();
}
return new Observable(subscriber => {
const subNext = () => subscriber.add(
onErrorResumeNext(...remainder).subscribe(subscriber)
);

private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
} else {
this.destination.complete();
}
}
return from(first).subscribe({
next(value) { subscriber.next(value); },
error: subNext,
complete: subNext,
});
});
}

0 comments on commit c4756c3

Please sign in to comment.