Skip to content

Commit

Permalink
fix(flatten): when same inner stream, restart
Browse files Browse the repository at this point in the history
Fix flatten operator so that if the new inner stream is the same as the previous inner stream, it
synchronously stops the inner stream before subscribing to it, which will cause a synchronous
(re)start. This is important to maintain referential transparency when the inner may be defined
either from the closure or internally in the map project function. Both cases should be the same.

Closes issue #90.
  • Loading branch information
staltz committed Jul 23, 2016
1 parent 1ccc232 commit 819bc94
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
if (s === inner && s._prod !== NO) s._stopNow();
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new FlattenListener(u, this));
}
Expand Down Expand Up @@ -832,14 +833,16 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
if (inner !== NO && il !== NO_IL) inner._remove(il);
let s: Stream<R>;
try {
(this.inner = this.mapOp.project(v))._add(
this.il = new MapFlattenInner(u, this)
);
s = this.mapOp.project(v);
} catch (e) {
u._e(e);
return;
}
if (s === inner && s._prod !== NO) s._stopNow();
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new MapFlattenInner(u, this));
}

_e(err: any) {
Expand Down Expand Up @@ -1056,9 +1059,9 @@ export class TakeOperator<T> implements Operator<T, T> {
}

export class Stream<T> implements InternalListener<T> {
public _prod: InternalProducer<T>;
protected _ils: Array<InternalListener<T>>; // 'ils' = Internal listeners
protected _stopID: any;
protected _prod: InternalProducer<T>;
protected _target: Stream<T>; // imitation target if this Stream will imitate
protected _err: any;

Expand Down Expand Up @@ -1108,9 +1111,9 @@ export class Stream<T> implements InternalListener<T> {
this._ils = [];
}

_lateStop() {
// this._prod is not null, because this _lateStop is called from _remove
// where we already checked that this._prod is truthy
_stopNow() {
// WARNING: code that calls this method should
// first check if this._prod is valid (not `NO`)
this._prod._stop();
this._err = NO;
this._stopID = NO;
Expand Down Expand Up @@ -1140,7 +1143,7 @@ export class Stream<T> implements InternalListener<T> {
a.splice(i, 1);
if (this._prod !== NO && a.length <= 0) {
this._err = NO;
this._stopID = setTimeout(() => this._lateStop());
this._stopID = setTimeout(() => this._stopNow());
} else if (a.length === 1) {
this._pruneCycles();
}
Expand Down Expand Up @@ -1931,9 +1934,9 @@ export class MemoryStream<T> extends Stream<T> {
super._add(il);
}

_lateStop() {
_stopNow() {
this._has = false;
super._lateStop();
super._stopNow();
}

_x(): void {
Expand Down
71 changes: 71 additions & 0 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,31 @@ import fromDiagram from '../../src/extra/fromDiagram';
import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
describe('with map+debug to break the fusion', () => {
it('should restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);

const stream = outer.map(() => inner).debug(() => { }).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
});
});
});

describe('with map', () => {
it('should expand each periodic event with 3 sync events', (done) => {
const source: Stream<Stream<number>> = xs.periodic(100).take(3)
Expand Down Expand Up @@ -215,6 +240,29 @@ describe('Stream.prototype.flatten', () => {
}
});
});

it('should restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);

const stream = outer.map(() => inner).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
});
});
});

describe('with filter+map fusion', () => {
Expand Down Expand Up @@ -260,5 +308,28 @@ describe('Stream.prototype.flatten', () => {
assert.strictEqual(stream['_prod']['type'], 'mapTo+flatten');
done();
});

it('should restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);

const stream = outer.mapTo(inner).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
});
});
});
});

0 comments on commit 819bc94

Please sign in to comment.