Skip to content

Commit

Permalink
feat(endWhen): implement operator endWhen(), add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 30, 2016
1 parent 65ddbd4 commit 23099ef
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {EndWhenOperator} from './operator/EndWhenOperator';
import {FlattenOperator} from './operator/FlattenOperator';
import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator';
import {
Expand Down Expand Up @@ -196,6 +197,10 @@ export class Stream<T> implements InternalListener<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

endWhen(other: Stream<any>): Stream<T> {
return new Stream<T>(new EndWhenOperator(other, this));
}

fold<R>(accumulate: (acc: R, t: T) => R, init: R): Stream<R> {
return new Stream<R>(new FoldOperator(accumulate, init, this));
}
Expand Down
2 changes: 1 addition & 1 deletion src/extra/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class DelayOperator<T> implements Operator<T, T> {
}

export default function delay<T>(period: number): (ins: Stream<T>) => Stream<T> {
return function delayOperator(ins: Stream<T>) {
return function delayOperator(ins: Stream<T>): Stream<T> {
return new Stream<T>(new DelayOperator(period, ins));
};
}
70 changes: 70 additions & 0 deletions src/operator/EndWhenOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {InternalListener} from '../InternalListener';
import {Operator} from '../Operator';
import {Stream} from '../Stream';
import {emptyListener} from '../utils/emptyListener';
import {empty} from '../utils/empty';

export class Proxy<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private prod: EndWhenOperator<T>) {
}

_n(t: T) {
this.out._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.prod.end();
}
}

export class OtherListener<T> implements InternalListener<any> {
constructor(private out: Stream<T>,
private prod: EndWhenOperator<T>) {
}

_n(t: T) {
this.prod.end();
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.prod.end();
}
}

export class EndWhenOperator<T> implements Operator<T, T> {
private proxy: InternalListener<T> = emptyListener;
private oli: InternalListener<any> = emptyListener; // oli = other listener
private out: Stream<T> = <Stream<T>> empty;

constructor(private o: Stream<any>, // o = other
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.o._add(this.oli = new OtherListener(out, this));
this.ins._add(this.proxy = new Proxy(out, this));
}

_stop(): void {
this.ins._remove(this.proxy);
}

end(): void {
this.o._remove(this.oli);
this.ins._remove(this.proxy);
this.out._c();
this.proxy = null;
this.out = null;
this.oli = null;
}
}
41 changes: 41 additions & 0 deletions tests/operator/endWhen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import xs from '../../src/index';
import delay from '../../src/extra/delay';
import * as assert from 'assert';

describe('Stream.prototype.endWhen', () => {
it('should complete the stream when another stream emits next', (done) => {
const source = xs.interval(50);
const other = xs.interval(220).take(1);
const stream = source.endWhen(other);
const expected = [0, 1, 2, 3];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should complete the stream when another stream emits complete', (done) => {
const source = xs.interval(50);
const other = xs.empty().compose(delay<any>(220));
const stream = source.endWhen(other);
const expected = [0, 1, 2, 3];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});
});
1 change: 1 addition & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ describe('Stream', () => {
assert.equal(typeof stream.drop, 'function');
assert.equal(typeof stream.last, 'function');
assert.equal(typeof stream.startWith, 'function');
assert.equal(typeof stream.endWhen, 'function');
assert.equal(typeof stream.fold, 'function');
assert.equal(typeof stream.flatten, 'function');
assert.equal(typeof stream.flattenConcurrently, 'function');
Expand Down

0 comments on commit 23099ef

Please sign in to comment.