Skip to content

Commit

Permalink
refactor(catchError): simplify implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
josepot committed Aug 28, 2020
1 parent 837ed8d commit 818fc97
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 59 deletions.
58 changes: 26 additions & 32 deletions spec/operators/catchError-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,53 +455,47 @@ describe('catchError operator', () => {
let clean = false;
const testCleanup$ = new Observable(() => () => {
clean = true;
})
});

const subscription = throwError(new Error('Some error')).pipe(
catchError(() => testCleanup$),
).subscribe()
).subscribe();
expect(clean).to.equal(false);

subscription.unsubscribe()
subscription.unsubscribe();
expect(clean).to.equal(true);
});

it('should not alter the order of the teardown logic of the subscription chain', () => {
const error = 'bad'
const source = new Observable(subscriber => {
let i = 0;
for (; !subscriber.closed && i < 2; i++) {
subscriber.next(i);
}
if (i === 2) {
throw error;
}
subscriber.complete();
});
const logs: any[] = [];
const VALUE = 'VALUE';
const ERROR = 'ERROR';
const COMPLETE = 'COMPLETE';
const FINALIZED = 'FINALIZED';

const logs: any[] = []
let retries = 0;
source.pipe(
const source = concat(of(VALUE), throwError(ERROR)).pipe(
finalize(() => {
logs.push('finalized')
}),
logs.push(FINALIZED);
})
);

let hasRetried = false;
source.pipe(
catchError((err, caught) => {
if (retries++ < 1) {
return caught
if (hasRetried) {
throw err;
}
throw err
hasRetried = true;
return caught;
})
).subscribe(
(value) => {
logs.push( value)
}, (e) => {
logs.push(e)
}, () => {
logs.push('complete')
}
)
.subscribe(
(value) => { logs.push(value); },
(e) => { logs.push(e); },
() => { logs.push(COMPLETE); }
);

const expectedLogs = [0, 1, 'finalized', 0, 1, error, 'finalized']
expect(logs).to.deep.equal(expectedLogs)
const expectedLogs = [VALUE, FINALIZED, VALUE, ERROR, FINALIZED];
expect(logs).to.deep.equal(expectedLogs);
});
});
39 changes: 12 additions & 27 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,42 +111,27 @@ export function catchError<T, O extends ObservableInput<any>>(
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;
let nextSource: Observable<T> | null = null;
let subscription: Subscription;

const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
const tryNextSubscription = () => {
if (subscription && nextSource && !subscriber.closed) {
nextSource.subscribe(subscriber);
}
};

innerSub = source.subscribe(
subscription = source.subscribe(
new CatchErrorSubscriber(subscriber, (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
try {
nextSource = from(selector(err, catchError(selector)(source)));
tryNextSubscription();
} catch (selectorErr) {
subscriber.error(selectorErr);
}
})
);

if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
tryNextSubscription();

return subscription;
});
Expand Down

0 comments on commit 818fc97

Please sign in to comment.