Skip to content

Commit

Permalink
feat(error-handling): add deprecated sync error handling behind a flag
Browse files Browse the repository at this point in the history
Given the LARGE number of people that seem to be using bad patterns like wrapping subscribe calls in a try catch, we're going to readd the synchronous error throwing behind a config flag during transition.
  • Loading branch information
benlesh committed Mar 16, 2018
1 parent 4898f52 commit 583cd1d
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 29 deletions.
28 changes: 28 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Rx from '../src/internal/Rx';
import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
import * as HostReportErrorModule from '../src/internal/util/hostReportError';
//tslint:disable-next-line
require('./helpers/test-helper');

Expand Down Expand Up @@ -531,6 +532,33 @@ describe('Observable', () => {

});
});

describe('if config.useDeprecatedSynchronousThrowing === true', () => {
beforeEach(() => {
Rx.config.useDeprecatedSynchronousErrorHandling = true;
});

it('should throw synchronously', () => {
expect(() => Observable.throwError(new Error()).subscribe())
.to.throw();
});

afterEach(() => {
Rx.config.useDeprecatedSynchronousErrorHandling = false;
});
});

describe('if config.useDeprecatedSynchronousThrowing === false', () => {
beforeEach(() => {
Rx.config.useDeprecatedSynchronousErrorHandling = false;
});

it('should call hostReportErrors', () => {
const spy = sinon.spy(HostReportErrorModule, 'hostReportError');
Observable.throwError(new Error()).subscribe();
expect(spy).to.have.been.calledOnce;
});
});
});

describe('pipe', () => {
Expand Down
2 changes: 1 addition & 1 deletion spec/observables/fromEvent-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ describe('fromEvent', () => {
};

fromEvent(obj as any, 'click').subscribe({
error(err) {
error(err: any) {
expect(err).to.exist
.and.be.instanceof(Error)
.and.have.property('message', 'Invalid event target');
Expand Down
13 changes: 13 additions & 0 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,26 @@ export class Observable<T> implements Subscribable<T> {
sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
}

if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}

return sink;
}

protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
}
sink.error(err);
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/internal/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import { Observer } from './types';
import { config } from './config';
import { hostReportError } from './util/hostReportError';

export const empty: Observer<any> = {
closed: true,
next(value: any): void { /* noop */},
error(err: any): void { throw err; },
error(err: any): void {
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
hostReportError(err);
}
},
complete(): void { /*noop*/ }
};
106 changes: 84 additions & 22 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { empty as emptyObserver } from './Observer';
import { Observer, PartialObserver } from './types';
import { Subscription } from './Subscription';
import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
import { config } from './config';
import { hostReportError } from './util/hostReportError';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -33,9 +35,14 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
error?: (e?: any) => void,
complete?: () => void): Subscriber<T> {
const subscriber = new Subscriber(next, error, complete);
subscriber.syncErrorThrowable = false;
return subscriber;
}

public syncErrorValue: any = null;
public syncErrorThrown: boolean = false;
public syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: PartialObserver<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)

Expand Down Expand Up @@ -66,12 +73,14 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.destination = (<Subscriber<any>> destinationOrNext);
(<any> this.destination).add(this);
} else {
this.destination = new SafeSubscriber<T>(<PartialObserver<any>> destinationOrNext);
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
}
break;
}
default:
this.destination = new SafeSubscriber<T>(<((value: T) => void)> destinationOrNext, error, complete);
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
break;
}
}
Expand Down Expand Up @@ -160,7 +169,8 @@ class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;

constructor(observerOrNext?: PartialObserver<T> | ((value: T) => void),
constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
Expand Down Expand Up @@ -191,48 +201,100 @@ class SafeSubscriber<T> extends Subscriber<T> {

next(value?: T): void {
if (!this.isStopped && this._next) {
try {
this._next.call(this._context, value);
} catch (err) {
this._hostReportError(err);
const { _parentSubscriber } = this;
if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}

error(err?: any): void {
if (!this.isStopped) {
const { _parentSubscriber } = this;
const { useDeprecatedSynchronousErrorHandling } = config;
if (this._error) {
try {
this._error.call(this._context, err);
} catch (err) {
this._hostReportError(err);
if (!useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._error, err);
this.unsubscribe();
} else {
this.__tryOrSetError(_parentSubscriber, this._error, err);
this.unsubscribe();
}
} else if (!_parentSubscriber.syncErrorThrowable) {
this.unsubscribe();
if (useDeprecatedSynchronousErrorHandling) {
throw err;
}
hostReportError(err);
} else {
this._hostReportError(err);
if (useDeprecatedSynchronousErrorHandling) {
_parentSubscriber.syncErrorValue = err;
_parentSubscriber.syncErrorThrown = true;
} else {
hostReportError(err);
}
this.unsubscribe();
}
this.unsubscribe();
}
}

complete(): void {
if (!this.isStopped) {
const { _parentSubscriber } = this;
if (this._complete) {
try {
this._complete.call(this._context);
} catch (err) {
this._hostReportError(err);
const wrappedComplete = () => this._complete.call(this._context);

if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(wrappedComplete);
this.unsubscribe();
} else {
this.__tryOrSetError(_parentSubscriber, wrappedComplete);
this.unsubscribe();
}
} else {
this.unsubscribe();
}
}
}

private __tryOrUnsub(fn: Function, value?: any): void {
try {
fn.call(this._context, value);
} catch (err) {
this.unsubscribe();
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
hostReportError(err);
}
}
}

protected _unsubscribe(): void {
this._context = null;
private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
if (!config.useDeprecatedSynchronousErrorHandling) {
throw new Error('bad call');
}
try {
fn.call(this._context, value);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
parent.syncErrorValue = err;
parent.syncErrorThrown = true;
return true;
} else {
hostReportError(err);
return true;
}
}
return false;
}

private _hostReportError(err: any) {
setTimeout(() => { throw err; });
protected _unsubscribe(): void {
const { _parentSubscriber } = this;
this._context = null;
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
}
12 changes: 11 additions & 1 deletion src/internal/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,15 @@ export const config = {
* The promise constructor used by default for methods such as
* {@link toPromise} and {@link forEach}
*/
Promise
Promise,

/**
* If true, turns on synchronous error rethrowing, which is a deprecated behavior
* in v6 and higher. This behavior enables bad patterns like wrapping a subscribe
* call in a try/catch block. It also enables producer interference, a nasty bug
* where a multicast can be broken for all observers by a downstream consumer with
* an unhandled error. DO NOT USE THIS FLAG UNLESS IT'S NEEDED TO BY TIME
* FOR MIGRATION REASONS.
*/
useDeprecatedSynchronousErrorHandling: false,
};
8 changes: 8 additions & 0 deletions src/internal/util/hostReportError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* Throws an error on another job so that it's picked up by the runtime's
* uncaught error handling mechanism.
* @param err the error to throw
*/
export function hostReportError(err: any) {
setTimeout(() => { throw err; });

This comment has been minimized.

Copy link
@jamie-pate

jamie-pate Apr 20, 2018

Just curious of the decision to throw in a timeout instead of scheduling a microtask

}
6 changes: 2 additions & 4 deletions src/internal/util/subscribeToPromise.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Subscriber } from '../Subscriber';
import { hostReportError } from './hostReportError';

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {
promise.then(
Expand All @@ -10,9 +11,6 @@ export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: S
},
(err: any) => subscriber.error(err)
)
.then(null, (err: any) => {
// Escaping the Promise trap: globally throw unhandled errors
setTimeout(() => { throw err; });
});
.then(null, hostReportError);
return subscriber;
};

0 comments on commit 583cd1d

Please sign in to comment.