Skip to content

Commit

Permalink
fix(map): remove map+map fusion optimization
Browse files Browse the repository at this point in the history
This fusion puts two consecutive map operators together, disguised as one map operation. This fusion
caused undesired issues such as #98, #108, #93. It works in most.js because that one is
unicast-first, but map+map fusion does not work well with multicast-only, like in xstream.

BREAKING CHANGE:
This change will remove map+map fusions. Your application code may or
may not rely on the bugs that map+map fusion caused, so we advise to
update carefully, testing your application code as you go. Generally
this is very straightforward and safe to update, as there are no visible
API changes.

Closes #98, #108, #93.
  • Loading branch information
staltz committed Oct 24, 2016
1 parent 47e67ff commit 1ca6a5c
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 24 deletions.
25 changes: 1 addition & 24 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ function internalizeProducer<T>(producer: Producer<T>) {
(producer as InternalProducer<T> & Producer<T>)._stop = producer.stop;
}

function compose2<T, U>(f1: (t: T) => any, f2: (t: T) => any): (t: T) => any {
return function composedFn(arg: T): any {
return f1(f2(arg));
};
}

function and<T>(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean {
return function andFn(t: T): boolean {
return f1(t) && f2(t);
Expand Down Expand Up @@ -1641,24 +1635,7 @@ export class Stream<T> implements InternalListener<T> {
const p = this._prod;
const ctor = this.ctor();
if (p instanceof FilterOperator) {
return new ctor<U>(new FilterMapOperator<T, U>(
(p as FilterOperator<T>).passes,
project,
(p as FilterOperator<T>).ins
));
}
if (p instanceof FilterMapOperator) {
return new ctor<U>(new FilterMapOperator<T, U>(
(p as FilterMapOperator<T, U>).passes,
compose2(project, (p as FilterMapOperator<T, U>).project),
(p as FilterMapOperator<T, U>).ins
));
}
if (p instanceof MapOperator) {
return new ctor<U>(new MapOperator<T, U>(
compose2(project, (p as MapOperator<T, U>).project),
(p as MapOperator<T, U>).ins
));
return new ctor<U>(new FilterMapOperator<T, U>(p.passes, project, p.ins));
}
return new ctor<U>(new MapOperator<T, U>(project, this));
}
Expand Down
64 changes: 64 additions & 0 deletions tests/operator/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,68 @@ describe('Stream.prototype.map', () => {
assert.strictEqual(JSON.stringify(stream['_prod']['out']), '{}');
done();
});

it('should not repeat the map project function (e.g. no map+map fusion)', (done) => {
const stream = xs.create();
let firstMapCalled = 0;

const source$ = stream.map(x => {
firstMapCalled += 1;
return {a: 10, b: 20, c: 30};
});

const a$ = source$.map(x => x.a);
const b$ = source$.map(x => x.b);
const c$ = source$.map(x => x.c);

const expectedA = [10];
const expectedB = [20];
const expectedC = [30];

a$.addListener({next: a => assert.strictEqual(a, expectedA.shift())});
b$.addListener({next: b => assert.strictEqual(b, expectedB.shift())});
c$.addListener({next: c => assert.strictEqual(c, expectedC.shift())});

stream.shamefullySendNext(1);

setTimeout(() => {
assert.strictEqual(firstMapCalled, 1);
assert.strictEqual(expectedA.length, 0);
assert.strictEqual(expectedB.length, 0);
assert.strictEqual(expectedC.length, 0);
done();
});
});

it('should not repeat the map project function (e.g. no filter+map+map fusion)', (done) => {
const stream = xs.create();
let firstMapCalled = 0;

const source$ = stream.filter(x => x !== 42).map(x => {
firstMapCalled += 1;
return {a: 10, b: 20, c: 30};
});

const a$ = source$.map(x => x.a);
const b$ = source$.map(x => x.b);
const c$ = source$.map(x => x.c);

const expectedA = [10];
const expectedB = [20];
const expectedC = [30];

a$.addListener({next: a => assert.strictEqual(a, expectedA.shift())});
b$.addListener({next: b => assert.strictEqual(b, expectedB.shift())});
c$.addListener({next: c => assert.strictEqual(c, expectedC.shift())});

stream.shamefullySendNext(1);

setTimeout(() => {
assert.strictEqual(firstMapCalled, 1);
assert.strictEqual(expectedA.length, 0);
assert.strictEqual(expectedB.length, 0);
assert.strictEqual(expectedC.length, 0);
done();
});
});
});

0 comments on commit 1ca6a5c

Please sign in to comment.