From 65456705498e6172945d55c5272313293c21b347 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sun, 27 Mar 2016 18:21:44 +0300 Subject: [PATCH] feat(imitate): implement imitate() operator for circular dependencies --- src/Stream.ts | 40 +++++++++++++++++++++++----------------- tests/stream.ts | 31 +++++++++++++++++++++---------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/Stream.ts b/src/Stream.ts index 061a39a..868bc50 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -34,15 +34,17 @@ export class Stream implements InternalListener { this._listeners = []; } - static create(producer: Producer): Stream { - (> ( producer))._start = - function _start(il: InternalListener) { - (> ( il)).next = il._n; - (> ( il)).error = il._e; - (> ( il)).complete = il._c; - this.start(> ( il)); - }; - (> ( producer))._stop = producer.stop; + static create(producer?: Producer): Stream { + if (producer) { + (> ( producer))._start = + function _start(il: InternalListener) { + (> ( il)).next = il._n; + (> ( il)).error = il._e; + (> ( il)).complete = il._c; + this.start(> ( il)); + }; + (> ( producer))._stop = producer.stop; + } return new Stream(> ( producer)); } @@ -89,7 +91,7 @@ export class Stream implements InternalListener { this._listeners[i]._c(); } } - this._stopID = setTimeout(() => this._prod._stop()); + if (this._prod) this._stopID = setTimeout(() => this._prod._stop()); this._listeners = []; } @@ -104,22 +106,22 @@ export class Stream implements InternalListener { this._remove(> ( listener)); } - _add(listener: InternalListener): void { - this._listeners.push(listener); + _add(il: InternalListener): void { + this._listeners.push(il); if (this._listeners.length === 1) { if (this._stopID !== empty) { clearTimeout(this._stopID); this._stopID = empty; } - this._prod._start(this); + if (this._prod) this._prod._start(this); } } - _remove(listener: InternalListener): void { - const i = this._listeners.indexOf(listener); + _remove(il: InternalListener): void { + const i = this._listeners.indexOf(il); if (i > -1) { this._listeners.splice(i, 1); - if (this._listeners.length <= 0) { + if (this._prod && this._listeners.length <= 0) { this._stopID = setTimeout(() => this._prod._stop()); } } @@ -163,7 +165,7 @@ export class Stream implements InternalListener { static empty(): Stream { return new Stream({ - _start(lner: InternalListener) { lner._c(); }, + _start(il: InternalListener) { il._c(); }, _stop: noop, }); } @@ -222,6 +224,10 @@ export class Stream implements InternalListener { streams.unshift(this); return Stream.combine(project, ...streams); }; + + imitate(other: Stream): void { + other._add(this); + } } export class MemoryStream extends Stream { diff --git a/tests/stream.ts b/tests/stream.ts index 36b6de4..43a2142 100644 --- a/tests/stream.ts +++ b/tests/stream.ts @@ -45,11 +45,7 @@ describe('Stream', () => { }); it('should have all the core operators as methods, plus addListener and removeListener', () => { - const emptyProducer = { - start(): void { return undefined; }, - stop(): void { return undefined; }, - }; - const stream = xs.create(emptyProducer); + const stream = xs.create(); assert.equal(typeof stream.addListener, 'function'); assert.equal(typeof stream.removeListener, 'function'); assert.equal(typeof stream.map, 'function'); @@ -71,11 +67,7 @@ describe('Stream', () => { const expected = [10, 20, 30]; let listenerGotEnd: boolean = false; - const emptyProducer = { - start(): void { return undefined; }, - stop(): void { return undefined; }, - }; - const stream = xs.create(emptyProducer); + const stream = xs.create(); stream.addListener({ next: (x: number) => { @@ -97,6 +89,25 @@ describe('Stream', () => { done(); }); + it('should allow being imitated by a proxy Stream', (done) => { + const stream = xs.of(10, 20, 30); + const proxyStream = xs.create(); + + const expected = [10, 20, 30]; + proxyStream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: e => done(e), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + + proxyStream.imitate(stream); + }); + it('should be possible to addListener and removeListener with 1 listener', (done) => { const stream = xs.interval(100); const expected = [0, 1, 2];