From d5190f78a84c9255b5fcc8863c811c11153cc8d3 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Thu, 12 Nov 2015 10:06:45 -0800 Subject: [PATCH] feat(forkJoin): accept array of observable as parameter relates to #594 --- spec/observables/forkJoin-spec.js | 119 ++++++++++++++++++++++---- src/Observable.ts | 5 +- src/observables/ForkJoinObservable.ts | 39 +++++---- 3 files changed, 130 insertions(+), 33 deletions(-) diff --git a/spec/observables/forkJoin-spec.js b/spec/observables/forkJoin-spec.js index a2dcc8eb7d2..3a107b92a32 100644 --- a/spec/observables/forkJoin-spec.js +++ b/spec/observables/forkJoin-spec.js @@ -8,18 +8,80 @@ describe('Observable.forkJoin', function () { hot('--a--b--c--d--|'), hot('(b|)'), hot('--1--2--3--|') - ); + ); var expected = '--------------(x|)'; expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); }); + it('should join the last values of the provided observables with selector', function () { + function selector(x, y, z) { + return x + y + z; + } + + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|'), + selector + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: 'db3'}); + }); + + it('should accept single observable', function () { + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|') + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d']}); + }); + + it('should accept array of observable contains single', function () { + var e1 = Observable.forkJoin( + [hot('--a--b--c--d--|')] + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d']}); + }); + + it('should accept single observable with selector', function () { + function selector(x) { + return x + x; + } + + var e1 = Observable.forkJoin( + hot('--a--b--c--d--|'), + selector + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: 'dd'}); + }); + + it('should accept array of observable contains single with selector', function () { + function selector(x) { + return x + x; + } + + var e1 = Observable.forkJoin( + [hot('--a--b--c--d--|')], + selector + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: 'dd'}); + }); + it('should accept lowercase-o observables', function () { var e1 = Observable.forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), lowerCaseO('1', '2', '3') - ); + ); var expected = '--------------(x|)'; expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); @@ -29,7 +91,7 @@ describe('Observable.forkJoin', function () { var e1 = Observable.forkJoin( Observable.of(1), Promise.resolve(2) - ); + ); e1.subscribe(function (x) { expect(x).toEqual([1,2]); @@ -40,18 +102,45 @@ describe('Observable.forkJoin', function () { done); }); - it('forkJoin n-ary parameters empty', function () { + it('should accept array of observables', function () { + var e1 = Observable.forkJoin( + [hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|')] + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']}); + }); + + it('should accept array of observables with selector', function () { + function selector(x, y, z) { + return x + y + z; + } + + var e1 = Observable.forkJoin( + [hot('--a--b--c--d--|'), + hot('(b|)'), + hot('--1--2--3--|')], + selector + ); + var expected = '--------------(x|)'; + + expectObservable(e1).toBe(expected, {x: 'db3'}); + }); + + it('should not emit if any of source observable is empty', function () { var e1 = Observable.forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), hot('------------------|') - ); + ); var expected = '------------------|'; expectObservable(e1).toBe(expected); }); - it('forkJoin n-ary parameters empty before end', function () { + it('should complete early if any of source is empty and completes before than others', function () { var e1 = Observable.forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), @@ -62,7 +151,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin empty empty', function () { + it('should complete when all sources are empty', function () { var e1 = Observable.forkJoin( hot('--------------|'), hot('---------|') @@ -72,14 +161,14 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin none', function () { + it('should complete if source is not provided', function () { var e1 = Observable.forkJoin(); var expected = '|'; expectObservable(e1).toBe(expected); }); - it('forkJoin empty return', function () { + it('should complete when any of source is empty with selector', function () { function selector(x, y) { return x + y; } @@ -93,7 +182,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin return return', function () { + it('should emit results by resultselector', function () { function selector(x, y) { return x + y; } @@ -107,7 +196,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected, {x: 'd2'}); }); - it('forkJoin empty throw', function () { + it('should raise error when any of source raises error with empty observable', function () { var e1 = Observable.forkJoin( hot('------#'), hot('---------|')); @@ -116,7 +205,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin empty throw', function () { + it('should raise error when any of source raises error with selector with empty observable', function () { function selector(x, y) { return x + y; } @@ -130,7 +219,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin return throw', function () { + it('should raise error when source raises error', function () { var e1 = Observable.forkJoin( hot('------#'), hot('---a-----|')); @@ -139,7 +228,7 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); - it('forkJoin return throw', function () { + it('should raise error when source raises error with selector', function () { function selector(x, y) { return x + y; } @@ -152,4 +241,4 @@ describe('Observable.forkJoin', function () { expectObservable(e1).toBe(expected); }); -}); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index c8764b3be8d..367c7ef12e8 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -138,7 +138,10 @@ export class Observable implements CoreOperators { static concat: (...observables: Array | Scheduler>) => Observable; static defer: (observableFactory: () => Observable) => Observable; static empty: (scheduler?: Scheduler) => Observable; - static forkJoin: (...sources: Array | Promise | ((...values: Array) => any)>) => Observable; + static forkJoin: (...sources: Array | + Array> | + Promise | + ((...values: Array) => any)>) => Observable; static from: (iterable: any, scheduler?: Scheduler) => Observable; static fromArray: (array: T[], scheduler?: Scheduler) => Observable; static fromEvent: (element: any, eventName: string, selector?: (...args: Array) => T) => Observable; diff --git a/src/observables/ForkJoinObservable.ts b/src/observables/ForkJoinObservable.ts index 2de7f579088..87d25a7b453 100644 --- a/src/observables/ForkJoinObservable.ts +++ b/src/observables/ForkJoinObservable.ts @@ -3,41 +3,46 @@ import {Subscriber} from '../Subscriber'; import {PromiseObservable} from './PromiseObservable'; import {EmptyObservable} from './EmptyObservable'; import {isPromise} from '../util/isPromise'; +import {isArray} from '../util/isArray'; export class ForkJoinObservable extends Observable { - constructor(private sources: Array | - Promise | - ((...values: Array) => any)>) { - super(); + constructor(private sources: Array | Promise>, + private resultSelector?: (...values: Array) => any) { + super(); } static create(...sources: Array | + Array> | Promise | ((...values: Array) => any)>) - : Observable { - if (sources === null || sources.length === 0) { + : Observable; + static create(sources: Array | Promise>, + selector?: (...values: Array) => any) + : Observable; + static create(...sources: Array): Observable { + if (sources === null || arguments.length === 0) { return new EmptyObservable(); } - return new ForkJoinObservable(sources); - } - private getResultSelector(): (...values: Array) => any { - const sources = this.sources; + let resultSelector: (...values: Array) => any = null; + if (typeof sources[sources.length - 1] === 'function') { + resultSelector = sources.pop(); + } - let resultSelector = sources[sources.length - 1]; - if (typeof resultSelector !== 'function') { - return null; + // if the first and only other argument besides the resultSelector is an array + // assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)` + if (sources.length === 1 && isArray(sources[0])) { + sources = sources[0]; } - this.sources.pop(); - return <(...values: Array) => any>resultSelector; + + return new ForkJoinObservable(sources, resultSelector); } _subscribe(subscriber: Subscriber) { - let resultSelector = this.getResultSelector(); const sources = this.sources; const len = sources.length; - const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector }; + const context = { completed: 0, total: len, values: emptyArray(len), selector: this.resultSelector }; for (let i = 0; i < len; i++) { let source = sources[i]; if (isPromise(source)) {