Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(catchError): ensure proper handling of async return for synchrono… #5627

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions spec/operators/catch-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators';
import { catchError, map, mergeMap, takeWhile, delay } from 'rxjs/operators';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
import { TestScheduler } from 'rxjs/testing';
Expand Down Expand Up @@ -431,8 +431,8 @@ describe('catchError operator', () => {
// TODO(v8): see https://github.com/ReactiveX/rxjs/issues/5115
// The re-implementation in version 8 should fix the problem in the
// referenced issue. Closed subscribers should remain closed.
/*
it('issue #5115', (done: MochaDone) => {

it('Properly handle async handled result if source is synchronous', (done: MochaDone) => {
const source = new Observable<string>(observer => {
observer.error(new Error('kaboom!'));
observer.complete();
Expand All @@ -456,5 +456,5 @@ describe('catchError operator', () => {
}
);
});
*/

});
96 changes: 46 additions & 50 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { Operator } from '../Operator';
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';

import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { lift } from '../util/lift';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
import { Subscription } from '../Subscription';
import { from } from '../observable/from';

/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>>;
/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -105,55 +108,48 @@ export function catchError<T, O extends ObservableInput<any>>(selector: (err: an
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
const operator = new CatchOperator(selector);
const caught = lift(source, operator);
operator.caught = caught;
return caught;
};
}
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;

class CatchOperator<T, R> implements Operator<T, T | R> {
caught: Observable<T> | undefined;
const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
}
};

constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught!));
}
}
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
}
},
complete: () => subscriber.complete(),
});

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class CatchSubscriber<T, R> extends SimpleOuterSubscriber<T, T | R> {
constructor(destination: Subscriber<any>,
private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>,
private caught: Observable<T>) {
super(destination);
}

// NOTE: overriding `error` instead of `_error` because we don't want
// to have this flag this subscriber as `isStopped`. We can mimic the
// behavior of the RetrySubscriber (from the `retry` operator), where
// we unsubscribe from our source chain, reset our Subscriber flags,
// then subscribe to the selector result.
error(err: any) {
if (!this.isStopped) {
let result: any;
try {
result = this.selector(err, this.caught);
} catch (err2) {
super.error(err2);
return;
if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
this._unsubscribeAndRecycle();
const innerSubscriber = new SimpleInnerSubscriber(this);
this.add(innerSubscriber);
innerSubscribe(result, innerSubscriber);
}
}

return subscription;
});
}