Skip to content

Commit

Permalink
fix(Observable): errors thrown during subscription are now properly s…
Browse files Browse the repository at this point in the history
…ent down error channel (#2313)

- Fixes `Observable.create(fn)` and `new Observable(fn)` such that any error thrown in `fn` on subscription will be sent to the subscriber's error handler.
- Fixes a subject test that was relying on the errant behavior.

fixes #1833
  • Loading branch information
benlesh authored and jayphelps committed Jan 29, 2017
1 parent c70a09d commit d4a9aac
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
24 changes: 24 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ describe('Observable', () => {
source.subscribe(function (x) { expect(x).to.equal(1); }, null, done);
});

it('should send errors thrown in the constructor down the error path', (done) => {
new Observable((observer) => {
throw new Error('this should be handled');
})
.subscribe({
error(err) {
expect(err).to.deep.equal(new Error('this should be handled'));
done();
}
});
});

describe('forEach', () => {
it('should iterate and return a Promise', (done: MochaDone) => {
const expected = [1, 2, 3];
Expand Down Expand Up @@ -582,6 +594,18 @@ describe('Observable.create', () => {
});
expect(called).to.be.true;
});

it('should send errors thrown in the passed function down the error path', (done) => {
Observable.create((observer) => {
throw new Error('this should be handled');
})
.subscribe({
error(err) {
expect(err).to.deep.equal(new Error('this should be handled'));
done();
}
});
});
});

/** @test {Observable} */
Expand Down
4 changes: 1 addition & 3 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,8 @@ describe('Subject', () => {
expect(() => {
subject.subscribe(
function (x) { results3.push(x); },
function (e) { results3.push('E'); },
() => { results3.push('C'); }
);
}).to.throw();
}).to.throw(Rx.ObjectUnsubscribedError);

expect(results1).to.deep.equal([1, 2, 3, 4, 5]);
expect(results2).to.deep.equal([3, 4, 5]);
Expand Down
12 changes: 11 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class Observable<T> implements Subscribable<T> {
if (operator) {
operator.call(sink, this.source);
} else {
sink.add(this._subscribe(sink));
sink.add(this._trySubscribe(sink));
}

if (sink.syncErrorThrowable) {
Expand All @@ -108,6 +108,16 @@ export class Observable<T> implements Subscribable<T> {
return sink;
}

private _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
sink.syncErrorThrown = true;
sink.syncErrorValue = err;
sink.error(err);
}
}

/**
* @method forEach
* @param {Function} next a handler for each value emitted by the observable
Expand Down

0 comments on commit d4a9aac

Please sign in to comment.