Skip to content

Commit

Permalink
perf(combine): apply some perf optimizations to combine
Browse files Browse the repository at this point in the history
Inspired by some ideas in PR #14.
  • Loading branch information
staltz committed May 3, 2016
1 parent 79a2aeb commit ee4ec4c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 34 deletions.
72 changes: 72 additions & 0 deletions perf/combine.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
var Benchmark = require('benchmark');
var xs = require('../index').default;
var most = require('most');
var rx = require('rx');
var rxjs = require('@reactivex/rxjs')
var kefir = require('kefir');
var bacon = require('baconjs');
var lodash = require('lodash');
var highland = require('highland');

var runners = require('./runners');
var kefirFromArray = runners.kefirFromArray;

// Create a stream from an Array of n integers
// filter out odds, map remaining evens by adding 1, then reduce by summing
var n = runners.getIntArg(500000);
var a = new Array(n);
for(var i = 0; i< a.length; ++i) {
a[i] = i;
}

var suite = Benchmark.Suite('combine(add3) -> filter ' + n + ' x 3 integers');
var options = {
defer: true,
onError: function(e) {
e.currentTarget.failure = e.error;
}
};

function add3(a, b, c) {
return a + b + c;
}

var xs1 = xs.fromArray(a);
var xs2 = xs.fromArray(a);
var xs3 = xs.fromArray(a);

var m1 = most.from(a);
var m2 = most.from(a);
var m3 = most.from(a);

var rx1 = rxjs.Observable.from(a);
var rx2 = rxjs.Observable.from(a);
var rx3 = rxjs.Observable.from(a);

suite
.add('xstream', function(deferred) {
runners.runXStream(deferred,
xs.combine(add3, xs1, xs2, xs3).filter(even));
}, options)
.add('most', function(deferred) {
runners.runMost(deferred,
most.combineArray(add3, [m1, m2, m3]).filter(even).drain());
}, options)
.add('rx 5', function(deferred) {
runners.runRx5(deferred,
rxjs.Observable.combineLatest(rx1, rx2, rx3, add3).filter(even));
}, options);

runners.runSuite(suite);

function add1(x) {
return x + 1;
}

function even(x) {
return x % 2 === 0;
}

function sum(x, y) {
return x + y;
}
60 changes: 26 additions & 34 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,13 @@ class CombineListener<T> implements InternalListener<T> {
}

_n(t: T): void {
const p = this.p;
if (!p.out) return;
const vals = p.vals;
p.hasVal[this.i] = true;
vals[this.i] = t;
if (!p.ready) {
p.up();
}
if (p.ready) {
const p = this.p, out = p.out;
if (!out) return;
if (p.up(t, this.i)) {
try {
p.out._n(invoke(p.project, vals));
out._n(invoke(p.project, p.vals));
} catch (e) {
p.out._e(e);
out._e(e);
}
}
}
Expand All @@ -183,49 +177,47 @@ class CombineListener<T> implements InternalListener<T> {
class CombineProducer<R> implements InternalProducer<R> {
public out: InternalListener<R> = emptyListener;
public ils: Array<CombineListener<any>> = [];
public ready: boolean = false;
public hasVal: Array<boolean>;
public ac: number; // ac is "active count", num of streams still not completed
public left: number; // number of streams that still need to emit a value
public vals: Array<any>;
public ac: number; // ac is activeCount

constructor(public project: CombineProjectFunction,
public streams: Array<Stream<any>>) {
this.hasVal = new Array(streams.length);
this.vals = new Array(streams.length);
this.ac = streams.length;
const n = this.ac = this.left = streams.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
vals[i] = empty;
}
}

up(): void {
for (let i = this.hasVal.length - 1; i >= 0; i--) {
if (!this.hasVal[i]) {
return;
}
}
this.ready = true;
up(t: any, i: number): boolean {
const v = this.vals[i];
const left = !this.left ? 0 : v === empty ? --this.left : this.left;
this.vals[i] = t;
return left === 0;
}

_start(out: InternalListener<R>): void {
this.out = out;
const s = this.streams;
const L = s.length;
if (L == 0) this.zero(out); else {
for (let i = 0; i < L; i++) {
const n = s.length;
if (n === 0) this.zero(out); else {
for (let i = 0; i < n; i++) {
s[i]._add(new CombineListener(i, this));
}
}
}

_stop(): void {
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._remove(this.ils[i]);
const s = this.streams;
const n = this.ac = this.left = s.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
s[i]._remove(this.ils[i]);
vals[i] = empty;
}
this.out = null;
this.ils = [];
this.ready = false;
this.hasVal = new Array(streams.length);
this.vals = new Array(streams.length);
this.ac = streams.length;
}

zero(out: InternalListener<R>): void {
Expand Down

0 comments on commit ee4ec4c

Please sign in to comment.