Skip to content

Commit

Permalink
fix(map): remove operator fusion to avoid bugs
Browse files Browse the repository at this point in the history
Operator fusion such as filter+map or map+flatten has been causing bugs to users, such as #165 and
#178. Since this library's focus is not "as high performance as possible", we need to prioritize
correctness and library size. Removing operator fusion should solve the mentioned issues besides
others not reported.

#165 and #178
  • Loading branch information
staltz committed Apr 3, 2017
1 parent 8f2503a commit f16e6a9
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 156 deletions.
2 changes: 1 addition & 1 deletion perf/filter-map-fusion.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ for(var i = 0; i< a.length; ++i) {
a[i] = i;
}

var suite = Benchmark.Suite('filter -> map -> reduce ' + n + ' integers');
var suite = Benchmark.Suite('filter -> map -> fusion ' + n + ' integers');
var options = {
defer: true,
onError: function(e) {
Expand Down
120 changes: 3 additions & 117 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -884,94 +884,6 @@ class Last<T> implements Operator<T, T> {
}
}

class MapFlattenListener<R> implements InternalListener<R> {
private out: Stream<R>;
private op: MapFlatten<any, R>;

constructor(out: Stream<R>, op: MapFlatten<any, R>) {
this.out = out;
this.op = op;
}

_n(r: R) {
this.out._n(r);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.inner = NO as Stream<R>;
this.op.less();
}
}

class MapFlatten<T, R> implements Operator<T, R> {
public type: string;
public ins: Stream<T>;
public out: Stream<R>;
public mapOp: MapOp<T, Stream<R>>;
public inner: Stream<R>; // Current inner Stream
private il: InternalListener<R>; // Current inner InternalListener
private open: boolean;

constructor(mapOp: MapOp<T, Stream<R>>) {
this.type = `${mapOp.type}+flatten`;
this.ins = mapOp.ins;
this.out = NO as Stream<R>;
this.mapOp = mapOp;
this.inner = NO as Stream<R>;
this.il = NO_IL;
this.open = true;
}

_start(out: Stream<R>): void {
this.out = out;
this.inner = NO as Stream<R>;
this.il = NO_IL;
this.open = true;
this.mapOp.ins._add(this);
}

_stop(): void {
this.mapOp.ins._remove(this);
if (this.inner !== NO) this.inner._remove(this.il);
this.out = NO as Stream<R>;
this.inner = NO as Stream<R>;
this.il = NO_IL;
}

less(): void {
if (!this.open && this.inner === NO) {
const u = this.out;
if (u === NO) return;
u._c();
}
}

_n(v: T) {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
const s = _try(this.mapOp, v, u);
if (s === NO) return;
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s as Stream<R>)._add(this.il = new MapFlattenListener(u, this));
}

_e(err: any) {
const u = this.out;
if (u === NO) return;
u._e(err);
}

_c() {
this.open = false;
this.less();
}
}

class MapOp<T, R> implements Operator<T, R> {
public type = 'map';
public ins: Stream<T>;
Expand Down Expand Up @@ -1015,25 +927,6 @@ class MapOp<T, R> implements Operator<T, R> {
}
}

class FilterMapFusion<T, R> extends MapOp<T, R> {
public type = 'filter+map';
public passes: (t: T) => boolean;

constructor(passes: (t: T) => boolean, project: (t: T) => R, ins: Stream<T>) {
super(project, ins);
this.passes = passes;
}

_n(t: T) {
if (!this.passes(t)) return;
const u = this.out;
if (u === NO) return;
const r = _try(this, t, u);
if (r === NO) return;
u._n(r as R);
}
}

class Remember<T> implements InternalProducer<T> {
public type = 'remember';
public ins: Stream<T>;
Expand Down Expand Up @@ -1619,10 +1512,7 @@ export class Stream<T> implements InternalListener<T> {
} as CombineSignature;

protected _map<U>(project: (t: T) => U): Stream<U> | MemoryStream<U> {
const p = this._prod;
const ctor = this.ctor();
if (p instanceof Filter) return new ctor<U>(new FilterMapFusion<T, U>(p.f, project, p.ins));
return new ctor<U>(new MapOp<T, U>(project, this));
return new (this.ctor())<U>(new MapOp<T, U>(project, this));
}

/**
Expand Down Expand Up @@ -1665,7 +1555,7 @@ export class Stream<T> implements InternalListener<T> {
mapTo<U>(projectedValue: U): Stream<U> {
const s = this.map(() => projectedValue);
const op: Operator<T, U> = s._prod as Operator<T, U>;
op.type = op.type.replace('map', 'mapTo');
op.type = 'mapTo';
return s;
}

Expand Down Expand Up @@ -1890,11 +1780,7 @@ export class Stream<T> implements InternalListener<T> {
*/
flatten<R>(this: Stream<Stream<R>>): T {
const p = this._prod;
return new Stream<R>(
p instanceof MapOp && !(p instanceof FilterMapFusion) ?
new MapFlatten(p as MapOp<any, Stream<R>>) :
new Flatten(this)
) as T & Stream<R>;
return new Stream<R>(new Flatten(this)) as T & Stream<R>;
}

/**
Expand Down
20 changes: 0 additions & 20 deletions tests/operator/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,6 @@ describe('Stream.prototype.filter', () => {
});
});

it('should should have filter+map fusion metadata', (done: any) => {
const isEven = (x: number) => x % 2 === 0;
const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8)
.filter(isEven)
.map(x => 10 * x);

assert.strictEqual(stream['_prod']['type'], 'filter+map');
done();
});

it('should should have filter+mapTo fusion metadata', (done: any) => {
const isEven = (x: number) => x % 2 === 0;
const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8)
.filter(isEven)
.mapTo(10);

assert.strictEqual(stream['_prod']['type'], 'filter+mapTo');
done();
});

it('should call functions in correct order for filter+filter fusion', (done: any) => {
const object$ = xs.of<any>(
{ foo: { a: 10 } },
Expand Down
19 changes: 1 addition & 18 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@ describe('Stream.prototype.flatten', () => {
});
});

it('should have an ins field as metadata', (done: any) => {
const source: Stream<number> = xs.periodic(100).take(3);
const stream: Stream<number> = source
.map((i: number) => xs.of(1 + i, 2 + i, 3 + i))
.flatten();
assert.strictEqual(stream['_prod']['ins'], source);
done();
});

it('should return a flat stream with correct TypeScript types', (done: any) => {
const streamStrings: Stream<string> = Stream.create({
start: (listener: Listener<string>) => {},
Expand Down Expand Up @@ -285,7 +276,7 @@ describe('Stream.prototype.flatten', () => {
});
});

describe('with filter+map fusion', () => {
describe('with filter and map', () => {
it('should execute the predicate, the projection, and the flattening', (done: any) => {
let predicateCallCount = 0;
let projectCallCount = 0;
Expand Down Expand Up @@ -321,14 +312,6 @@ describe('Stream.prototype.flatten', () => {
});

describe('with mapTo', () => {
it('should have the correct \'type\' metadata on the operator producer', (done: any) => {
const source: Stream<Stream<number>> = xs.periodic(100).take(3)
.mapTo(xs.of(1, 2, 3));
const stream: Stream<number> = source.flatten();
assert.strictEqual(stream['_prod']['type'], 'mapTo+flatten');
done();
});

it('should not restart inner stream if switching to the same inner stream', (done: any) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
Expand Down

0 comments on commit f16e6a9

Please sign in to comment.