Skip to content

Commit

Permalink
fix(imitate): fix cyclic execution leak, and refactor
Browse files Browse the repository at this point in the history
Rewrite MimicStream with a different approach. Adds a hidden listener field (_hil) to Stream, which
is a special listener which does not trigger start nor stop when added/removed. imitate just adds
the MimicStream as a hidden listener on the target Stream, and MimicStream has its own listeners, it
does not redirect them to the target. This new approach breaks the usage of imitate for
Directed-Acyclic-Graphs, but we anyway intend MimicStream to be used _only_ to implement a cycle in
the stream graph. So this commit removes some tests.

Fixes bug #51, deprecates/invalidates issue #49.
  • Loading branch information
staltz committed Jun 12, 2016
1 parent 625fb3e commit 8a432b6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 103 deletions.
29 changes: 14 additions & 15 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -928,12 +928,14 @@ export class TakeOperator<T> implements Operator<T, T> {

export class Stream<T> implements InternalListener<T> {
protected _ils: Array<InternalListener<T>>; // 'ils' = Internal listeners
protected _hil: InternalListener<T>; // 'hil' = Hidden Internal Listener
protected _stopID: any = empty;
protected _prod: InternalProducer<T>;

constructor(producer?: InternalProducer<T>) {
this._prod = producer;
this._ils = [];
this._hil = null;
}

_n(t: T): void {
Expand All @@ -943,6 +945,8 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._n(t);
}
const h = this._hil;
if (h) h._n(t);
}

_e(err: any): void {
Expand All @@ -952,6 +956,8 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._e(err);
}
const h = this._hil;
if (h) h._e(err);
this._x();
}

Expand All @@ -962,6 +968,8 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._c();
}
const h = this._hil;
if (h) h._c();
this._x();
}

Expand Down Expand Up @@ -1023,6 +1031,10 @@ export class Stream<T> implements InternalListener<T> {
}
}

_setHIL(il: InternalListener<T>): void {
this._hil = il;
}

private ctor(): typeof Stream {
return this instanceof MemoryStream ? MemoryStream : Stream;
}
Expand Down Expand Up @@ -1679,23 +1691,10 @@ export class Stream<T> implements InternalListener<T> {
}

export class MimicStream<T> extends Stream<T> {
private _target: Stream<T>;
constructor() {
super();
}

_add(il: InternalListener<T>): void {
const t = this._target;
if (!t) return;
t._add(il);
}

_remove(il: InternalListener<T>): void {
const t = this._target;
if (!t) return;
t._remove(il);
}

/**
* This method exists only on a MimicStream, which is created through
* `xs.createMimic()`. *imitate* changes this current MimicStream to behave
Expand Down Expand Up @@ -1730,7 +1729,7 @@ export class MimicStream<T> extends Stream<T> {
* declaration of `first$`. Then, after both `first$` and `second$` are
* defined, we hook `secondMimic$` with `second$` with `imitate()` to tell
* that they are "the same". `imitate` will not trigger the start of any
* stream, it simply forwards listeners of `secondMimic$` to `second$`.
* stream, it just binds `secondMimic$` and `second$` together.
*
* The following is an example where `imitate()` is important in Cycle.js
* applications. A parent component contains some child components. A child
Expand Down Expand Up @@ -1771,7 +1770,7 @@ export class MimicStream<T> extends Stream<T> {
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
}
this._target = other;
other._setHIL(this);
}
}

Expand Down
88 changes: 51 additions & 37 deletions tests/mimicStream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// <reference path="../typings/globals/mocha/index.d.ts" />
/// <reference path="../typings/globals/node/index.d.ts" />
import xs, {Producer, Listener, Stream, MimicStream} from '../src/index';
import xs, {Producer, Listener, Stream, MemoryStream, MimicStream} from '../src/index';
import delay from '../src/extra/delay';
import * as assert from 'assert';

Expand All @@ -13,27 +13,6 @@ describe('MimicStream', () => {
done();
});

it('should imitate the given stream and send events to dependent streams', (done) => {
const stream = xs.periodic(50).take(3);
const proxyStream = xs.createMimic<number>();
const result = proxyStream.map(x => x * 10 + 1);
proxyStream.imitate(stream);

const expected = [1, 11, 21];
setTimeout(() => {
result.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
}, 125);
});

it('should be able to model a circular dependency in the stream graph', (done) => {
const fakeSecond = xs.createMimic<number>();
const first = fakeSecond.map(x => x * 10).take(3);
Expand All @@ -54,11 +33,13 @@ describe('MimicStream', () => {
});

it('should broadcast the source stream to multiple listeners', (done) => {
const source = xs.periodic(100).take(3);
const mimic = xs.createMimic();
mimic.imitate(source);
const expected1 = [0, 1, 2];
const expected2 = [1, 2];
const fakeSecond = xs.createMimic<number>();
const first = fakeSecond.map(x => x * 10).take(3);
const second = first.map(x => x + 1).startWith(1).compose(delay<number>(100));
fakeSecond.imitate(second);

const expected1 = [1, 11, 111, 1111];
const expected2 = [11, 111, 1111];
let completed1 = false;
let completed2 = false;

Expand All @@ -71,7 +52,8 @@ describe('MimicStream', () => {
completed1 = true;
}
};
mimic.addListener(listener1);
fakeSecond.addListener(listener1);
second.addListener({ next: () => { }, error: () => { }, complete: () => { } });

let listener2 = {
next: (x: number) => {
Expand All @@ -83,29 +65,34 @@ describe('MimicStream', () => {
}
};
setTimeout(() => {
mimic.addListener(listener2);
fakeSecond.addListener(listener2);
}, 150);

setTimeout(() => {
mimic.removeListener(listener1);
mimic.removeListener(listener2);
fakeSecond.removeListener(listener1);
fakeSecond.removeListener(listener2);
assert.equal(expected1.length, 0);
assert.equal(expected2.length, 0);
assert.equal(completed1, true);
assert.equal(completed2, true);
done();
}, 400);
}, 600);
});

it('should not create leaked cyclic executions', (done) => {
const expectedMimic = [2, 4, 8, 16];
it('should not cause leaked cyclic executions', (done) => {
const expectedMimic = [2, 4, 8, 16, 32 /* inertia due to stopping on next tick */];
const expectedResult = [2, 4, 8, 16];

const proxy$ = xs.createMimic<number>();
const source$ = proxy$.startWith(1).map(x => x * 2)
.debug(x => {
assert.equal(expectedMimic.length > 0, true);
assert.equal(x, expectedMimic.shift());
try {
assert.equal(expectedMimic.length > 0, true,
'should be expecting the next value ' + x);
assert.equal(x, expectedMimic.shift());
} catch (err) {
done(err);
}
});
const result$ = source$.compose(delay(100)).compose(s => <Stream<number>> s);
proxy$.imitate(result$)
Expand All @@ -116,12 +103,39 @@ describe('MimicStream', () => {
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expectedMimic.length, 0);
assert.equal(expectedMimic.length, 1); // still waiting for 32
assert.equal(expectedResult.length, 0);
setTimeout(() => {
done();
}, 1000);
},
});
});

describe('imitate', () => {
it('should not by itself start the target stream execution', (done) => {
let nextDelivered = false;
const stream = xs.periodic(50).take(3).debug(() => {
nextDelivered = true;
});
const proxyStream = xs.createMimic<number>();

setTimeout(() => {
assert.equal(nextDelivered, false);
done();
}, 125);

proxyStream.imitate(stream);
});

it('should throw an error when given a MemoryStream', (done) => {
const stream = xs.periodic(50).take(3).remember();
assert.strictEqual(stream instanceof MemoryStream, true);
const proxyStream = xs.createMimic<number>();
assert.throws(() => {
proxyStream.imitate(stream);
});
done();
});
});
});
51 changes: 0 additions & 51 deletions tests/operator/imitate.ts

This file was deleted.

0 comments on commit 8a432b6

Please sign in to comment.