Skip to content

Commit

Permalink
refactor(catchError): simplify implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
josepot committed Aug 26, 2020
1 parent 837ed8d commit 3634261
Showing 1 changed file with 13 additions and 28 deletions.
41 changes: 13 additions & 28 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,44 +111,29 @@ export function catchError<T, O extends ObservableInput<any>>(
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;
let nextSource: Observable<T> | null = null;
let subscription: Subscription;

const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
const tryToSwapSubscription = () => {
if (subscription && nextSource && !subscriber.closed) {
subscription = nextSource.subscribe(subscriber);
}
};

innerSub = source.subscribe(
subscription = source.subscribe(
new CatchErrorSubscriber(subscriber, (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
try {
nextSource = from(selector(err, catchError(selector)(source)));
tryToSwapSubscription();
} catch (selectorErr) {
subscriber.error(selectorErr);
}
})
);

if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
tryToSwapSubscription();

return subscription;
return () => subscription.unsubscribe();
});
}

Expand Down

0 comments on commit 3634261

Please sign in to comment.