Skip to content

Commit

Permalink
fix(combine): guard CombineListener against invalid out stream
Browse files Browse the repository at this point in the history
The out of the CombineProducer is set to null in the producer stop() method, and after that the
CombineListener may get a null pointer exception, in corner cases (see test).
  • Loading branch information
staltz committed May 2, 2016
1 parent 28afee9 commit 74c6061
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 22 deletions.
46 changes: 25 additions & 21 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,51 +143,55 @@ export interface CombineInstanceSignature<T> {

class CombineListener<T> implements InternalListener<T> {
constructor(private i: number,
private prod: CombineProducer<T>) {
prod.proxies.push(this);
private p: CombineProducer<T>) {
p.ils.push(this);
}

_n(t: T): void {
const prod = this.prod;
const vals = prod.vals;
prod.hasVal[this.i] = true;
const p = this.p;
if (!p.out) return;
const vals = p.vals;
p.hasVal[this.i] = true;
vals[this.i] = t;
if (!prod.ready) {
prod.up();
if (!p.ready) {
p.up();
}
if (prod.ready) {
if (p.ready) {
try {
prod.out._n(invoke(prod.project, vals));
p.out._n(invoke(p.project, vals));
} catch (e) {
prod.out._e(e);
p.out._e(e);
}
}
}

_e(err: any): void {
this.prod.out._e(err);
const out = this.p.out;
if (!out) return;
out._e(err);
}

_c(): void {
const prod = this.prod;
if (--prod.ac === 0) {
prod.out._c();
const p = this.p;
if (!p.out) return;
if (--p.ac === 0) {
p.out._c();
}
}
}

class CombineProducer<R> implements InternalProducer<R> {
public out: InternalListener<R> = emptyListener;
public ac: number; // ac is activeCount
public proxies: Array<CombineListener<any>> = [];
public ils: Array<CombineListener<any>> = [];
public ready: boolean = false;
public hasVal: Array<boolean>;
public vals: Array<any>;
public ac: number; // ac is activeCount

constructor(public project: CombineProjectFunction,
public streams: Array<Stream<any>>) {
this.vals = new Array(streams.length);
this.hasVal = new Array(streams.length);
this.vals = new Array(streams.length);
this.ac = streams.length;
}

Expand All @@ -211,14 +215,14 @@ class CombineProducer<R> implements InternalProducer<R> {
_stop(): void {
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._remove(this.proxies[i]);
streams[i]._remove(this.ils[i]);
}
this.out = null;
this.ac = streams.length;
this.proxies = [];
this.ils = [];
this.ready = false;
this.vals = new Array(streams.length);
this.hasVal = new Array(streams.length);
this.vals = new Array(streams.length);
this.ac = streams.length;
}
}

Expand Down
58 changes: 57 additions & 1 deletion tests/factory/combine.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import xs from '../../src/index';
import xs, {Stream} from '../../src/index';
import * as assert from 'assert';

describe('xs.combine', () => {
Expand Down Expand Up @@ -72,4 +72,60 @@ describe('xs.combine', () => {
},
});
});

it('should not break future listeners when CombineProducer tears down', (done) => {
// --0--1-2--| innerA
// ---0---1--| innerB
// ----0----1-2--| outer
const innerA = xs.create<number>();
const innerB = xs.create<number>();
const outer = xs.create<number>();
const arrayInners: Array<Stream<number>> = [];
const stream = outer
.map(x => {
return xs.combine(
(...args: Array<number>) => '' + x + args.join(''),
...arrayInners
);
})
.flatten();
const expected = ['00'];

setTimeout(() => {
arrayInners.push(innerA);
outer.shamefullySendNext(0);
}, 100);
setTimeout(() => {
innerA.shamefullySendNext(0)
}, 150);
setTimeout(() => {
innerB.shamefullySendNext(0)
}, 175);
setTimeout(() => {
arrayInners.push(innerB);
outer.shamefullySendNext(1);
innerA.shamefullySendNext(1);
}, 200);
setTimeout(() => {
innerA.shamefullySendNext(2);
outer.shamefullySendNext(2);
innerB.shamefullySendNext(1)
}, 250);
setTimeout(() => {
innerA.shamefullySendComplete();
innerB.shamefullySendComplete();
outer.shamefullySendComplete();
}, 550);

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});
});

0 comments on commit 74c6061

Please sign in to comment.