Skip to content

Commit

Permalink
feat(Stream): add new method setDebugListener on streams
Browse files Browse the repository at this point in the history
Read its JSDoc and tests for more details.
  • Loading branch information
staltz committed Aug 22, 2016
1 parent 58fae93 commit d0ee240
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
40 changes: 40 additions & 0 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1064,20 +1064,25 @@ export class Stream<T> implements InternalListener<T> {
public _prod: InternalProducer<T>;
protected _ils: Array<InternalListener<T>>; // 'ils' = Internal listeners
protected _stopID: any;
protected _dl: InternalListener<T>; // the debug listener
protected _d: boolean; // flag indicating the existence of the debug listener
protected _target: Stream<T>; // imitation target if this Stream will imitate
protected _err: any;

constructor(producer?: InternalProducer<T>) {
this._prod = producer || <InternalProducer<T>> NO;
this._ils = [];
this._stopID = NO;
this._dl = NO as InternalListener<T>;
this._d = false;
this._target = <Stream<T>> NO;
this._err = NO;
}

_n(t: T): void {
const a = this._ils;
const L = a.length;
if (this._d) this._dl._n(t);
if (L == 1) a[0]._n(t); else {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._n(t);
Expand All @@ -1090,6 +1095,7 @@ export class Stream<T> implements InternalListener<T> {
const a = this._ils;
const L = a.length;
this._x();
if (this._d) this._dl._e(err);
if (L == 1) a[0]._e(err); else {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._e(err);
Expand All @@ -1100,6 +1106,7 @@ export class Stream<T> implements InternalListener<T> {
const a = this._ils;
const L = a.length;
this._x();
if (this._d) this._dl._c();
if (L == 1) a[0]._c(); else {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._c();
Expand Down Expand Up @@ -1916,6 +1923,39 @@ export class Stream<T> implements InternalListener<T> {
shamefullySendComplete() {
this._c();
}

/**
* Adds a "debug" listener to the stream. There can only be one debug
* listener, that's why this is 'setDebugListener'. To remove the debug
* listener, just call setDebugListener(null).
*
* A debug listener is like any other listener. The only difference is that a
* debug listener is "stealthy": its presence/absence does not trigger the
* start/stop of the stream (or the producer inside the stream). This is
* useful so you can inspect what is going on without changing the behavior
* of the program. If you have an idle stream and you add a normal listener to
* it, the stream will start executing. But if you set a debug listener on an
* idle stream, it won't start executing (not until the first normal listener
* is added).
*
* As the name indicates, we don't recommend using this method to build app
* logic. In fact, in most cases the debug operator works just fine. Only use
* this one if you know what you're doing.
*
* @param {Listener<T>} listener
*/
setDebugListener(listener: Listener<T>) {
if (!listener) {
this._d = false;
this._dl = NO as InternalListener<T>;
} else {
this._d = true;
(listener as any as InternalListener<T>)._n = listener.next;
(listener as any as InternalListener<T>)._e = listener.error;
(listener as any as InternalListener<T>)._c = listener.complete;
this._dl = listener as any as InternalListener<T>;
}
}
}

export class MemoryStream<T> extends Stream<T> {
Expand Down
37 changes: 37 additions & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,43 @@ describe('Stream', () => {
});
});

describe('setDebugListener', () => {
it('should not trigger a stream execution', (done) => {
const stream = xs.of(1, 2, 3);
const listener: Listener<number> = {
next: () => done('should not be called'),
error: () => done('should not be called'),
complete: () => done('should not be called'),
};

stream.setDebugListener(listener);
setTimeout(() => done(), 200);
});

it('should spy an existing stream execution', (done) => {
const stream = xs.periodic(200).take(8);
const listener = { next: () => { }, error: () => { }, complete: () => { } };
const expected = [0, 1, 2];

const debugListener: Listener<number> = {
next: (x: number) => {
assert.strictEqual(x, expected.shift());
},
error: () => done('should not be called'),
complete: () => done('should not be called')
};
stream.setDebugListener(debugListener);

stream.addListener(listener);
setTimeout(() => stream.removeListener(listener), 700);

setTimeout(() => {
assert.strictEqual(expected.length, 0);
done();
}, 1000);
});
});

describe('addListener', () => {
it('throws a helpful error if you forget the next function', (done) => {
const stream = xs.empty();
Expand Down

0 comments on commit d0ee240

Please sign in to comment.