Skip to content

Commit

Permalink
fix(combine): fix combine() to export its Producer class
Browse files Browse the repository at this point in the history
And should have a public member: streams. It is an array of the input streams, same API as merge
producer has. These fixes are for metadata purposes, for debugging tools.
  • Loading branch information
staltz committed May 9, 2016
1 parent 144341e commit 700a129
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ export interface CombineInstanceSignature<T> {
<R>(project: (...args: Array<any>) => R, ...streams: Array<Stream<any>>): Stream<R>;
}

class CombineListener<T> implements InternalListener<T> {
export class CombineListener<T> implements InternalListener<T> {
constructor(private i: number,
private p: CombineProducer<T>) {
p.ils.push(this);
Expand Down Expand Up @@ -174,7 +174,7 @@ class CombineListener<T> implements InternalListener<T> {
}
}

class CombineProducer<R> implements InternalProducer<R> {
export class CombineProducer<R> implements InternalProducer<R> {
public out: InternalListener<R> = emptyListener;
public ils: Array<CombineListener<any>> = [];
public ac: number; // ac is "active count", num of streams still not completed
Expand Down Expand Up @@ -279,21 +279,21 @@ export class MergeProducer<T> implements InternalProducer<T>, InternalListener<T
private out: InternalListener<T> = emptyListener;
private ac: number; // ac is activeCount, starts initialized

constructor(public s: Array<Stream<T>>) {
this.ac = s.length;
constructor(public streams: Array<Stream<T>>) {
this.ac = streams.length;
}

_start(out: InternalListener<T>): void {
this.out = out;
const s = this.s;
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._add(this);
}
}

_stop(): void {
const s = this.s;
const s = this.streams;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._remove(this);
Expand Down

0 comments on commit 700a129

Please sign in to comment.