From d29324501921f56b41f3b43481b30f184fad97c1 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 7 Feb 2018 14:15:26 -0800 Subject: [PATCH] feat(mergeMap|concatMap|concatMapTo): simplified the signatures BREAKING CHANGE: mergeMap, concatMap and concatMapTo no longer support a result selector, if you need to use a result selector, use the following pattern: `source.mergeMap(x => of(x + x).pipe(map(y => y + x))` (the pattern would be the same for `concatMap`). --- spec/operators/concatMap-spec.ts | 104 +-------- spec/operators/concatMapTo-spec.ts | 97 +------- spec/operators/mergeMap-spec.ts | 216 ------------------ src/internal/operators/concatMap.ts | 18 +- src/internal/operators/concatMapTo.ts | 20 +- src/internal/operators/mergeAll.ts | 7 +- src/internal/operators/mergeMap.ts | 63 ++--- src/internal/patching/operator/concatMap.ts | 18 +- src/internal/patching/operator/concatMapTo.ts | 18 +- src/internal/patching/operator/mergeMap.ts | 20 +- 10 files changed, 36 insertions(+), 545 deletions(-) diff --git a/spec/operators/concatMap-spec.ts b/spec/operators/concatMap-spec.ts index 25a4686a02..a1f0bb3832 100644 --- a/spec/operators/concatMap-spec.ts +++ b/spec/operators/concatMap-spec.ts @@ -538,6 +538,7 @@ describe('Observable.prototype.concatMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); +<<<<<<< HEAD it('should concatMap many outer to inner arrays, using resultSelector', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ !'; @@ -573,6 +574,8 @@ describe('Observable.prototype.concatMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); +======= +>>>>>>> feat(mergeMap|concatMap|concatMapTo): simplified the signatures it('should mergeMap many outer to inner arrays, outer unsubscribed early', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ ! '; @@ -585,19 +588,6 @@ describe('Observable.prototype.concatMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should concatMap many outer to inner arrays, resultSelector, outer unsubscribed', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ ! '; - const unsub = ' ! '; - const expected = '(44)--(8888)-- '; - - const result = e1.concatMap((value) => arrayRepeat(value, +value), - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(result, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should concatMap many outer to inner arrays, project throws', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ ! '; @@ -615,45 +605,7 @@ describe('Observable.prototype.concatMap', () => { expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - - it('should concatMap many outer to inner arrays, resultSelector throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ ! '; - const expected = '(44)--(8888)---# '; - - const result = e1.concatMap((value) => arrayRepeat(value, +value), - (inner, outer) => { - if (outer === '3') { - throw 'error'; - } - return String(parseInt(outer) + parseInt(inner)); - }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should concatMap many outer to inner arrays, resultSelector, project throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ ! '; - const expected = '(44)--(8888)---# '; - - let invoked = 0; - const result = e1.concatMap((value) => { - invoked++; - if (invoked === 3) { - throw 'error'; - } - return arrayRepeat(value, +value); - }, (inner, outer) => { - return String(parseInt(outer) + parseInt(inner)); - }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should map values to constant resolved promises and concatenate', (done) => { + it('should map values to constant resolved promises and concatenate', (done: MochaDone) => { const source = Rx.Observable.from([4, 3, 2, 1]); const project = (value: number) => Observable.from(Promise.resolve(42)); @@ -714,52 +666,4 @@ describe('Observable.prototype.concatMap', () => { done(new Error('Subscriber complete handler not supposed to be called.')); }); }); - - it('should concatMap values to resolved promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const resultSelectorCalledWith: number[][] = []; - const project = (value: number, index: number) => Observable.from((Promise.resolve([value, index]))); - - const resultSelector = function (outerVal: any, innerVal: any, outerIndex: any, innerIndex: any): number { - resultSelectorCalledWith.push([].slice.call(arguments)); - return 8; - }; - - const results: number[] = []; - const expectedCalls = [ - [4, [4, 0], 0, 0], - [3, [3, 1], 1, 0], - [2, [2, 2], 2, 0], - [1, [1, 3], 3, 0] - ]; - source.concatMap(project, resultSelector).subscribe( - (x) => { - results.push(x); - }, (err) => { - done(new Error('Subscriber error handler not supposed to be called.')); - }, () => { - expect(results).to.deep.equal([8, 8, 8, 8]); - expect(resultSelectorCalledWith).to.deep.equal(expectedCalls); - done(); - }); - }); - - it('should concatMap values to rejected promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const project = (value: number, index: number) => Observable.from(Promise.reject('' + value + '-' + index)); - - const resultSelector = () => { - throw 'this should not be called'; - }; - - source.concatMap(project, resultSelector).subscribe( - (x) => { - done(new Error('Subscriber next handler not supposed to be called.')); - }, (err) => { - expect(err).to.deep.equal('4-0'); - done(); - }, () => { - done(new Error('Subscriber complete handler not supposed to be called.')); - }); - }); }); diff --git a/spec/operators/concatMapTo-spec.ts b/spec/operators/concatMapTo-spec.ts index 63b88a2cac..61774397de 100644 --- a/spec/operators/concatMapTo-spec.ts +++ b/spec/operators/concatMapTo-spec.ts @@ -252,16 +252,6 @@ describe('Observable.prototype.concatMapTo', () => { expectObservable(result).toBe(expected); }); - it('should concatMapTo many outer to inner arrays, using resultSelector', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const expected = '(2345)(4567)---(3456)---(2345)--|'; - - const result = e1.concatMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(result).toBe(expected); - }); - it('should concatMapTo many outer to inner arrays, and outer throws', () => { const e1 = hot('2-----4--------3--------2-------#'); const expected = '(0123)(0123)---(0123)---(0123)--#'; @@ -271,16 +261,6 @@ describe('Observable.prototype.concatMapTo', () => { expectObservable(result).toBe(expected); }); - it('should concatMapTo many outer to inner arrays, resultSelector, outer throws', () => { - const e1 = hot('2-----4--------3--------2-------#'); - const expected = '(2345)(4567)---(3456)---(2345)--#'; - - const result = e1.concatMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(result).toBe(expected); - }); - it('should mergeMap many outer to inner arrays, outer unsubscribed early', () => { const e1 = hot('2-----4--------3--------2-------|'); const unsub = ' !'; @@ -291,32 +271,7 @@ describe('Observable.prototype.concatMapTo', () => { expectObservable(result, unsub).toBe(expected); }); - it('should concatMapTo many outer to inner arrays, resultSelector, outer unsubscribed', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const unsub = ' !'; - const expected = '(2345)(4567)--'; - - const result = e1.concatMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(result, unsub).toBe(expected); - }); - - it('should concatMapTo many outer to inner arrays, resultSelector throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const expected = '(2345)(4567)---#'; - - const result = e1.concatMapTo(['0', '1', '2', '3'], (x, y) => { - if (x === '3') { - throw 'error'; - } - return String(parseInt(x) + parseInt(y)); - }); - - expectObservable(result).toBe(expected); - }); - - it('should map values to constant resolved promises and concatenate', (done) => { + it('should map values to constant resolved promises and concatenate', (done: MochaDone) => { const source = Rx.Observable.from([4, 3, 2, 1]); const results: number[] = []; @@ -348,54 +303,4 @@ describe('Observable.prototype.concatMapTo', () => { done(new Error('Subscriber complete handler not supposed to be called.')); }); }); - - it('should concatMapTo values to resolved promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const resultSelectorCalledWith: number[][] = []; - const inner = Observable.from(Promise.resolve(42)); - const resultSelector = function (outerVal: number, innerVal: number, outerIndex: number, innerIndex: number) { - resultSelectorCalledWith.push([].slice.call(arguments)); - return 8; - }; - - const results: number[] = []; - const expectedCalls = [ - [4, 42, 0, 0], - [3, 42, 1, 0], - [2, 42, 2, 0], - [1, 42, 3, 0] - ]; - source.concatMapTo(inner, resultSelector).subscribe( - (x) => { - results.push(x); - }, - (err) => { - done(new Error('Subscriber error handler not supposed to be called.')); - }, - () => { - expect(results).to.deep.equal([8, 8, 8, 8]); - expect(resultSelectorCalledWith).to.deep.equal(expectedCalls); - done(); - }); - }); - - it('should concatMapTo values to rejected promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const inner = Observable.from(Promise.reject(42)); - const resultSelector = () => { - throw 'this should not be called'; - }; - - source.concatMapTo(inner, resultSelector).subscribe( - (x) => { - done(new Error('Subscriber next handler not supposed to be called.')); - }, - (err) => { - expect(err).to.equal(42); - done(); - }, - () => { - done(new Error('Subscriber complete handler not supposed to be called.')); - }); - }); }); diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 4aaecef9d3..39a989a99a 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -115,60 +115,6 @@ describe('Observable.prototype.mergeMap', () => { }); }); - it('should mergeMap values to resolved promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const resultSelectorCalledWith: number[][] = []; - const project = function (value: number, index: number) { - return Observable.from(Promise.resolve([value, index])); - }; - const resultSelector = function (outerVal: number, innerVal: number[], outerIndex: number, innerIndex: number) { - resultSelectorCalledWith.push([].slice.call(arguments)); - return 8; - }; - - const results: number[] = []; - const expectedCalls = [ - [4, [4, 0], 0, 0], - [3, [3, 1], 1, 0], - [2, [2, 2], 2, 0], - [1, [1, 3], 3, 0], - ]; - source.mergeMap(project, resultSelector).subscribe( - (x) => { - results.push(x); - }, - (err) => { - done(new Error('Subscriber error handler not supposed to be called.')); - }, - () => { - expect(results).to.deep.equal([8, 8, 8, 8]); - expect(resultSelectorCalledWith).to.deep.equal(expectedCalls); - done(); - }); - }); - - it('should mergeMap values to rejected promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const project = function (value: number, index: number) { - return Observable.from(Promise.reject('' + value + '-' + index)); - }; - const resultSelector = () => { - throw 'this should not be called'; - }; - - source.mergeMap(project, resultSelector).subscribe( - (x) => { - done(new Error('Subscriber next handler not supposed to be called.')); - }, - (err) => { - expect(err).to.equal('4-0'); - done(); - }, - () => { - done(new Error('Subscriber complete handler not supposed to be called.')); - }); - }); - it('should mergeMap many outer values to many inner values', () => { const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; const e1 = hot('-a-------b-------c-------d-------| '); @@ -283,92 +229,6 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMap to many cold Observable, with parameter concurrency=1', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const inner = cold('----i---j---k---l---| ', values); - const innersubs = [' ^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; - - function project() { return inner; } - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMap(project, resultSelector, 1); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(inner.subscriptions).toBe(innersubs); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should mergeMap to many cold Observable, with parameter concurrency=2', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const inner = cold('----i---j---k---l---| ', values); - const innersubs = [' ^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; - - function project() { return inner; } - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMap(project, resultSelector, 2); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(inner.subscriptions).toBe(innersubs); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should mergeMap to many hot Observable, with parameter concurrency=1', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const hotA = hot('x----i---j---k---l---| ', values); - const hotB = hot('-x-x-xxxx-x-x-xxxxx-x----i---j---k---l---| ', values); - const hotC = hot('x-xxxx---x-x-x-x-x-xx--x--x-x--x--xxxx-x-----i---j---k---l---|', values); - const asubs = ' ^ ! '; - const bsubs = ' ^ ! '; - const csubs = ' ^ !'; - const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; - const inners = { a: hotA, b: hotB, c: hotC }; - - function project(x: string) { return inners[x]; } - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMap(project, resultSelector, 1); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - expectSubscriptions(hotA.subscriptions).toBe(asubs); - expectSubscriptions(hotB.subscriptions).toBe(bsubs); - expectSubscriptions(hotC.subscriptions).toBe(csubs); - }); - - it('should mergeMap to many hot Observable, with parameter concurrency=2', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const hotA = hot('x----i---j---k---l---| ', values); - const hotB = hot('-x-x-xxxx----i---j---k---l---| ', values); - const hotC = hot('x-xxxx---x-x-x-x-x-xx----i---j---k---l---|', values); - const asubs = ' ^ ! '; - const bsubs = ' ^ ! '; - const csubs = ' ^ !'; - const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; - const inners = { a: hotA, b: hotB, c: hotC }; - - function project(x: string) { return inners[x]; } - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMap(project, resultSelector, 2); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - expectSubscriptions(hotA.subscriptions).toBe(asubs); - expectSubscriptions(hotB.subscriptions).toBe(bsubs); - expectSubscriptions(hotC.subscriptions).toBe(csubs); - }); - it('should mergeMap to many cold Observable, with parameter concurrency=1, without resultSelector', () => { const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; const e1 = hot('-a-------b-------c---| '); @@ -616,18 +476,6 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMap many outer to inner arrays, using resultSelector', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ !'; - const expected = '(44)--(8888)---(666)----(44)----|'; - - const source = e1.mergeMap((value) => arrayRepeat(value, +value), - (x, y) => String(parseInt(x) + (+y))); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMap many outer to inner arrays, and outer throws', () => { const e1 = hot('2-----4--------3--------2-------#'); const e1subs = '^ !'; @@ -639,18 +487,6 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMap many outer to inner arrays, resultSelector, outer throws', () => { - const e1 = hot('2-----4--------3--------2-------#'); - const e1subs = '^ !'; - const expected = '(44)--(8888)---(666)----(44)----#'; - - const source = e1.mergeMap((value) => arrayRepeat(value, +value), - (x, y) => String(parseInt(x) + (+y))); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMap many outer to inner arrays, outer gets unsubscribed', () => { const e1 = hot('2-----4--------3--------2-------|'); const unsub = ' ! '; @@ -663,19 +499,6 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMap many outer to inner arrays, resultSelector, outer unsubscribed', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const unsub = ' ! '; - const e1subs = '^ ! '; - const expected = '(44)--(8888)-- '; - - const source = e1.mergeMap((value) => arrayRepeat(value, +value), - (x, y) => String(parseInt(x) + (+y))); - - expectObservable(source, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMap many outer to inner arrays, project throws', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ ! '; @@ -694,43 +517,6 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMap many outer to inner arrays, resultSelector throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ ! '; - const expected = '(44)--(8888)---# '; - - const source = e1.mergeMap((value) => arrayRepeat(value, +value), - (inner, outer) => { - if (outer === '3') { - throw 'error'; - } - return String((+outer) + parseInt(inner)); - }); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should mergeMap many outer to inner arrays, resultSelector, project throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ ! '; - const expected = '(44)--(8888)---# '; - - let invoked = 0; - const source = e1.mergeMap((value) => { - invoked++; - if (invoked === 3) { - throw 'error'; - } - return arrayRepeat(value, +value); - }, (inner, outer) => { - return String((+outer) + parseInt(inner)); - }); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should map and flatten', () => { const source = Observable.of(1, 2, 3, 4).mergeMap((x) => Observable.of(x + '!')); @@ -769,8 +555,6 @@ describe('Observable.prototype.mergeMap', () => { /* tslint:disable:no-unused-variable */ let a1: Rx.Observable = o.mergeMap(x => x.toString()); let a2: Rx.Observable = o.mergeMap(x => x.toString(), 3); - let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i })); - let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i }), 3); /* tslint:enable:no-unused-variable */ }); }); diff --git a/src/internal/operators/concatMap.ts b/src/internal/operators/concatMap.ts index 1ac7db96a6..bc8931f935 100644 --- a/src/internal/operators/concatMap.ts +++ b/src/internal/operators/concatMap.ts @@ -1,11 +1,6 @@ import { mergeMap } from './mergeMap'; import { ObservableInput, OperatorFunction } from '../types'; -/* tslint:disable:max-line-length */ -export function concatMap(project: (value: T, index: number) => ObservableInput): OperatorFunction; -export function concatMap(project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; -/* tslint:enable:max-line-length */ - /** * Projects each source value to an Observable which is merged in the output * Observable, in a serialized fashion waiting for each one to complete before @@ -50,14 +45,6 @@ export function concatMap(project: (value: T, index: number) => Observ * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @return {Observable} An Observable that emits the result of applying the * projection function (and the optional `resultSelector`) to each item emitted * by the source Observable and taking values from each projected inner @@ -65,7 +52,6 @@ export function concatMap(project: (value: T, index: number) => Observ * @method concatMap * @owner Observable */ -export function concatMap(project: (value: T, index: number) => ObservableInput, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - return mergeMap(project, resultSelector, 1); +export function concatMap(project: (value: T, index: number) => ObservableInput): OperatorFunction { + return mergeMap(project, 1); } diff --git a/src/internal/operators/concatMapTo.ts b/src/internal/operators/concatMapTo.ts index 1399111592..c5536466d4 100644 --- a/src/internal/operators/concatMapTo.ts +++ b/src/internal/operators/concatMapTo.ts @@ -1,11 +1,6 @@ import { concatMap } from './concatMap'; import { ObservableInput, OperatorFunction } from '../types'; -/* tslint:disable:max-line-length */ -export function concatMapTo(observable: ObservableInput): OperatorFunction; -export function concatMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; -/* tslint:enable:max-line-length */ - /** * Projects each source value to the same Observable which is merged multiple * times in a serialized fashion on the output Observable. @@ -48,23 +43,14 @@ export function concatMapTo(observable: ObservableInput, resultSelec * * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @return {Observable} An observable of values merged together by joining the * passed observable with itself, one after the other, for each value emitted * from the source. * @method concatMapTo * @owner Observable */ -export function concatMapTo( - innerObservable: ObservableInput, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R +export function concatMapTo( + innerObservable: ObservableInput ): OperatorFunction { - return concatMap(() => innerObservable, resultSelector); + return concatMap(() => innerObservable); } diff --git a/src/internal/operators/mergeAll.ts b/src/internal/operators/mergeAll.ts index 328407eb83..7f4a2d5313 100644 --- a/src/internal/operators/mergeAll.ts +++ b/src/internal/operators/mergeAll.ts @@ -1,10 +1,9 @@ import { mergeMap } from './mergeMap'; import { identity } from '../util/identity'; -import { OperatorFunction, ObservableInput } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput } from '../types'; export function mergeAll(concurrent?: number): OperatorFunction, T>; -export function mergeAll(concurrent?: number): OperatorFunction; /** * Converts a higher-order Observable into a first-order Observable which @@ -50,6 +49,6 @@ export function mergeAll(concurrent?: number): OperatorFunction; * @method mergeAll * @owner Observable */ -export function mergeAll(concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { - return mergeMap(identity as (value: T, index: number) => ObservableInput<{}>, null, concurrent); +export function mergeAll(concurrent: number = Number.POSITIVE_INFINITY): MonoTypeOperatorFunction { + return mergeMap(identity as (value: T, index: number) => ObservableInput, concurrent); } diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 059a01e6c0..0756fbe328 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -9,7 +9,6 @@ import { ObservableInput, OperatorFunction } from '../types'; /* tslint:disable:max-line-length */ export function mergeMap(project: (value: T, index: number) => ObservableInput, concurrent?: number): OperatorFunction; -export function mergeMap(project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; /* tslint:enable:max-line-length */ /** @@ -53,14 +52,6 @@ export function mergeMap(project: (value: T, index: number) => Observab * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @return {Observable} An Observable that emits the result of applying the @@ -70,27 +61,21 @@ export function mergeMap(project: (value: T, index: number) => Observab * @method mergeMap * @owner Observable */ -export function mergeMap(project: (value: T, index: number) => ObservableInput, - resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, - concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { +export function mergeMap(project: (value: T, index: number) => ObservableInput, + concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { return function mergeMapOperatorFunction(source: Observable) { - if (typeof resultSelector === 'number') { - concurrent = resultSelector; - resultSelector = null; - } - return source.lift(new MergeMapOperator(project, resultSelector, concurrent)); + return source.lift(new MergeMapOperator(project, concurrent)); }; } -export class MergeMapOperator implements Operator { - constructor(private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, +export class MergeMapOperator implements Operator { + constructor(private project: (value: T, index: number) => ObservableInput, private concurrent: number = Number.POSITIVE_INFINITY) { } - call(observer: Subscriber, source: any): any { + call(observer: Subscriber, source: any): any { return source.subscribe(new MergeMapSubscriber( - observer, this.project, this.resultSelector, this.concurrent + observer, this.project, this.concurrent )); } } @@ -100,15 +85,14 @@ export class MergeMapOperator implements Operator { * @ignore * @extends {Ignored} */ -export class MergeMapSubscriber extends OuterSubscriber { +export class MergeMapSubscriber extends OuterSubscriber { private hasCompleted: boolean = false; private buffer: T[] = []; private active: number = 0; protected index: number = 0; - constructor(destination: Subscriber, - private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, + constructor(destination: Subscriber, + private project: (value: T, index: number) => ObservableInput, private concurrent: number = Number.POSITIVE_INFINITY) { super(destination); } @@ -122,7 +106,7 @@ export class MergeMapSubscriber extends OuterSubscriber { } protected _tryNext(value: T) { - let result: ObservableInput; + let result: ObservableInput; const index = this.index++; try { result = this.project(value, index); @@ -134,8 +118,8 @@ export class MergeMapSubscriber extends OuterSubscriber { this._innerSub(result, value, index); } - private _innerSub(ish: ObservableInput, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); + private _innerSub(ish: ObservableInput, value: T, index: number): void { + this.add(subscribeToResult(this, ish, value, index)); } protected _complete(): void { @@ -145,25 +129,10 @@ export class MergeMapSubscriber extends OuterSubscriber { } } - notifyNext(outerValue: T, innerValue: I, + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - if (this.resultSelector) { - this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex); - } else { - this.destination.next(innerValue); - } - } - - private _notifyResultSelector(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) { - let result: R; - try { - result = this.resultSelector(outerValue, innerValue, outerIndex, innerIndex); - } catch (err) { - this.destination.error(err); - return; - } - this.destination.next(result); + innerSub: InnerSubscriber): void { + this.destination.next(innerValue); } notifyComplete(innerSub: Subscription): void { diff --git a/src/internal/patching/operator/concatMap.ts b/src/internal/patching/operator/concatMap.ts index da5813b450..fc33d84849 100644 --- a/src/internal/patching/operator/concatMap.ts +++ b/src/internal/patching/operator/concatMap.ts @@ -2,11 +2,6 @@ import { concatMap as higherOrderConcatMap } from '../../operators/concatMap'; import { Observable } from '../../Observable'; import { ObservableInput } from '../../types'; -/* tslint:disable:max-line-length */ -export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput): Observable; -export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:enable:max-line-length */ - /** * Projects each source value to an Observable which is merged in the output * Observable, in a serialized fashion waiting for each one to complete before @@ -51,14 +46,6 @@ export function concatMap(this: Observable, project: (value: T, inde * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @return {Observable} An Observable that emits the result of applying the * projection function (and the optional `resultSelector`) to each item emitted * by the source Observable and taking values from each projected inner @@ -66,7 +53,6 @@ export function concatMap(this: Observable, project: (value: T, inde * @method concatMap * @owner Observable */ -export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - return higherOrderConcatMap(project, resultSelector)(this); +export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput) { + return higherOrderConcatMap(project)(this); } diff --git a/src/internal/patching/operator/concatMapTo.ts b/src/internal/patching/operator/concatMapTo.ts index 61d3a19223..1afca9888e 100644 --- a/src/internal/patching/operator/concatMapTo.ts +++ b/src/internal/patching/operator/concatMapTo.ts @@ -2,11 +2,6 @@ import { Observable } from '../../Observable'; import { ObservableInput } from '../../types'; import { concatMapTo as higherOrder } from '../../operators/concatMapTo'; -/* tslint:disable:max-line-length */ -export function concatMapTo(this: Observable, observable: ObservableInput): Observable; -export function concatMapTo(this: Observable, observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:enable:max-line-length */ - /** * Projects each source value to the same Observable which is merged multiple * times in a serialized fashion on the output Observable. @@ -49,21 +44,12 @@ export function concatMapTo(this: Observable, observable: Observable * * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @return {Observable} An observable of values merged together by joining the * passed observable with itself, one after the other, for each value emitted * from the source. * @method concatMapTo * @owner Observable */ -export function concatMapTo(this: Observable, innerObservable: ObservableInput, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable { - return higherOrder(innerObservable, resultSelector)(this); +export function concatMapTo(this: Observable, innerObservable: Observable): Observable { + return higherOrder(innerObservable)(this); } diff --git a/src/internal/patching/operator/mergeMap.ts b/src/internal/patching/operator/mergeMap.ts index 0eac740955..8bb5a3457b 100644 --- a/src/internal/patching/operator/mergeMap.ts +++ b/src/internal/patching/operator/mergeMap.ts @@ -2,11 +2,6 @@ import { Observable } from '../../Observable'; import { ObservableInput } from '../../types'; import { mergeMap as higherOrderMergeMap } from '../../operators/mergeMap'; -/* tslint:disable:max-line-length */ -export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, concurrent?: number): Observable; -export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): Observable; -/* tslint:enable:max-line-length */ - /** * Projects each source value to an Observable which is merged in the output * Observable. @@ -48,14 +43,6 @@ export function mergeMap(this: Observable, project: (value: T, index * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @return {Observable} An Observable that emits the result of applying the @@ -65,8 +52,7 @@ export function mergeMap(this: Observable, project: (value: T, index * @method mergeMap * @owner Observable */ -export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, - resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, - concurrent: number = Number.POSITIVE_INFINITY): Observable { - return higherOrderMergeMap(project, resultSelector, concurrent)(this) as Observable; +export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, + concurrent: number = Number.POSITIVE_INFINITY): Observable { + return higherOrderMergeMap(project, concurrent)(this) as Observable; }