Skip to content

Commit

Permalink
feat: stopped notification handler (#5750)
Browse files Browse the repository at this point in the history
* fix: add undefined to config Promise type

And fix a spelling error.

* feat: add stopped registration point

* chore: wire up Subscriber

* chore: make call async

* chore: use notification exports

* chore: check callback before setTimeout

* chore: use null

* chore: api_guardian:update

* refactor: move factories to avoid circular dep

* chore: api_guardian:update

* chore: remove empty lines

* chore: remove canReportError and add tests

* test: use onStoppedNotification

* chore: remove unused import

* refactor: import notification factories directly

* refactor: use delegatable provider

* test: replace setTimeout with the provider

* chore: add and use timeout provider

* chore: use ObservableNotification type

* chore: update api_guardian files

* chore: comment the delegation of setTimeout, et al.
  • Loading branch information
cartant authored Oct 31, 2020
1 parent 0eff973 commit cfa267b
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 153 deletions.
3 changes: 2 additions & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ export declare function concat<T extends readonly unknown[]>(...inputsAndSchedul

export declare const config: {
onUnhandledError: ((err: any) => void) | null;
Promise: PromiseConstructorLike;
onStoppedNotification: ((notification: ObservableNotification<any>, subscriber: Subscriber<any>) => void) | null;
Promise: PromiseConstructorLike | undefined;
useDeprecatedSynchronousErrorHandling: boolean;
useDeprecatedNextContext: boolean;
};
Expand Down
9 changes: 5 additions & 4 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,13 @@ describe('Observable.lift', () => {
});

it('should not swallow internal errors', (done) => {
config.onUnhandledError = (err) => {
expect(err).to.equal('bad');
config.onUnhandledError = null;
config.onStoppedNotification = (notification) => {
expect(notification.kind).to.equal('E');
expect(notification).to.have.property('error', 'bad');
config.onStoppedNotification = null;
done();
};

new Observable(subscriber => {
subscriber.error('test');
throw 'bad';
Expand Down
169 changes: 125 additions & 44 deletions spec/config-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { config } from '../src/internal/config';
import { expect } from 'chai';
import { Observable } from 'rxjs';
import { timeoutProvider } from 'rxjs/internal/scheduler/timeoutProvider';

describe('config', () => {
it('should have a Promise property that defaults to nothing', () => {
Expand All @@ -23,19 +24,19 @@ describe('config', () => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
config.onUnhandledError = (err) => {
called = true;
expect(err).to.equal('bad');
done()
done();
};

const source = new Observable<number>(subscriber => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe({
next: value => results.push(value),
next: (value) => results.push(value),
});
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
Expand All @@ -45,73 +46,120 @@ describe('config', () => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
config.onUnhandledError = (err) => {
called = true;
expect(err).to.equal('bad');
done()
done();
};

const source = new Observable<number>(subscriber => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe(value => results.push(value));
source.subscribe((value) => results.push(value));
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
});

it('should call asynchronously if an error is emitted and not handled by the consumer in the empty case', (done) => {
let called = false;
config.onUnhandledError = err => {
config.onUnhandledError = (err) => {
called = true;
expect(err).to.equal('bad');
done()
done();
};

const source = new Observable(subscriber => {
const source = new Observable((subscriber) => {
subscriber.error('bad');
});

source.subscribe();
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by an error', (done) => {
/**
* This test is added so people know this behavior is _intentional_. It's part of the contract of observables
* and, while I'm not sure I like it, it might start surfacing untold numbers of errors, and break
* node applications if we suddenly changed this to start throwing errors on other jobs for instances
* where users accidentally called `subscriber.error` twice. Likewise, would we report an error
* for two calls of `complete`? This is really something a build-time tool like a linter should
* capture. Not a run time error reporting event.
*/
it('should not be called if two errors are sent to the subscriber', (done) => {
let called = false;
config.onUnhandledError = err => {
config.onUnhandledError = () => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
const source = new Observable((subscriber) => {
subscriber.error('handled');
subscriber.error('swallowed');
});

let syncSentError: any;
source.subscribe({
error: (err) => {
syncSentError = err;
},
});

expect(syncSentError).to.equal('handled');
// When called, onUnhandledError is called on a timeout, so delay the
// the assertion of the expectation until after the point at which
// onUnhandledError would have been called.
timeoutProvider.setTimeout(() => {
expect(called).to.be.false;
done();
});
});
});

describe('onStoppedNotification', () => {
afterEach(() => {
config.onStoppedNotification = null;
});

it('should default to null', () => {
expect(config.onStoppedNotification).to.be.null;
});

it('should be called asynchronously if a subscription setup errors after the subscription is closed by an error', (done) => {
let called = false;
config.onStoppedNotification = (notification) => {
called = true;
expect(notification.kind).to.equal('E');
expect(notification).to.have.property('error', 'bad');
done();
};

const source = new Observable((subscriber) => {
subscriber.error('handled');
throw 'bad';
});

let syncSentError: any;
source.subscribe({
error: err => {
error: (err) => {
syncSentError = err;
}
},
});

expect(syncSentError).to.equal('handled');
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by a completion', (done) => {
it('should be called asynchronously if a subscription setup errors after the subscription is closed by a completion', (done) => {
let called = false;
let completed = false;

config.onUnhandledError = err => {
config.onStoppedNotification = (notification) => {
called = true;
expect(err).to.equal('bad');
done()
expect(notification.kind).to.equal('E');
expect(notification).to.have.property('error', 'bad');
done();
};

const source = new Observable(subscriber => {
const source = new Observable((subscriber) => {
subscriber.complete();
throw 'bad';
});
Expand All @@ -122,47 +170,80 @@ describe('config', () => {
},
complete: () => {
completed = true;
}
},
});

expect(completed).to.be.true;
expect(called).to.be.false;
});

/**
* Thie test is added so people know this behavior is _intentional_. It's part of the contract of observables
* and, while I'm not sure I like it, it might start surfacing untold numbers of errors, and break
* node applications if we suddenly changed this to start throwing errors on other jobs for instances
* where users accidentally called `subscriber.error` twice. Likewise, would we report an error
* for two calls of `complete`? This is really something a build-time tool like a linter should
* capture. Not a run time error reporting event.
*/
it('should not be called if two errors are sent to the subscriber', (done) => {
it('should be called if a next is sent to the stopped subscriber', (done) => {
let called = false;
config.onUnhandledError = () => {
config.onStoppedNotification = (notification) => {
called = true;
expect(notification.kind).to.equal('N');
expect(notification).to.have.property('value', 2);
done();
};

const source = new Observable(subscriber => {
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.complete();
subscriber.next(2);
});

let syncSentValue: any;
source.subscribe({
next: (value) => {
syncSentValue = value;
},
});

expect(syncSentValue).to.equal(1);
expect(called).to.be.false;
});

it('should be called if two errors are sent to the subscriber', (done) => {
let called = false;
config.onStoppedNotification = (notification) => {
called = true;
expect(notification.kind).to.equal('E');
expect(notification).to.have.property('error', 'swallowed');
done();
};

const source = new Observable((subscriber) => {
subscriber.error('handled');
subscriber.error('swallowed');
});

let syncSentError: any;
source.subscribe({
error: err => {
error: (err) => {
syncSentError = err;
}
},
});

expect(syncSentError).to.equal('handled');
// This timeout would be scheduled _after_ any error timeout that might be scheduled
// (But we're not scheduling that), so this is just an artificial delay to make sure the
// behavior sticks.
setTimeout(() => {
expect(called).to.be.false;
expect(called).to.be.false;
});

it('should be called if two completes are sent to the subscriber', (done) => {
let called = false;
config.onStoppedNotification = (notification) => {
called = true;
expect(notification.kind).to.equal('C');
done();
};

const source = new Observable((subscriber) => {
subscriber.complete();
subscriber.complete();
});

source.subscribe();

expect(called).to.be.false;
});
});
});
});
2 changes: 1 addition & 1 deletion spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElem
import { TestScheduler } from 'rxjs/testing';
import { ReplaySubject, of, Observable, Operator, Observer, interval, Subject } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { createNotification } from 'rxjs/internal/Notification';
import { createNotification } from 'rxjs/internal/NotificationFactories';

declare const rxTestScheduler: TestScheduler;

Expand Down
28 changes: 24 additions & 4 deletions spec/schedulers/TestScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import { hot, cold, expectObservable, expectSubscriptions, time } from '../helpe
import { TestScheduler } from 'rxjs/testing';
import { Observable, NEVER, EMPTY, Subject, of, merge, animationFrameScheduler, asapScheduler, asyncScheduler, interval } from 'rxjs';
import { delay, debounceTime, concatMap, mergeMap, mapTo, take } from 'rxjs/operators';
import { nextNotification, COMPLETE_NOTIFICATION, errorNotification } from 'rxjs/internal/Notification';
import { nextNotification, COMPLETE_NOTIFICATION, errorNotification } from 'rxjs/internal/NotificationFactories';
import { animationFrameProvider } from 'rxjs/internal/scheduler/animationFrameProvider';
import { immediateProvider } from 'rxjs/internal/scheduler/immediateProvider';
import { intervalProvider } from 'rxjs/internal/scheduler/intervalProvider';
import { timeoutProvider } from 'rxjs/internal/scheduler/timeoutProvider';

declare const rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -680,22 +681,41 @@ describe('TestScheduler', () => {
});
});

it('should schedule immediates before intervals', () => {
it('should schedule timeouts', () => {
const testScheduler = new TestScheduler(assertDeepEquals);
testScheduler.run(() => {
const values: string[] = [];
const { setTimeout } = timeoutProvider;
setTimeout(() => {
values.push(`a@${testScheduler.now()}`);
}, 1);
expect(values).to.deep.equal([]);
testScheduler.schedule(() => {
expect(values).to.deep.equal(['a@1']);
}, 10);
});
});

it('should schedule immediates before intervals and timeouts', () => {
const testScheduler = new TestScheduler(assertDeepEquals);
testScheduler.run(() => {
const values: string[] = [];
const { setImmediate } = immediateProvider;
const { setInterval, clearInterval } = intervalProvider;
const { setTimeout } = timeoutProvider;
const handle = setInterval(() => {
values.push(`a@${testScheduler.now()}`);
clearInterval(handle);
}, 0);
setImmediate(() => {
setTimeout(() => {
values.push(`b@${testScheduler.now()}`);
}, 0);
setImmediate(() => {
values.push(`c@${testScheduler.now()}`);
});
expect(values).to.deep.equal([]);
testScheduler.schedule(() => {
expect(values).to.deep.equal(['b@0', 'a@0']);
expect(values).to.deep.equal(['c@0', 'a@0', 'b@0']);
}, 10);
});
});
Expand Down
24 changes: 0 additions & 24 deletions spec/util/canReportError-spec.ts

This file was deleted.

Loading

0 comments on commit cfa267b

Please sign in to comment.