From d0ee240a9b72fbf666ed9d1269a5a45d79825c9c Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 22 Aug 2016 13:27:03 +0300 Subject: [PATCH] feat(Stream): add new method setDebugListener on streams Read its JSDoc and tests for more details. --- src/core.ts | 40 ++++++++++++++++++++++++++++++++++++++++ tests/stream.ts | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/src/core.ts b/src/core.ts index 9eb1b20..bb42215 100644 --- a/src/core.ts +++ b/src/core.ts @@ -1064,6 +1064,8 @@ export class Stream implements InternalListener { public _prod: InternalProducer; protected _ils: Array>; // 'ils' = Internal listeners protected _stopID: any; + protected _dl: InternalListener; // the debug listener + protected _d: boolean; // flag indicating the existence of the debug listener protected _target: Stream; // imitation target if this Stream will imitate protected _err: any; @@ -1071,6 +1073,8 @@ export class Stream implements InternalListener { this._prod = producer || > NO; this._ils = []; this._stopID = NO; + this._dl = NO as InternalListener; + this._d = false; this._target = > NO; this._err = NO; } @@ -1078,6 +1082,7 @@ export class Stream implements InternalListener { _n(t: T): void { const a = this._ils; const L = a.length; + if (this._d) this._dl._n(t); if (L == 1) a[0]._n(t); else { const b = copy(a); for (let i = 0; i < L; i++) b[i]._n(t); @@ -1090,6 +1095,7 @@ export class Stream implements InternalListener { const a = this._ils; const L = a.length; this._x(); + if (this._d) this._dl._e(err); if (L == 1) a[0]._e(err); else { const b = copy(a); for (let i = 0; i < L; i++) b[i]._e(err); @@ -1100,6 +1106,7 @@ export class Stream implements InternalListener { const a = this._ils; const L = a.length; this._x(); + if (this._d) this._dl._c(); if (L == 1) a[0]._c(); else { const b = copy(a); for (let i = 0; i < L; i++) b[i]._c(); @@ -1916,6 +1923,39 @@ export class Stream implements InternalListener { shamefullySendComplete() { this._c(); } + + /** + * Adds a "debug" listener to the stream. There can only be one debug + * listener, that's why this is 'setDebugListener'. To remove the debug + * listener, just call setDebugListener(null). + * + * A debug listener is like any other listener. The only difference is that a + * debug listener is "stealthy": its presence/absence does not trigger the + * start/stop of the stream (or the producer inside the stream). This is + * useful so you can inspect what is going on without changing the behavior + * of the program. If you have an idle stream and you add a normal listener to + * it, the stream will start executing. But if you set a debug listener on an + * idle stream, it won't start executing (not until the first normal listener + * is added). + * + * As the name indicates, we don't recommend using this method to build app + * logic. In fact, in most cases the debug operator works just fine. Only use + * this one if you know what you're doing. + * + * @param {Listener} listener + */ + setDebugListener(listener: Listener) { + if (!listener) { + this._d = false; + this._dl = NO as InternalListener; + } else { + this._d = true; + (listener as any as InternalListener)._n = listener.next; + (listener as any as InternalListener)._e = listener.error; + (listener as any as InternalListener)._c = listener.complete; + this._dl = listener as any as InternalListener; + } + } } export class MemoryStream extends Stream { diff --git a/tests/stream.ts b/tests/stream.ts index cc7a9a8..b3dea81 100644 --- a/tests/stream.ts +++ b/tests/stream.ts @@ -311,6 +311,43 @@ describe('Stream', () => { }); }); + describe('setDebugListener', () => { + it('should not trigger a stream execution', (done) => { + const stream = xs.of(1, 2, 3); + const listener: Listener = { + next: () => done('should not be called'), + error: () => done('should not be called'), + complete: () => done('should not be called'), + }; + + stream.setDebugListener(listener); + setTimeout(() => done(), 200); + }); + + it('should spy an existing stream execution', (done) => { + const stream = xs.periodic(200).take(8); + const listener = { next: () => { }, error: () => { }, complete: () => { } }; + const expected = [0, 1, 2]; + + const debugListener: Listener = { + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + }, + error: () => done('should not be called'), + complete: () => done('should not be called') + }; + stream.setDebugListener(debugListener); + + stream.addListener(listener); + setTimeout(() => stream.removeListener(listener), 700); + + setTimeout(() => { + assert.strictEqual(expected.length, 0); + done(); + }, 1000); + }); + }); + describe('addListener', () => { it('throws a helpful error if you forget the next function', (done) => { const stream = xs.empty();