From 3489ce339ac6c9e05641f1795b06b805f7c7e1b7 Mon Sep 17 00:00:00 2001 From: Tylor Steinberger Date: Sat, 19 Mar 2016 08:28:31 -0400 Subject: [PATCH] feat(startWith): implement startWith operator Add implementation for startWith Fix this.proxy compiler error in RememberOperator --- src/Stream.ts | 9 +++++---- src/operator/RememberOperator.ts | 9 ++++++--- src/operator/StartWithOperator.ts | 20 ++++++++++++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) create mode 100644 src/operator/StartWithOperator.ts diff --git a/src/Stream.ts b/src/Stream.ts index f24bad3..7bbb170 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -8,6 +8,7 @@ import {DebugOperator} from './operator/DebugOperator'; import {FoldOperator} from './operator/FoldOperator'; import {LastOperator} from './operator/LastOperator'; import {RememberOperator} from './operator/RememberOperator'; +import {StartWithOperator} from './operator/StartWithOperator'; import { CombineProducer, CombineInstanceSignature, @@ -92,10 +93,6 @@ export class Stream implements Observer { return new Stream(new CombineProducer(project, streams)); }; - static Stream(): Stream { - return new Stream({start: noop, stop: noop}); - } - static MemoryStream(): MemoryStream { return new MemoryStream({start: noop, stop: noop}); } @@ -150,6 +147,10 @@ export class Stream implements Observer { return new MemoryStream(new RememberOperator(this)); } + startWith(x: T): Stream { + return new Stream(new StartWithOperator(this, x)); + } + merge(other: Stream): Stream { return Stream.merge(this, other); } diff --git a/src/operator/RememberOperator.ts b/src/operator/RememberOperator.ts index 2c36bbb..30071fc 100644 --- a/src/operator/RememberOperator.ts +++ b/src/operator/RememberOperator.ts @@ -1,17 +1,20 @@ -import {Operator} from '../Operator'; import {Stream, MemoryStream} from '../Stream'; +import {Operator} from '../Operator'; +import {Observer} from '../Observer'; +import {emptyObserver} from '../utils/emptyObserver'; export class RememberOperator implements Operator { - public value: any; + public out: Observer = emptyObserver; constructor(public ins: Stream) { } start(out: MemoryStream): void { + this.out = out; this.ins.subscribe(out); } stop(): void { - this.ins.unsubscribe(this.proxy); + this.ins.unsubscribe(this.out); } } diff --git a/src/operator/StartWithOperator.ts b/src/operator/StartWithOperator.ts new file mode 100644 index 0000000..807f1b3 --- /dev/null +++ b/src/operator/StartWithOperator.ts @@ -0,0 +1,20 @@ +import {Stream} from '../Stream'; +import {Observer} from '../Observer'; +import {Operator} from '../Operator'; +import {emptyObserver} from '../utils/emptyObserver'; + +export class StartWithOperator implements Operator { + public out: Observer = emptyObserver; + constructor(public ins: Stream, public value: T) { + } + + start(out: Observer): void { + this.out = out; + this.out.next(this.value); + this.ins.subscribe(out); + } + + stop(): void { + this.ins.unsubscribe(this.out); + } +}