Skip to content

Commit

Permalink
feat(forkJoin): accept array of observable as parameter
Browse files Browse the repository at this point in the history
relates to #594
  • Loading branch information
kwonoj authored and benlesh committed Nov 18, 2015
1 parent 24086c0 commit d45f672
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 34 deletions.
119 changes: 104 additions & 15 deletions spec/observables/forkJoin-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,80 @@ describe('Observable.forkJoin', function () {
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('--1--2--3--|')
);
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
});

it('should join the last values of the provided observables with selector', function () {
function selector(x, y, z) {
return x + y + z;
}

var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('--1--2--3--|'),
selector
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: 'db3'});
});

it('should accept single observable', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|')
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d']});
});

it('should accept array of observable contains single', function () {
var e1 = Observable.forkJoin(
[hot('--a--b--c--d--|')]
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d']});
});

it('should accept single observable with selector', function () {
function selector(x) {
return x + x;
}

var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
selector
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: 'dd'});
});

it('should accept array of observable contains single with selector', function () {
function selector(x) {
return x + x;
}

var e1 = Observable.forkJoin(
[hot('--a--b--c--d--|')],
selector
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: 'dd'});
});

it('should accept lowercase-o observables', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
lowerCaseO('1', '2', '3')
);
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
Expand All @@ -29,7 +91,7 @@ describe('Observable.forkJoin', function () {
var e1 = Observable.forkJoin(
Observable.of(1),
Promise.resolve(2)
);
);

e1.subscribe(function (x) {
expect(x).toEqual([1,2]);
Expand All @@ -40,18 +102,45 @@ describe('Observable.forkJoin', function () {
done);
});

it('forkJoin n-ary parameters empty', function () {
it('should accept array of observables', function () {
var e1 = Observable.forkJoin(
[hot('--a--b--c--d--|'),
hot('(b|)'),
hot('--1--2--3--|')]
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
});

it('should accept array of observables with selector', function () {
function selector(x, y, z) {
return x + y + z;
}

var e1 = Observable.forkJoin(
[hot('--a--b--c--d--|'),
hot('(b|)'),
hot('--1--2--3--|')],
selector
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: 'db3'});
});

it('should not emit if any of source observable is empty', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('------------------|')
);
);
var expected = '------------------|';

expectObservable(e1).toBe(expected);
});

it('forkJoin n-ary parameters empty before end', function () {
it('should complete early if any of source is empty and completes before than others', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
Expand All @@ -62,7 +151,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin empty empty', function () {
it('should complete when all sources are empty', function () {
var e1 = Observable.forkJoin(
hot('--------------|'),
hot('---------|')
Expand All @@ -72,14 +161,14 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin none', function () {
it('should complete if source is not provided', function () {
var e1 = Observable.forkJoin();
var expected = '|';

expectObservable(e1).toBe(expected);
});

it('forkJoin empty return', function () {
it('should complete when any of source is empty with selector', function () {
function selector(x, y) {
return x + y;
}
Expand All @@ -93,7 +182,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin return return', function () {
it('should emit results by resultselector', function () {
function selector(x, y) {
return x + y;
}
Expand All @@ -107,7 +196,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected, {x: 'd2'});
});

it('forkJoin empty throw', function () {
it('should raise error when any of source raises error with empty observable', function () {
var e1 = Observable.forkJoin(
hot('------#'),
hot('---------|'));
Expand All @@ -116,7 +205,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin empty throw', function () {
it('should raise error when any of source raises error with selector with empty observable', function () {
function selector(x, y) {
return x + y;
}
Expand All @@ -130,7 +219,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin return throw', function () {
it('should raise error when source raises error', function () {
var e1 = Observable.forkJoin(
hot('------#'),
hot('---a-----|'));
Expand All @@ -139,7 +228,7 @@ describe('Observable.forkJoin', function () {
expectObservable(e1).toBe(expected);
});

it('forkJoin return throw', function () {
it('should raise error when source raises error with selector', function () {
function selector(x, y) {
return x + y;
}
Expand All @@ -152,4 +241,4 @@ describe('Observable.forkJoin', function () {

expectObservable(e1).toBe(expected);
});
});
});
5 changes: 4 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ export class Observable<T> implements CoreOperators<T> {
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
static forkJoin: (...sources: Array<Observable<any> | Promise<any> | ((...values: Array<any>) => any)>) => Observable<any>;
static forkJoin: (...sources: Array<Observable<any> |
Array<Observable<any>> |
Promise<any> |
((...values: Array<any>) => any)>) => Observable<any>;
static from: <T>(iterable: any, scheduler?: Scheduler) => Observable<T>;
static fromArray: <T>(array: T[], scheduler?: Scheduler) => Observable<T>;
static fromEvent: <T>(element: any, eventName: string, selector?: (...args: Array<any>) => T) => Observable<T>;
Expand Down
36 changes: 18 additions & 18 deletions src/observables/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,41 @@ import {Subscriber} from '../Subscriber';
import {PromiseObservable} from './PromiseObservable';
import {EmptyObservable} from './EmptyObservable';
import {isPromise} from '../util/isPromise';
import {isArray} from '../util/isArray';

export class ForkJoinObservable<T> extends Observable<T> {
constructor(private sources: Array<Observable<any> |
Promise<any> |
((...values: Array<any>) => any)>) {
super();
constructor(private sources: Array<Observable<any> | Promise<any>>,
private resultSelector?: (...values: Array<any>) => any) {
super();
}

static create(...sources: Array<Observable<any> |
Array<Observable<any>> |
Promise<any> |
((...values: Array<any>) => any)>)
: Observable<any> {
if (sources === null || sources.length === 0) {
((...values: Array<any>) => any)>): Observable<any> {
if (sources === null || arguments.length === 0) {
return new EmptyObservable();
}
return new ForkJoinObservable(sources);
}

private getResultSelector(): (...values: Array<any>) => any {
const sources = this.sources;
let resultSelector: (...values: Array<any>) => any = null;
if (typeof sources[sources.length - 1] === 'function') {
resultSelector = <(...values: Array<any>) => any>sources.pop();
}

let resultSelector = sources[sources.length - 1];
if (typeof resultSelector !== 'function') {
return null;
// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)`
if (sources.length === 1 && isArray(sources[0])) {
sources = <Array<Observable<any>>>sources[0];
}
this.sources.pop();
return <(...values: Array<any>) => any>resultSelector;

return new ForkJoinObservable(<Array<Observable<any> | Promise<any>>>sources, resultSelector);
}

_subscribe(subscriber: Subscriber<any>) {
let resultSelector = this.getResultSelector();
const sources = this.sources;
const len = sources.length;

const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector };
const context = { completed: 0, total: len, values: emptyArray(len), selector: this.resultSelector };
for (let i = 0; i < len; i++) {
let source = sources[i];
if (isPromise(source)) {
Expand Down

0 comments on commit d45f672

Please sign in to comment.