diff --git a/spec/operators/catchError-spec.ts b/spec/operators/catchError-spec.ts index 9eddb87f40..4dd12f0e20 100644 --- a/spec/operators/catchError-spec.ts +++ b/spec/operators/catchError-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs'; -import { catchError, map, mergeMap, takeWhile, delay, take } from 'rxjs/operators'; +import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs'; +import { catchError, map, mergeMap, delay, take, finalize } from 'rxjs/operators'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; import { TestScheduler } from 'rxjs/testing'; @@ -161,31 +161,6 @@ describe('catchError operator', () => { }); }); - it('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = concat( - defer(() => { - sideEffects.push(1); - return of(1); - }), - defer(() => { - sideEffects.push(2); - return of(2); - }), - defer(() => { - sideEffects.push(3); - return of(3); - }) - ); - - throwError(new Error('Some error')).pipe( - catchError(() => synchronousObservable), - takeWhile((x) => x != 2) // unsubscribe at the second side-effect - ).subscribe(() => { /* noop */ }); - - expect(sideEffects).to.deep.equal([1, 2]); - }); - it('should catch error and replace it with a hot Observable', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot(' --a--b--# '); @@ -457,7 +432,6 @@ describe('catchError operator', () => { ); }); - // TODO: fix firehose unsubscription it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { @@ -477,4 +451,54 @@ describe('catchError operator', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + it('should call the cleanup function of the catched observable when the source throws synchronously', () => { + let clean = false; + const testCleanup$ = new Observable(() => () => { + clean = true; + }); + + const subscription = throwError(new Error('Some error')).pipe( + catchError(() => testCleanup$), + ).subscribe(); + expect(clean).to.equal(false); + + subscription.unsubscribe(); + expect(clean).to.equal(true); + }); + + it('should not alter the order of the teardown logic of the subscription chain', () => { + const logs: any[] = []; + const VALUE = 'VALUE'; + const ERROR = 'ERROR'; + const COMPLETE = 'COMPLETE'; + const FINALIZED = 'FINALIZED'; + + const source = concat( + of(VALUE), + throwError(ERROR) + ).pipe( + finalize(() => { + logs.push(FINALIZED); + }) + ); + + let hasRetried = false; + source.pipe( + catchError((err, caught) => { + if (hasRetried) { + throw err; + } + hasRetried = true; + return caught; + }) + ) + .subscribe( + (value) => { logs.push(value); }, + (e) => { logs.push(e); }, + () => { logs.push(COMPLETE); } + ); + + const expectedLogs = [VALUE, FINALIZED, VALUE, ERROR, FINALIZED]; + expect(logs).to.deep.equal(expectedLogs); + }); }); diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index f793d1b3de..f5e3d8b566 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -111,44 +111,29 @@ export function catchError>( return (source: Observable) => lift(source, function (this: Subscriber, source: Observable) { const subscriber = this; - const subscription = new Subscription(); - let innerSub: Subscription | null = null; - let syncUnsub = false; - let handledResult: Observable>; + let nextSource: Observable | null = null; + let subscription: Subscription | null = null; - const handleError = (err: any) => { - try { - handledResult = from(selector(err, catchError(selector)(source))); - } catch (err) { - subscriber.error(err); - return; + const tryNextSubscription = () => { + if (subscription && nextSource && !subscriber.closed) { + subscription = nextSource.subscribe(subscriber); } }; - innerSub = source.subscribe( + subscription = source.subscribe( new CatchErrorSubscriber(subscriber, (err) => { - handleError(err); - if (handledResult) { - if (innerSub) { - innerSub.unsubscribe(); - innerSub = null; - subscription.add(handledResult.subscribe(subscriber)); - } else { - syncUnsub = true; - } + try { + nextSource = from(selector(err, catchError(selector)(source))); + tryNextSubscription(); + } catch (selectorErr) { + subscriber.error(selectorErr); } }) ); - if (syncUnsub) { - innerSub.unsubscribe(); - innerSub = null; - subscription.add(handledResult!.subscribe(subscriber)); - } else { - subscription.add(innerSub); - } + tryNextSubscription(); - return subscription; + return () => subscription!.unsubscribe(); }); }