Skip to content

Commit

Permalink
fix(Stream): stop the producer syncly after the Stream errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Apr 6, 2016
1 parent faba7bf commit 6c803ac
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export class Stream<T> implements InternalListener<T> {
this._ils[i]._e(err);
}
}
if (this._prod) this._prod._stop();
this._ils = [];
}

_c(): void {
Expand Down
53 changes: 53 additions & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,57 @@ describe('Stream', () => {
assert.equal(expected2.length, 0);
done();
});

it('should synchronously stop producer when error thrown', (done) => {
let on = false;
const stream = xs.create({
start: (listener) => {
on = true;
listener.next(10);
listener.next(20);
listener.next(30);
listener.error('oops');
},
stop: () => {
on = false;
},
});
const expected1 = [10, 20, 30];
const expected2 = [10, 20, 30];

stream.addListener({
next: (x: number) => {
assert.equal(on, true);
assert.equal(x, expected1.shift());
},
error: (err: any) => {
assert.equal(err, 'oops');
assert.equal(on, true);
assert.equal(expected1.length, 0);
},
complete: () => {
done('complete should not be called');
},
});
assert.equal(on, false);
assert.equal(expected1.length, 0);

stream.addListener({
next: (x: number) => {
assert.equal(on, true);
assert.equal(x, expected2.shift());
},
error: (err: any) => {
assert.equal(err, 'oops');
assert.equal(on, true);
assert.equal(expected2.length, 0);
},
complete: () => {
done('complete should not be called');
},
});
assert.equal(on, false);
assert.equal(expected2.length, 0);
done();
});
});

0 comments on commit 6c803ac

Please sign in to comment.