Skip to content

Commit

Permalink
fix(flattenConcurrently): fix TypeScript output type
Browse files Browse the repository at this point in the history
Fix issue #4.
  • Loading branch information
staltz committed Apr 30, 2016
1 parent 26f2241 commit b5445a5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
9 changes: 5 additions & 4 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1647,11 +1647,12 @@ export class Stream<T> implements InternalListener<T> {
*
* @return {Stream}
*/
flattenConcurrently<R, T extends Stream<R>>(): T {
flattenConcurrently<R>(): T {
const p = this._prod;
return <T> new Stream<R>(p instanceof MapOperator || p instanceof FilterMapOperator ?
new MapFlattenConcOperator(<MapOperator<R, Stream<R>>> <any> p) :
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
return <T> <any> new Stream<R>(
p instanceof MapOperator || p instanceof FilterMapOperator ?
new MapFlattenConcOperator(<MapOperator<R, Stream<R>>> <any> p) :
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
);
}

Expand Down
48 changes: 33 additions & 15 deletions tests/operator/flattenConcurrently.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import xs from '../../src/index';
import xs, {Stream, Listener} from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.flattenConcurrently', () => {
describe('with map', () => {
it('should expand each periodic event with 3 sync events', (done) => {
const stream = xs.periodic(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.flattenConcurrently();
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.flattenConcurrently();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const listener = {

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
Expand All @@ -17,8 +18,24 @@ describe('Stream.prototype.flattenConcurrently', () => {
assert.equal(expected.length, 0);
done();
},
};
stream.addListener(listener);
});
});

it('should return a flat stream with correct TypeScript types', (done) => {
const streamStrings: Stream<string> = Stream.create({
start: (listener: Listener<string>) => {},
stop: () => {}
});

const streamBooleans: Stream<boolean> = Stream.create({
start: (listener: Listener<boolean>) => {},
stop: () => {}
});

// Type checked by the compiler. Without Stream<boolean> it does not compile.
const flat: Stream<boolean> = streamStrings.map(x => streamBooleans)
.flattenConcurrently();
done();
});

it('should expand 3 sync events as a periodic each', (done) => {
Expand All @@ -30,17 +47,17 @@ describe('Stream.prototype.flattenConcurrently', () => {
// -------10------11
// -----------20----------21
const expected = ['00', '01', '10', '20', '11', '21'];
const listener = {
next: (x: number) => {

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
};
stream.addListener(listener);
});
});

it('should expand 3 async events as a periodic each', (done) => {
Expand All @@ -54,8 +71,9 @@ describe('Stream.prototype.flattenConcurrently', () => {
// ----10--11--12
// ------------20-----------21----------22
const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22'];

stream.addListener({
next: (x: number) => {
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
Expand All @@ -79,17 +97,17 @@ describe('Stream.prototype.flattenConcurrently', () => {
// ------------20-----------21----------22

const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22'];
const listener = {
next: (x: number) => {

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
};
stream.addListener(listener);
});
});

it('should propagate user mistakes in project as errors', (done) => {
Expand Down

0 comments on commit b5445a5

Please sign in to comment.