diff --git a/src/core.ts b/src/core.ts index 33de308..a4cde16 100644 --- a/src/core.ts +++ b/src/core.ts @@ -1647,11 +1647,12 @@ export class Stream implements InternalListener { * * @return {Stream} */ - flattenConcurrently>(): T { + flattenConcurrently(): T { const p = this._prod; - return new Stream(p instanceof MapOperator || p instanceof FilterMapOperator ? - new MapFlattenConcOperator(>> p) : - new FlattenConcOperator(>> this) + return new Stream( + p instanceof MapOperator || p instanceof FilterMapOperator ? + new MapFlattenConcOperator(>> p) : + new FlattenConcOperator(>> this) ); } diff --git a/tests/operator/flattenConcurrently.ts b/tests/operator/flattenConcurrently.ts index 1684f86..4990738 100644 --- a/tests/operator/flattenConcurrently.ts +++ b/tests/operator/flattenConcurrently.ts @@ -1,14 +1,15 @@ -import xs from '../../src/index'; +import xs, {Stream, Listener} from '../../src/index'; import * as assert from 'assert'; describe('Stream.prototype.flattenConcurrently', () => { describe('with map', () => { it('should expand each periodic event with 3 sync events', (done) => { const stream = xs.periodic(100).take(3) - .map(i => xs.of(1 + i, 2 + i, 3 + i)) - .flattenConcurrently(); + .map(i => xs.of(1 + i, 2 + i, 3 + i)) + .flattenConcurrently(); const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5]; - const listener = { + + stream.addListener({ next: (x: number) => { assert.equal(x, expected.shift()); }, @@ -17,8 +18,24 @@ describe('Stream.prototype.flattenConcurrently', () => { assert.equal(expected.length, 0); done(); }, - }; - stream.addListener(listener); + }); + }); + + it('should return a flat stream with correct TypeScript types', (done) => { + const streamStrings: Stream = Stream.create({ + start: (listener: Listener) => {}, + stop: () => {} + }); + + const streamBooleans: Stream = Stream.create({ + start: (listener: Listener) => {}, + stop: () => {} + }); + + // Type checked by the compiler. Without Stream it does not compile. + const flat: Stream = streamStrings.map(x => streamBooleans) + .flattenConcurrently(); + done(); }); it('should expand 3 sync events as a periodic each', (done) => { @@ -30,8 +47,9 @@ describe('Stream.prototype.flattenConcurrently', () => { // -------10------11 // -----------20----------21 const expected = ['00', '01', '10', '20', '11', '21']; - const listener = { - next: (x: number) => { + + stream.addListener({ + next: (x: string) => { assert.equal(x, expected.shift()); }, error: (err: any) => done(err), @@ -39,8 +57,7 @@ describe('Stream.prototype.flattenConcurrently', () => { assert.equal(expected.length, 0); done(); }, - }; - stream.addListener(listener); + }); }); it('should expand 3 async events as a periodic each', (done) => { @@ -54,8 +71,9 @@ describe('Stream.prototype.flattenConcurrently', () => { // ----10--11--12 // ------------20-----------21----------22 const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22']; + stream.addListener({ - next: (x: number) => { + next: (x: string) => { assert.equal(x, expected.shift()); }, error: (err: any) => done(err), @@ -79,8 +97,9 @@ describe('Stream.prototype.flattenConcurrently', () => { // ------------20-----------21----------22 const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22']; - const listener = { - next: (x: number) => { + + stream.addListener({ + next: (x: string) => { assert.equal(x, expected.shift()); }, error: (err: any) => done(err), @@ -88,8 +107,7 @@ describe('Stream.prototype.flattenConcurrently', () => { assert.equal(expected.length, 0); done(); } - }; - stream.addListener(listener); + }); }); it('should propagate user mistakes in project as errors', (done) => {