Skip to content

Commit

Permalink
refactor(catchError): simplify implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
josepot committed Aug 26, 2020
1 parent 837ed8d commit 5893948
Showing 1 changed file with 14 additions and 26 deletions.
40 changes: 14 additions & 26 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,41 +112,29 @@ export function catchError<T, O extends ObservableInput<any>>(
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;
let nextSource: Observable<T> | null = null;
let sourceSubscription: Subscription | null = null;

const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
const tryToSwapSubscription = () => {
if (sourceSubscription && nextSource && !subscriber.closed) {
subscription.add(nextSource.subscribe(subscriber));
sourceSubscription = nextSource = null;
}
};

innerSub = source.subscribe(
sourceSubscription = source.subscribe(
new CatchErrorSubscriber(subscriber, (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
try {
nextSource = from(selector(err, catchError(selector)(source)));
tryToSwapSubscription();
} catch (selectorErr) {
subscriber.error(selectorErr);
}
})
);

if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
subscription.add(sourceSubscription);
tryToSwapSubscription();

return subscription;
});
Expand Down

0 comments on commit 5893948

Please sign in to comment.