Skip to content

Commit

Permalink
fix(bindCallback): now emits errors that happen after callback
Browse files Browse the repository at this point in the history
- No longer logs to console warn
- AsyncSubject overhead no longer required
- Inlined the scheduled path as is it deprecated
  • Loading branch information
benlesh committed Aug 26, 2020
1 parent edc28cf commit 2bddd31
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 96 deletions.
18 changes: 9 additions & 9 deletions spec/observables/bindCallback-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,17 @@ describe('bindCallback', () => {
});
});

it('should not swallow post-callback errors', () => {
it('should emit post-callback errors', () => {
function badFunction(callback: (answer: number) => void): void {
callback(42);
throw new Error('kaboom');
}
const consoleStub = sinon.stub(console, 'warn');
try {
bindCallback(badFunction)().subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
throw 'kaboom';
}
let receivedError: any;

bindCallback(badFunction)().subscribe({
error: err => receivedError = err
});

expect(receivedError).to.equal('kaboom');
});
});
158 changes: 74 additions & 84 deletions src/internal/observable/bindCallback.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { SchedulerLike, SchedulerAction } from '../types';
import { SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { map } from '../operators/map';
import { canReportError } from '../util/canReportError';
import { isArray } from '../util/isArray';
import { isScheduler } from '../util/isScheduler';

Expand Down Expand Up @@ -192,98 +189,91 @@ export function bindCallback<T>(
}
}


return function (this: any, ...args: any[]): Observable<T> {
const context = this;
let subject: AsyncSubject<T> | undefined;
const params = {
context,
subject: undefined,
callbackFunc,
scheduler: scheduler!,
};
return new Observable<T>(subscriber => {
let results: any;
let hasResults = false;
let hasError = false;
let error: any;
return new Observable<T>((subscriber) => {
if (!scheduler) {
if (!subject) {
subject = new AsyncSubject<T>();
let isCurrentlyAsync = false;
let hasCompletedSynchronously = false;
if (hasResults) {
subscriber.next(results);
subscriber.complete();
} else if (hasError) {
subscriber.error(error);
} else {
const handler = (...innerArgs: any[]) => {
subject!.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);
subject!.complete();
hasResults = true;
results = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
subscriber.next(results);
if (isCurrentlyAsync) {
subscriber.complete();
} else {
hasCompletedSynchronously = true;
}
};

try {
callbackFunc.apply(context, [...args, handler]);
callbackFunc.apply(this, [...args, handler]);
} catch (err) {
if (canReportError(subject)) {
subject.error(err);
} else {
console.warn(err);
}
hasError = true;
error = err;
subscriber.error(err);
}
isCurrentlyAsync = true;

if (hasCompletedSynchronously && !hasError) {
subscriber.complete();
}
}
return subject.subscribe(subscriber);
return;
} else {
const state: DispatchState<T> = {
args, subscriber, params,
const scheduleNext = (value: any[]) => {
hasResults = true;
results = value.length <= 1 ? value[0] : value;
subscriber.add(
scheduler!.schedule(() => {
subscriber.next(results);
subscriber.add(
scheduler!.schedule(() => {
subscriber.complete();
})
);
})
);
};

const scheduleError = (err: any) => {
hasError = true;
error = err;
subscriber.add(
scheduler!.schedule(() => {
subscriber.error(error);
})
);
};
return scheduler.schedule<DispatchState<T>>(dispatch as any, 0, state);

return scheduler.schedule(() => {
if (hasResults) {
scheduleNext(results);
} else if (hasError) {
scheduleError(error);
} else {
try {
callbackFunc.apply(this, [
...args,
(...innerArgs: any[]) => scheduleNext(innerArgs)
]);
} catch (err) {
scheduleError(err);
return;
}
}
});
}
});
};
}

interface DispatchState<T> {
args: any[];
subscriber: Subscriber<T>;
params: ParamsContext<T>;
}

interface ParamsContext<T> {
callbackFunc: Function;
scheduler: SchedulerLike;
context: any;
subject?: AsyncSubject<T>;
}

function dispatch<T>(this: SchedulerAction<DispatchState<T>>, state: DispatchState<T>) {
const self = this;
const { args, subscriber, params } = state;
const { callbackFunc, context, scheduler } = params;
let { subject } = params;
if (!subject) {
subject = params.subject = new AsyncSubject<T>();

const handler = (...innerArgs: any[]) => {
const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
this.add(scheduler.schedule<NextState<T>>(dispatchNext as any, 0, { value, subject: subject! }));
};

try {
callbackFunc.apply(context, [...args, handler]);
} catch (err) {
subject.error(err);
}
}

this.add(subject.subscribe(subscriber));
}

interface NextState<T> {
subject: AsyncSubject<T>;
value: T;
}

function dispatchNext<T>(this: SchedulerAction<NextState<T>>, state: NextState<T>) {
const { value, subject } = state;
subject.next(value);
subject.complete();
}

interface ErrorState<T> {
subject: AsyncSubject<T>;
err: any;
}

function dispatchError<T>(this: SchedulerAction<ErrorState<T>>, state: ErrorState<T>) {
const { err, subject } = state;
subject.error(err);
}
4 changes: 1 addition & 3 deletions src/internal/observable/bindNodeCallback.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/** @prettier */
import { Observable } from '../Observable';
import { AsyncSubject } from '../AsyncSubject';
import { Subscriber } from '../Subscriber';
import { SchedulerAction, SchedulerLike } from '../types';
import { SchedulerLike } from '../types';
import { map } from '../operators/map';
import { isScheduler } from '../util/isScheduler';
import { isArray } from '../util/isArray';
Expand Down

0 comments on commit 2bddd31

Please sign in to comment.