Skip to content

Commit

Permalink
feat(core): implement filter + map fusion
Browse files Browse the repository at this point in the history
Add new FilterMapOperator to speed up filter + map operations
  • Loading branch information
TylorS authored and Andre Medeiros committed Apr 14, 2016
1 parent 1326c97 commit b0507e6
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,39 @@ class MapOperator<T, R> implements Operator<T, R> {
}
}

class FilterMapOperator<T, R> implements Operator<T, R> {
private out: Stream<R> = null;

constructor(public predicate: (t: T) => boolean,
public project: (t: T) => R,
public ins: Stream<T>) {
}

_start(out: Stream<R>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(v: T) {
if (this.predicate(v)) {
this.out._n(this.project(v));
};
}

_e(e: any) {
this.out._e(e);
}

_c() {
this.out._c();
}
}

class MapToOperator<T, R> implements Operator<T, R> {
private out: Stream<R> = null;

Expand Down Expand Up @@ -1146,7 +1179,11 @@ export class Stream<T> implements InternalListener<T> {
* be emitted on the output Stream.
* @return {Stream}
*/
map<U>(project: (t: T) => U): Stream<U> {
map<U>(project: (t: T) => U): Stream<U> | Stream<T> {
if (this._prod instanceof FilterOperator) {
const prod = (<FilterOperator<T>> this._prod);
return new Stream<U>(new FilterMapOperator(prod.predicate, project, prod.ins));
}
return new Stream<U>(new MapOperator(project, this));
}

Expand Down

0 comments on commit b0507e6

Please sign in to comment.