Skip to content

Commit

Permalink
feat(startWith): implement startWith operator
Browse files Browse the repository at this point in the history
Add implementation for startWith
Fix this.proxy compiler error in RememberOperator
  • Loading branch information
TylorS committed Mar 19, 2016
1 parent 46ef6e7 commit 3489ce3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
9 changes: 5 additions & 4 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {RememberOperator} from './operator/RememberOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {
CombineProducer,
CombineInstanceSignature,
Expand Down Expand Up @@ -92,10 +93,6 @@ export class Stream<T> implements Observer<T> {
return new Stream<R>(new CombineProducer<R>(project, streams));
};

static Stream<T>(): Stream<T> {
return new Stream<T>({start: noop, stop: noop});
}

static MemoryStream<T>(): MemoryStream<T> {
return new MemoryStream<T>({start: noop, stop: noop});
}
Expand Down Expand Up @@ -150,6 +147,10 @@ export class Stream<T> implements Observer<T> {
return new MemoryStream<T>(new RememberOperator(this));
}

startWith(x: T): Stream<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

merge(other: Stream<T>): Stream<T> {
return Stream.merge(this, other);
}
Expand Down
9 changes: 6 additions & 3 deletions src/operator/RememberOperator.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {Operator} from '../Operator';
import {Stream, MemoryStream} from '../Stream';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {emptyObserver} from '../utils/emptyObserver';

export class RememberOperator<T> implements Operator<T, T> {
public value: any;
public out: Observer<T> = emptyObserver;

constructor(public ins: Stream<T>) {
}

start(out: MemoryStream<T>): void {
this.out = out;
this.ins.subscribe(out);
}

stop(): void {
this.ins.unsubscribe(this.proxy);
this.ins.unsubscribe(this.out);
}
}
20 changes: 20 additions & 0 deletions src/operator/StartWithOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {Stream} from '../Stream';
import {Observer} from '../Observer';
import {Operator} from '../Operator';
import {emptyObserver} from '../utils/emptyObserver';

export class StartWithOperator<T> implements Operator<T, T> {
public out: Observer<T> = emptyObserver;
constructor(public ins: Stream<T>, public value: T) {
}

start(out: Observer<T>): void {
this.out = out;
this.out.next(this.value);
this.ins.subscribe(out);
}

stop(): void {
this.ins.unsubscribe(this.out);
}
}

0 comments on commit 3489ce3

Please sign in to comment.