Skip to content

Commit

Permalink
perf(flatten): avoid cut() method in flattening
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed May 2, 2016
1 parent 25d5d58 commit 28afee9
Showing 1 changed file with 27 additions and 41 deletions.
68 changes: 27 additions & 41 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ export class DropOperator<T> implements Operator<T, T> {
}
}

class OtherListener<T> implements InternalListener<any> {
class OtherIL<T> implements InternalListener<any> {
constructor(private out: Stream<T>,
private op: EndWhenOperator<T>) {
}
Expand All @@ -418,23 +418,23 @@ class OtherListener<T> implements InternalListener<any> {

export class EndWhenOperator<T> implements Operator<T, T> {
private out: Stream<T> = null;
private oli: InternalListener<any> = emptyListener; // oli = other listener
private oil: InternalListener<any> = emptyListener; // oil = other InternalListener

constructor(public o: Stream<any>, // o = other
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.o._add(this.oli = new OtherListener(out, this));
this.o._add(this.oil = new OtherIL(out, this));
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.o._remove(this.oli);
this.o._remove(this.oil);
this.out = null;
this.oli = null;
this.oil = null;
}

end(): void {
Expand Down Expand Up @@ -488,7 +488,7 @@ export class FilterOperator<T> implements Operator<T, T> {
}
}

class FCInner<T> implements InternalListener<T> {
class FCIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenConcOperator<T>) {
}
Expand Down Expand Up @@ -532,7 +532,7 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {

_n(s: Stream<T>) {
this.active++;
s._add(new FCInner(this.out, this));
s._add(new FCIL(this.out, this));
}

_e(err: any) {
Expand All @@ -544,7 +544,7 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
}
}

class FInner<T> implements InternalListener<T> {
class FIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenOperator<T>) {
}
Expand All @@ -558,14 +558,14 @@ class FInner<T> implements InternalListener<T> {
}

_c() {
this.op.curr = null;
this.op.inner = null;
this.op.less();
}
}

export class FlattenOperator<T> implements Operator<Stream<T>, T> {
public curr: Stream<T> = null; // Current inner Stream
private inner: InternalListener<T> = null; // Current inner InternalListener
public inner: Stream<T> = null; // Current inner Stream
private il: InternalListener<T> = null; // Current inner InternalListener
private open: boolean = true;
private out: Stream<T> = null;

Expand All @@ -579,28 +579,20 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {

_stop(): void {
this.ins._remove(this);
this.curr = null;
this.inner = null;
this.il = null;
this.open = true;
this.out = null;
}

cut(): void {
const {curr, inner} = this;
if (curr && inner) {
curr._remove(inner);
}
}

less(): void {
if (!this.open && !this.curr) {
this.out._c();
}
if (!this.open && !this.inner) this.out._c();
}

_n(s: Stream<T>) {
this.cut();
(this.curr = s)._add(this.inner = new FInner(this.out, this));
const {inner, il} = this;
if (inner && il) inner._remove(il);
(this.inner = s)._add(this.il = new FIL(this.out, this));
}

_e(err: any) {
Expand Down Expand Up @@ -692,7 +684,7 @@ export class LastOperator<T> implements Operator<T, T> {
}
}

class MFCInner<T> implements InternalListener<T> {
class MFCIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: MapFlattenConcOperator<T>) {
}
Expand Down Expand Up @@ -737,7 +729,7 @@ export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalL
_n(v: T) {
this.active++;
try {
this.mapOp.project(v)._add(new MFCInner(this.out, this));
this.mapOp.project(v)._add(new MFCIL(this.out, this));
} catch (e) {
this.out._e(e);
}
Expand All @@ -752,7 +744,7 @@ export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalL
}
}

class MFInner<T> implements InternalListener<T> {
class MFIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: MapFlattenOperator<T>) {
}
Expand All @@ -766,14 +758,14 @@ class MFInner<T> implements InternalListener<T> {
}

_c() {
this.op.curr = null;
this.op.inner = null;
this.op.less();
}
}

export class MapFlattenOperator<T> implements InternalProducer<T>, InternalListener<T> {
public curr: Stream<T> = null; // Current inner Stream
private inner: InternalListener<T> = null; // Current inner InternalListener
public inner: Stream<T> = null; // Current inner Stream
private il: InternalListener<T> = null; // Current inner InternalListener
private open: boolean = true;
private out: Stream<T> = null;

Expand All @@ -787,29 +779,23 @@ export class MapFlattenOperator<T> implements InternalProducer<T>, InternalListe

_stop(): void {
this.mapOp.ins._remove(this);
this.curr = null;
this.inner = null;
this.il = null;
this.open = true;
this.out = null;
}

cut(): void {
const {curr, inner} = this;
if (curr && inner) {
curr._remove(inner);
}
}

less(): void {
if (!this.open && !this.curr) {
if (!this.open && !this.inner) {
this.out._c();
}
}

_n(v: T) {
this.cut();
const {inner, il} = this;
if (inner && il) inner._remove(il);
try {
(this.curr = this.mapOp.project(v))._add(this.inner = new MFInner(this.out, this));
(this.inner = this.mapOp.project(v))._add(this.il = new MFIL(this.out, this));
} catch (e) {
this.out._e(e);
}
Expand Down

0 comments on commit 28afee9

Please sign in to comment.