Skip to content

Commit 9caae7c

Browse files
committed
fix(SubjectSource): allow source to be run after being disposed
A source may be run and disposed multiple times in its lifetime, but BasicSubjectSource refuses to relay events after it is run, disposed, and run again. This is a fix for that issue. A most stream may be observed anew after ended by completion or error, so this update allows the same. In addition, a subject created by holdSubject() does not begin buffering events until it has an observer.
1 parent ee37933 commit 9caae7c

File tree

3 files changed

+58
-5
lines changed

3 files changed

+58
-5
lines changed

src/SubjectSource.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ export interface SubjectSource<T> extends Source<T> {
1414
export class BasicSubjectSource<T> implements SubjectSource<T> {
1515
protected scheduler: Scheduler = defaultScheduler;
1616
protected sinks: Sink<T>[] = [];
17-
protected active: boolean = true;
17+
protected active: boolean = false;
1818

1919
run (sink: Sink<T>, scheduler: Scheduler): Disposable<T> {
2020
const n = this.add(sink);
21-
if (n === 1) this.scheduler = scheduler;
21+
if (n === 1) {
22+
this.scheduler = scheduler;
23+
this.active = true;
24+
}
2225
return new SubjectDisposable<T>(this, sink);
2326
}
2427

@@ -48,14 +51,14 @@ export class BasicSubjectSource<T> implements SubjectSource<T> {
4851
error (err: Error): void {
4952
if (!this.active || this.scheduler === void 0) return;
5053

51-
this.active = false;
54+
this._dispose();
5255
this._error(this.scheduler.now(), err);
5356
}
5457

5558
complete (value?: T): void {
5659
if (!this.active || this.scheduler === void 0) return;
5760

58-
this.active = false;
61+
this._dispose();
5962
this._complete(this.scheduler.now(), value);
6063
}
6164

test/holdSubject.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ describe('holdSubject', () => {
2525
it('should allow for adjusting bufferSize of stream', () => {
2626
const stream = holdSubject(3)
2727

28+
// Add an observer so the stream begins buffering events
29+
stream.observe(() => {})
30+
2831
stream.next(1)
2932
stream.next(2)
3033
stream.next(3)

test/subject.js

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ describe('subject()', () => {
7575
stream.complete()
7676
})
7777

78-
it('should not allow events after end', done => {
78+
it('should not notify existing observers after end', done => {
7979
const stream = subject()
8080

8181
stream
@@ -87,6 +87,53 @@ describe('subject()', () => {
8787
stream.next(1)
8888
})
8989

90+
it('should not notify existing observers after error', done => {
91+
const stream = subject()
92+
93+
stream
94+
.forEach(assert.fail)
95+
.then(done, () => done())
96+
.catch(assert.fail)
97+
98+
stream.error(new Error())
99+
stream.next(1)
100+
})
101+
102+
it('should allow new observers after end', done => {
103+
const stream = subject()
104+
105+
stream.complete()
106+
107+
stream
108+
.reduce((x, y) => x.concat(y), [])
109+
.then(x => assert.deepEqual(x, [1, 2, 3]))
110+
.then(done)
111+
112+
stream.next(1)
113+
stream.next(2)
114+
stream.next(3)
115+
116+
stream.complete()
117+
})
118+
119+
120+
it('should allow new observers after error', done => {
121+
const stream = subject()
122+
123+
stream.error(new Error())
124+
125+
stream
126+
.reduce((x, y) => x.concat(y), [])
127+
.then(x => assert.deepEqual(x, [1, 2, 3]))
128+
.then(done)
129+
130+
stream.next(1)
131+
stream.next(2)
132+
stream.next(3)
133+
134+
stream.complete()
135+
})
136+
90137
it('should support transient use as a signal stream', done => {
91138
const stream = subject()
92139
const signalStream = subject()

0 commit comments

Comments
 (0)