Skip to content

Commit

Permalink
fix(AsyncSubject) should not allow complete() after error (#2280)
Browse files Browse the repository at this point in the history
* test(AsyncSubject): asyncsubject should not send complete event after error

* fix(AsynSubject): check if there is an error first

* test(AsyncSubject): asyncSubject should not send error event after complete

* fix(AsyncSubject): only pass error if asyncsubject is not completed
  • Loading branch information
victorg1991 authored and jayphelps committed Jan 27, 2017
1 parent f7402b9 commit c70a09d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
24 changes: 24 additions & 0 deletions spec/subjects/AsyncSubject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ describe('AsyncSubject', () => {

subscription.unsubscribe();
observer.results = [];

subject.error(new Error(''));

subject.subscribe(observer);
expect(observer.results).to.deep.equal(['done']);
});
Expand Down Expand Up @@ -168,4 +171,25 @@ describe('AsyncSubject', () => {
subject.subscribe(observer);
expect(observer.results).to.deep.equal([expected]);
});

it('should not allow send complete after error', () => {
const expected = new Error('bad');
const subject = new AsyncSubject();
const observer = new TestObserver();
const subscription = subject.subscribe(observer);

subject.next(1);
expect(observer.results).to.deep.equal([]);

subject.error(expected);
expect(observer.results).to.deep.equal([expected]);

subscription.unsubscribe();

observer.results = [];

subject.complete();
subject.subscribe(observer);
expect(observer.results).to.deep.equal([expected]);
});
});
15 changes: 10 additions & 5 deletions src/AsyncSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ export class AsyncSubject<T> extends Subject<T> {
private hasCompleted: boolean = false;

protected _subscribe(subscriber: Subscriber<any>): Subscription {
if (this.hasCompleted && this.hasNext) {
if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.hasCompleted && this.hasNext) {
subscriber.next(this.value);
subscriber.complete();
return Subscription.EMPTY;
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
}

return super._subscribe(subscriber);
}

Expand All @@ -30,6 +29,12 @@ export class AsyncSubject<T> extends Subject<T> {
}
}

error(error: any): void {
if (!this.hasCompleted) {
super.error(error);
}
}

complete(): void {
this.hasCompleted = true;
if (this.hasNext) {
Expand Down

0 comments on commit c70a09d

Please sign in to comment.