From 23099ef8585950b71d0375408c0a3f7199298d6e Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Wed, 30 Mar 2016 10:12:15 +0300 Subject: [PATCH] feat(endWhen): implement operator endWhen(), add tests --- src/Stream.ts | 5 +++ src/extra/delay.ts | 2 +- src/operator/EndWhenOperator.ts | 70 +++++++++++++++++++++++++++++++++ tests/operator/endWhen.ts | 41 +++++++++++++++++++ tests/stream.ts | 1 + 5 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 src/operator/EndWhenOperator.ts create mode 100644 tests/operator/endWhen.ts diff --git a/src/Stream.ts b/src/Stream.ts index 9cfc248..2f10811 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -11,6 +11,7 @@ import {DebugOperator} from './operator/DebugOperator'; import {FoldOperator} from './operator/FoldOperator'; import {LastOperator} from './operator/LastOperator'; import {StartWithOperator} from './operator/StartWithOperator'; +import {EndWhenOperator} from './operator/EndWhenOperator'; import {FlattenOperator} from './operator/FlattenOperator'; import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator'; import { @@ -196,6 +197,10 @@ export class Stream implements InternalListener { return new Stream(new StartWithOperator(this, x)); } + endWhen(other: Stream): Stream { + return new Stream(new EndWhenOperator(other, this)); + } + fold(accumulate: (acc: R, t: T) => R, init: R): Stream { return new Stream(new FoldOperator(accumulate, init, this)); } diff --git a/src/extra/delay.ts b/src/extra/delay.ts index 1378d3f..5934947 100644 --- a/src/extra/delay.ts +++ b/src/extra/delay.ts @@ -50,7 +50,7 @@ class DelayOperator implements Operator { } export default function delay(period: number): (ins: Stream) => Stream { - return function delayOperator(ins: Stream) { + return function delayOperator(ins: Stream): Stream { return new Stream(new DelayOperator(period, ins)); }; } diff --git a/src/operator/EndWhenOperator.ts b/src/operator/EndWhenOperator.ts new file mode 100644 index 0000000..e61fe91 --- /dev/null +++ b/src/operator/EndWhenOperator.ts @@ -0,0 +1,70 @@ +import {InternalListener} from '../InternalListener'; +import {Operator} from '../Operator'; +import {Stream} from '../Stream'; +import {emptyListener} from '../utils/emptyListener'; +import {empty} from '../utils/empty'; + +export class Proxy implements InternalListener { + constructor(private out: Stream, + private prod: EndWhenOperator) { + } + + _n(t: T) { + this.out._n(t); + } + + _e(err: any) { + this.out._e(err); + } + + _c() { + this.prod.end(); + } +} + +export class OtherListener implements InternalListener { + constructor(private out: Stream, + private prod: EndWhenOperator) { + } + + _n(t: T) { + this.prod.end(); + } + + _e(err: any) { + this.out._e(err); + } + + _c() { + this.prod.end(); + } +} + +export class EndWhenOperator implements Operator { + private proxy: InternalListener = emptyListener; + private oli: InternalListener = emptyListener; // oli = other listener + private out: Stream = > empty; + + constructor(private o: Stream, // o = other + public ins: Stream) { + } + + _start(out: Stream): void { + this.out = out; + this.o._add(this.oli = new OtherListener(out, this)); + this.ins._add(this.proxy = new Proxy(out, this)); + } + + _stop(): void { + this.ins._remove(this.proxy); + } + + end(): void { + this.o._remove(this.oli); + this.ins._remove(this.proxy); + this.out._c(); + this.proxy = null; + this.out = null; + this.oli = null; + } +} diff --git a/tests/operator/endWhen.ts b/tests/operator/endWhen.ts new file mode 100644 index 0000000..c161f86 --- /dev/null +++ b/tests/operator/endWhen.ts @@ -0,0 +1,41 @@ +import xs from '../../src/index'; +import delay from '../../src/extra/delay'; +import * as assert from 'assert'; + +describe('Stream.prototype.endWhen', () => { + it('should complete the stream when another stream emits next', (done) => { + const source = xs.interval(50); + const other = xs.interval(220).take(1); + const stream = source.endWhen(other); + const expected = [0, 1, 2, 3]; + + stream.addListener({ + next: (x: number) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should complete the stream when another stream emits complete', (done) => { + const source = xs.interval(50); + const other = xs.empty().compose(delay(220)); + const stream = source.endWhen(other); + const expected = [0, 1, 2, 3]; + + stream.addListener({ + next: (x: number) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); +}); diff --git a/tests/stream.ts b/tests/stream.ts index cc4d2ad..d69a11e 100644 --- a/tests/stream.ts +++ b/tests/stream.ts @@ -57,6 +57,7 @@ describe('Stream', () => { assert.equal(typeof stream.drop, 'function'); assert.equal(typeof stream.last, 'function'); assert.equal(typeof stream.startWith, 'function'); + assert.equal(typeof stream.endWhen, 'function'); assert.equal(typeof stream.fold, 'function'); assert.equal(typeof stream.flatten, 'function'); assert.equal(typeof stream.flattenConcurrently, 'function');