Skip to content

Commit

Permalink
fix(imitate): fix against cyclic propagation of errors
Browse files Browse the repository at this point in the history
See the new test for imitate()
  • Loading branch information
staltz committed Jun 14, 2016
1 parent 9cf6e52 commit 1aa0549
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,11 +911,13 @@ export class Stream<T> implements InternalListener<T> {
protected _stopID: any = empty;
protected _prod: InternalProducer<T>;
protected _target: Stream<T>; // imitation target if this Stream will imitate
protected _err: any;

constructor(producer?: InternalProducer<T>) {
this._prod = producer;
this._ils = [];
this._target = null;
this._err = null;
}

_n(t: T): void {
Expand All @@ -928,6 +930,8 @@ export class Stream<T> implements InternalListener<T> {
}

_e(err: any): void {
if (this._err) return;
this._err = err;
const a = this._ils;
const L = a.length;
if (L == 1) a[0]._e(err); else {
Expand All @@ -950,6 +954,7 @@ export class Stream<T> implements InternalListener<T> {
_x(): void { // tear down logic, after error or complete
if (this._ils.length === 0) return;
if (this._prod) this._prod._stop();
this._err = null;
this._ils = [];
}

Expand Down Expand Up @@ -1004,6 +1009,7 @@ export class Stream<T> implements InternalListener<T> {
a.splice(i, 1);
const p = this._prod;
if (p && a.length <= 0) {
this._err = null;
this._stopID = setTimeout(() => p._stop());
} else if (a.length === 1) {
this._pruneCycles();
Expand Down
32 changes: 32 additions & 0 deletions tests/operator/imitate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,38 @@ describe('Stream.prototype.imitate', () => {
});
});

it('should not propagate errors in a cycle', (done) => {
const proxyAction$ = xs.create<number>();
const state$ = proxyAction$.fold((state, action) => state + action, 0);
const action$ = state$.map(state => {
if (state === 3) {
throw new Error(':(');
}
return xs.of(1).compose(delay<number>(20));
}).flatten();
proxyAction$.imitate(action$);
const expected = [0, 1, 2];

let errors: Array<any> = [];
state$.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => {
errors.push(err);
},
complete: () => {
done('complete should not be called');
},
});

setTimeout(() => {
assert.equal(errors.length, 1);
assert.equal(expected.length, 0);
done();
}, 150);
});

it('should not by itself start the target stream execution', (done) => {
let nextDelivered = false;
const stream = xs.periodic(50).take(3).debug(() => {
Expand Down

0 comments on commit 1aa0549

Please sign in to comment.