Skip to content

Commit

Permalink
feat(operator): implement combine(), both static and instance
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 6, 2016
1 parent 7498032 commit f65a6a3
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 10 deletions.
27 changes: 26 additions & 1 deletion src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ import {DebugProducer} from './operator/DebugProducer';
import {FoldProducer} from './operator/FoldProducer';
import {LastProducer} from './operator/LastProducer';
import {RememberProducer} from './operator/RememberProducer';
import merge from './factory/merge';
import {
CombineProducer,
InstanceCombineSignature,
ProjectFunction} from './operator/CombineProducer';
import {empty} from './utils/empty';
import {combine, FactoryCombineSignature} from './factory/combine';
import merge from './factory/merge';
import from from './factory/from';
import interval from './factory/interval';

export class Stream<T> implements Observer<T> {
public _observers: Array<Observer<T>>;
Expand Down Expand Up @@ -81,6 +88,11 @@ export class Stream<T> implements Observer<T> {
}
}

static from: typeof from;
static combine: FactoryCombineSignature;
static merge: typeof merge;
static interval: typeof interval;

map<U>(project: (t: T) => U): Stream<U> {
return new Stream<U>(new MapProducer(project, this));
}
Expand Down Expand Up @@ -116,4 +128,17 @@ export class Stream<T> implements Observer<T> {
merge(other: Stream<T>): Stream<T> {
return merge(this, other);
}

combine: InstanceCombineSignature<T>;
}

Stream.from = from;
Stream.merge = merge;
Stream.combine = combine;
Stream.interval = interval;

Stream.prototype.combine = function <R>(project: ProjectFunction,
...streams: Array<Stream<any>>) {
streams.unshift(this);
return new Stream<R>(new CombineProducer<R>(project, streams));
};
29 changes: 29 additions & 0 deletions src/factory/combine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {Stream} from '../Stream';
import {ProjectFunction, CombineProducer} from '../operator/CombineProducer';

export interface FactoryCombineSignature {
<T1, T2, R>(
project: (t1: T1, t2: T2) => R,
stream2: Stream<T2>): Stream<R>;
<T1, T2, T3, R>(
project: (t1: T1, t2: T2, t3: T3) => R,
stream2: Stream<T2>,
stream3: Stream<T3>): Stream<R>;
<T1, T2, T3, T4, R>(
project: (t1: T1, t2: T2, t3: T3, t4: T4) => R,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>): Stream<R>;
<T1, T2, T3, T4, T5, R>(
project: (t1: T1, t2: T2, t3: T3, t4: T4, t5: T5) => R,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>,
stream5: Stream<T5>): Stream<R>;
<R>(project: (...args: Array<any>) => R, ...streams: Array<Stream<any>>): Stream<R>;
}

export function combine<R>(project: ProjectFunction,
...streams: Array<Stream<any>>) {
return new Stream<R>(new CombineProducer<R>(project, streams));
};
10 changes: 1 addition & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import {Stream} from './Stream';
import {Producer} from './Producer';
import {Observer} from './Observer';
import from from './factory/from';
import merge from './factory/merge';
import interval from './factory/interval';

export {
Producer,
Observer,
Stream,
};

export default {
Stream,
from,
merge,
interval,
};
export default Stream;
100 changes: 100 additions & 0 deletions src/operator/CombineProducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {Observer} from '../Observer';
import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';
import {invoke} from '../utils/invoke';

export class Proxy<T> implements Observer<T> {
constructor(public i: number, public prod: CombineProducer<T>) {
prod.proxies.push(this);
}

next(t: T): void {
const prod = this.prod;
prod.hasVal[this.i] = true;
prod.vals[this.i] = t;
if (!prod.ready) {
prod.up();
}
if (prod.ready) {
prod.out.next(invoke(prod.project, prod.vals));
}
}

error(err: any): void {
this.prod.out.error(err);
}

end(): void {
this.prod.out.end();
}
}

export interface ProjectFunction {
<T1, T2, R>(v1: T1, v2: T2): R;
<T1, T2, T3, R>(v1: T1, v2: T2, v3: T3): R;
<T1, T2, T3, T4, R>(v1: T1, v2: T2, v3: T3, v4: T4): R;
<T1, T2, T3, T4, T5, R>(v1: T1, v2: T2, v3: T3, v4: T4, v5: T5): R;
<T1, T2, T3, T4, T5, T6, R>(v1: T1, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6): R;
<R>(...values: Array<any>): R;
}

export interface InstanceCombineSignature<T> {
<T2, R>(
project: (t1: T, t2: T2) => R,
stream2: Stream<T2>): Stream<R>;
<T2, T3, R>(
project: (t1: T, t2: T2, t3: T3) => R,
stream2: Stream<T2>,
stream3: Stream<T3>): Stream<R>;
<T2, T3, T4, R>(
project: (t1: T, t2: T2, t3: T3, t4: T4) => R,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>): Stream<R>;
<T2, T3, T4, T5, R>(
project: (t1: T, t2: T2, t3: T3, t4: T4, t5: T5) => R,
stream2: Stream<T2>,
stream3: Stream<T3>,
stream4: Stream<T4>,
stream5: Stream<T5>): Stream<R>;
<R>(project: (...args: Array<any>) => R, ...streams: Array<Stream<any>>): Stream<R>;
}

export class CombineProducer<R> implements Producer<R> {
public out: Observer<R> = emptyObserver;
public proxies: Array<Proxy<any>> = [];
public ready: boolean = false;
public hasVal: Array<boolean>;
public vals: Array<any>;
public streams: Array<Stream<any>>;

constructor(public project: ProjectFunction, streams: Array<Stream<any>>) {
this.streams = streams;
this.vals = new Array(streams.length);
this.hasVal = new Array(streams.length);
}

up(): void {
for (let i = this.hasVal.length - 1; i >= 0; i--) {
if (!this.hasVal[i]) {
return;
}
}
this.ready = true;
}

start(out: Observer<R>): void {
this.out = out;
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i].subscribe(new Proxy(i, this));
}
}

stop(): void {
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i].unsubscribe(this.proxies[i]);
}
this.proxies = [];
}
}
11 changes: 11 additions & 0 deletions src/utils/invoke.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export function invoke(f: Function, args: Array<any>) {
switch (args.length) {
case 0: return f();
case 1: return f(args[0]);
case 2: return f(args[0], args[1]);
case 3: return f(args[0], args[1], args[2]);
case 4: return f(args[0], args[1], args[2], args[3]);
case 5: return f(args[0], args[1], args[2], args[3], args[4]);
default: return f.apply(void 0, args);
}
}
21 changes: 21 additions & 0 deletions tests/factory/combine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('xs.combine', () => {
it('should merge AND-style two streams together', (done) => {
const stream1 = xs.interval(100).take(2);
const stream2 = xs.interval(120).take(2);
const stream = xs.combine((x, y) => `${x}${y}`, stream1, stream2);
let expected = ['00', '10', '11'];
stream.subscribe({
next: (x) => {
assert.equal(x, expected.shift());
},
error: done.fail,
end: () => {
assert.equal(expected.length, 0);
done();
},
});
});
});
21 changes: 21 additions & 0 deletions tests/operator/combine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.combine', () => {
it('should merge AND-style another stream with the primary stream', (done) => {
const source = xs.interval(100).take(2);
const other = xs.interval(120).take(2);
const stream = source.combine((x, y) => `${x}${y}`, other);
let expected = ['00', '10', '11'];
stream.subscribe({
next: (x) => {
assert.equal(x, expected.shift());
},
error: done.fail,
end: () => {
assert.equal(expected.length, 0);
done();
},
});
});
});

0 comments on commit f65a6a3

Please sign in to comment.