Skip to content

Commit

Permalink
perf(take): improve take() perf by using Proxy Observer class
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 27, 2016
1 parent 5233f43 commit 6eae1a9
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions src/operator/TakeProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,30 @@ import {Producer} from '../Producer';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class Proxy<T> implements Observer<T> {
constructor(public out: Stream<T>,
public prod: TakeProducer<T>) {
}

next(t: T) {
const {prod, out} = this;
if (prod.taken++ < prod.max) {
out.next(t);
} else {
out.end();
prod.stop();
}
}

error(err: any) {
this.out.error(err);
}

end() {
this.out.end();
}
}

export class TakeProducer<T> implements Producer<T> {
public proxy: Observer<T> = emptyObserver;
public taken: number = 0;
Expand All @@ -12,19 +36,7 @@ export class TakeProducer<T> implements Producer<T> {
}

start(out: Stream<T>): void {
this.proxy = {
next: (t: T) => {
if (this.taken++ < this.max) {
out.next(t);
} else {
out.end();
this.stop();
}
},
error: (err) => out.error(err),
end: () => out.end(),
};
this.ins.subscribe(this.proxy);
this.ins.subscribe(this.proxy = new Proxy(out, this));
}

stop(): void {
Expand Down

0 comments on commit 6eae1a9

Please sign in to comment.