Skip to content

Commit

Permalink
feat(operator): implement debug() operator with DebugMachine
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 25, 2016
1 parent 32dd8ac commit e2a0342
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {MapMachine} from './operator/MapMachine';
import {FilterMachine} from './operator/FilterMachine';
import {TakeMachine} from './operator/TakeMachine';
import {SkipMachine} from './operator/SkipMachine';
import {DebugMachine} from './operator/DebugMachine';

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;
Expand Down Expand Up @@ -63,4 +64,8 @@ export class Stream<T> implements Observer<T> {
skip(amount: number): Stream<T> {
return new Stream<T>(new SkipMachine(amount, this));
}

debug(spy: (t: T) => void = null): Stream<T> {
return new Stream<T>(new DebugMachine(spy, this));
}
}
31 changes: 31 additions & 0 deletions src/operator/DebugMachine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Stream} from '../Stream';

export class DebugMachine<T> implements Machine<T> {
public proxy: Observer<T>;

constructor(public spy: (t: T) => void = null,
public inStream: Stream<T>) {
}

start(outStream: Observer<T>): void {
this.proxy = {
next: (t: T) => {
if (this.spy) {
this.spy(t);
} else {
console.log(t);
}
outStream.next(t);
},
error: outStream.error,
complete: outStream.complete,
};
this.inStream.subscribe(this.proxy);
}

stop(): void {
this.inStream.unsubscribe(this.proxy);
}
}
21 changes: 21 additions & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,24 @@ describe('Stream.prototype.skip', () => {
stream.subscribe(observer);
});
});

describe('Stream.prototype.debug', () => {
it('should allow inspecting the operator chain', (done) => {
const expected = [0, 1, 2];
const stream = xs.interval(50).debug(x => {
assert.equal(x, expected.shift());
});
let observer = {
next: (x: number) => {
if (x === 2) {
assert.equal(expected.length, 0);
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
complete: done.fail,
};
stream.subscribe(observer);
});
});

0 comments on commit e2a0342

Please sign in to comment.