Skip to content

Commit

Permalink
fix(dropUntil): align with rxjs skipUntil
Browse files Browse the repository at this point in the history
Refactor dropUntil to leave its output stream unchanged when its 'other'
stream completes. Make output stream empty if 'other' stream is empty.

This change align dropUntil more closely with the skipUntil operator
from rxjs.

Refactor and add tests to verify proper behavior of dropUntil extra.

Resolve #237.
  • Loading branch information
xtianjohns authored and staltz committed Jul 21, 2018
1 parent 141014c commit 0c0cf40
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
12 changes: 5 additions & 7 deletions src/extra/dropUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ class OtherIL<T> implements InternalListener<any>, OutSender<T> {
this.out._e(err);
}

_c() {
this.op.up();
}
_c() {}
}

export class DropUntilOperator<T> implements Operator<T, T> {
Expand Down Expand Up @@ -63,14 +61,14 @@ export class DropUntilOperator<T> implements Operator<T, T> {
_c() {
const u = this.out;
if (!u) return;
this.up();
this._stop();
u._c();
}
}

/**
* Starts emitting the input stream when another stream emits a next event. The
* output stream will complete if/when the other stream completes.
* output stream will emit no items if another stream is empty.
*
* Marble diagram:
*
Expand Down Expand Up @@ -106,8 +104,8 @@ export class DropUntilOperator<T> implements Operator<T, T> {
*
* #### Arguments:
*
* @param {Stream} other Some other stream that is used to know when should the
* output stream of this operator start emitting.
* @param {Stream} other Some other stream that is used to know when the output
* stream of this operator should start emitting.
* @return {Stream}
*/
export default function dropUntil<T>(other: Stream<any>): (ins: Stream<T>) => Stream<T> {
Expand Down
43 changes: 38 additions & 5 deletions tests/extra/dropUntil.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="mocha"/>
/// <reference types="node" />
import xs from '../../src/index';
import concat from '../../src/extra/concat';
import dropUntil from '../../src/extra/dropUntil';
import delay from '../../src/extra/delay';
import * as assert from 'assert';
Expand All @@ -20,16 +21,30 @@ describe('dropUntil (extra)', () => {
complete: () => {
assert.strictEqual(expected.length, 0);
done();
},
}
});
});

it('should complete the stream when another stream emits complete', (done: any) => {
const source = xs.periodic(50).take(6);
const other = xs.empty().compose(delay(220));
it('should emit no items if another stream is empty', (done: any) => {
const source = xs.periodic(10).take(10);
const other = xs.empty().compose(delay(50));
const stream = source.compose(dropUntil(other));
const expected = [4, 5];
let emissions = 0;
stream.addListener({
next: () => emissions++,
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(emissions, 0);
done();
}
});
});

it('should not wait for another stream to complete', (done: any) => {
const source = xs.periodic(50).take(2);
const other = concat(xs.of('foo'), xs.never());
const stream = source.compose(dropUntil(other));
const expected = [0, 1];
stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
Expand All @@ -38,7 +53,25 @@ describe('dropUntil (extra)', () => {
complete: () => {
assert.strictEqual(expected.length, 0);
done();
}
});
});

it('should not complete the stream when another non empty stream emits complete', (done: any) => {
const source = xs.periodic(50).take(8);
const other = xs.periodic(1).take(1).compose(delay(220));
const stream = source.compose(dropUntil(other));
const expected = [4, 5, 6, 7];

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected.length, 0);
done();
}
});
});
});

0 comments on commit 0c0cf40

Please sign in to comment.