Skip to content

Commit

Permalink
feat(extra): implement new flattenSequentially() extra operator
Browse files Browse the repository at this point in the history
Behaves like concatAll() in RxJS v5.
  • Loading branch information
staltz committed Apr 26, 2016
1 parent c913569 commit 4a6e63e
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 0 deletions.
79 changes: 79 additions & 0 deletions src/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {Operator, Stream, InternalListener} from '../core';

class FSInner<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: FlattenSeqOperator<T>) {
}

_n(t: T) {
this.out._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.less();
}
}

export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
private open: boolean = true;
private active: boolean = false;
private seq: Array<Stream<T>> = [];
private out: Stream<T> = null;

constructor(public ins: Stream<Stream<T>>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
this.open = true;
this.active = false;
this.seq = [];
this.out = null;
}

less(): void {
this.active = false;
const seq = this.seq;
if (seq.length > 0) {
this._n(seq.shift());
}
if (!this.open && !this.active) {
this.out._c();
}
}

_n(s: Stream<T>) {
if (this.active) {
this.seq.push(s);
} else {
this.active = true;
s._add(new FSInner(this.out, this));
}
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.open = false;
if (this.seq.length === 0) {
this.out._c();
}
}
}

export default function flattenSequentially<T>(): (ins: Stream<Stream<T>>) => Stream<T> {
return function flattenSequentiallyOperator(ins: Stream<Stream<T>>): Stream<T> {
return new Stream<T>(new FlattenSeqOperator(ins));
};
}
135 changes: 135 additions & 0 deletions tests/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import xs from '../../src/index';
import flattenSequentially from '../../src/extra/flattenSequentially';
import * as assert from 'assert';

describe('flattenSequentially (extra)', () => {
describe('with map', () => {
it('should expand each periodic event with 3 sync events', (done) => {
const stream = xs.periodic(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.compose(flattenSequentially());
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
};
stream.addListener(listener);
});

it('should expand each sync event as a periodic stream and concatenate', (done) => {
const stream = xs.of(1, 2, 3)
.map(i => xs.periodic(100).take(3).map(x => `${i}${x}`))
.compose(flattenSequentially());
const expected = ['10', '11', '12', '20', '21', '22', '30', '31', '32'];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
};
stream.addListener(listener);
});

it('should expand 3 sync events as a periodic each', (done) => {
const stream = xs.of(0, 1, 2)
.map(i => xs.periodic(100 * i).take(2).map(x => `${i}${x}`))
.compose(flattenSequentially());
// ---x---x---x---x---x---x
// ---00--01
// -------10------11
// -----------20----------21
const expected = ['00', '01', '10', '11', '20', '21'];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
};
stream.addListener(listener);
});

it('should expand 3 async events as a periodic each', (done) => {
const stream = xs.periodic(140).take(3)
.map(i =>
xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`)
)
.compose(flattenSequentially());
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ----10--11--12
// ------------20-----------21----------22
const expected = ['00', '01', '02', '10', '11', '12', '20', '21', '22'];
stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should expand 3 async events as a periodic each, no optimization', (done) => {
const stream = xs.periodic(140).take(3)
.map(i =>
xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`)
)
.filter(() => true) // breaks an optimization map+flattenSequentially
.compose(flattenSequentially());
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ----10--11--12
// ------------20-----------21----------22

const expected = ['00', '01', '02', '10', '11', '12', '20', '21', '22'];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
};
stream.addListener(listener);
});

it('should propagate user mistakes in project as errors', (done) => {
const source = xs.periodic(30).take(1);
const stream = source.map(
x => {
const y = (<string> <any> x).toLowerCase();
return xs.of(y);
}
).compose(flattenSequentially());

stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.notStrictEqual(err.message.match(/is not a function$/), null);
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
});
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"src/extra/debounce.ts",
"src/extra/delay.ts",
"src/extra/dropRepeats.ts",
"src/extra/flattenSequentially.ts",
"src/extra/fromEvent.ts",
"src/extra/pairwise.ts",
"src/index.ts"
Expand Down

0 comments on commit 4a6e63e

Please sign in to comment.