Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(catchError): simplify implementation #5666

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions spec/operators/catchError-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, takeWhile, delay, take } from 'rxjs/operators';
import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, delay, take, finalize } from 'rxjs/operators';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
import { TestScheduler } from 'rxjs/testing';
Expand Down Expand Up @@ -161,31 +161,6 @@ describe('catchError operator', () => {
});
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

throwError(new Error('Some error')).pipe(
catchError(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should catch error and replace it with a hot Observable', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b--# ');
Expand Down Expand Up @@ -457,7 +432,6 @@ describe('catchError operator', () => {
);
});

// TODO: fix firehose unsubscription
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable(subscriber => {
Expand All @@ -477,4 +451,54 @@ describe('catchError operator', () => {
expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should call the cleanup function of the catched observable when the source throws synchronously', () => {
let clean = false;
const testCleanup$ = new Observable(() => () => {
clean = true;
});

const subscription = throwError(new Error('Some error')).pipe(
catchError(() => testCleanup$),
).subscribe();
expect(clean).to.equal(false);

subscription.unsubscribe();
expect(clean).to.equal(true);
});

it('should not alter the order of the teardown logic of the subscription chain', () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is a little too clever.

I'd prefer that things were more clearly named. For example, you're leveraging i in that for loop, but from the name it's not clear what the intended use of i is, or why it lives outside of the for declaration. You have to study the code to realize its part of the test, because you're throwing if you "make it through the loop". That test is probably unnecessary BTW.

Basically just an of(1, 2, 3, 4, 5, 6, 7) or something would probably be fine.

Copy link
Contributor Author

@josepot josepot Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I got a bit lazy and I just made a test out of your comment 🙈 I will improve that, for sure. Thanks!

That test is probably unnecessary BTW.

I don't know, I think that it doesn't hurt. With my previous implementation all the tests were passing, but this test would have failed... Let me try to make it more explicit and understandable.

const logs: any[] = [];
const VALUE = 'VALUE';
const ERROR = 'ERROR';
const COMPLETE = 'COMPLETE';
const FINALIZED = 'FINALIZED';

const source = concat(
of(VALUE),
throwError(ERROR)
).pipe(
finalize(() => {
logs.push(FINALIZED);
})
);

let hasRetried = false;
source.pipe(
catchError((err, caught) => {
if (hasRetried) {
throw err;
}
hasRetried = true;
return caught;
})
)
.subscribe(
(value) => { logs.push(value); },
(e) => { logs.push(e); },
() => { logs.push(COMPLETE); }
);

const expectedLogs = [VALUE, FINALIZED, VALUE, ERROR, FINALIZED];
expect(logs).to.deep.equal(expectedLogs);
});
});
41 changes: 13 additions & 28 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,44 +111,29 @@ export function catchError<T, O extends ObservableInput<any>>(
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;
let nextSource: Observable<T> | null = null;
let subscription: Subscription | null = null;

const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
const tryNextSubscription = () => {
if (subscription && nextSource && !subscriber.closed) {
subscription = nextSource.subscribe(subscriber);
}
};

innerSub = source.subscribe(
subscription = source.subscribe(
new CatchErrorSubscriber(subscriber, (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
try {
nextSource = from(selector(err, catchError(selector)(source)));
tryNextSubscription();
} catch (selectorErr) {
subscriber.error(selectorErr);
}
})
);

if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
tryNextSubscription();

return subscription;
return () => subscription!.unsubscribe();
});
}

Expand Down