From 582c7bead29feef774f3270a302ff446fee3528b Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 7 Feb 2018 14:27:02 -0800 Subject: [PATCH] feat(mergeMapTo): simplify interface - Implements mergeMapTo in terms of mergeMap - Removes resultSelector argument BREAKING CHANGE: `mergeMapTo` no longer accepts a resultSelector, to get this functionality, you'll want to use `mergeMap` and `map` together: `source.pipe(mergeMap(() => inner).pipe(map(y => x + y)))` --- spec/operators/mergeMapTo-spec.ts | 141 ------------------- src/internal/operators/mergeMapTo.ts | 139 +----------------- src/internal/patching/operator/mergeMapTo.ts | 23 +-- 3 files changed, 11 insertions(+), 292 deletions(-) diff --git a/spec/operators/mergeMapTo-spec.ts b/spec/operators/mergeMapTo-spec.ts index 7aaf5cf5c7..d042083008 100644 --- a/spec/operators/mergeMapTo-spec.ts +++ b/spec/operators/mergeMapTo-spec.ts @@ -75,56 +75,6 @@ describe('Observable.prototype.mergeMapTo', () => { }); }); - it('should mergeMapTo values to resolved promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const resultSelectorCalledWith: number[][] = []; - const inner = Observable.from(Promise.resolve(42)); - const resultSelector = function (outerVal: number, innerVal: number, outerIndex: number, innerIndex: number) { - resultSelectorCalledWith.push([].slice.call(arguments)); - return 8; - }; - - const results: number[] = []; - const expectedCalls = [ - [4, 42, 0, 0], - [3, 42, 1, 0], - [2, 42, 2, 0], - [1, 42, 3, 0], - ]; - source.mergeMapTo(inner, resultSelector).subscribe( - (x) => { - results.push(x); - }, - (err) => { - done(new Error('Subscriber error handler not supposed to be called.')); - }, - () => { - expect(results).to.deep.equal([8, 8, 8, 8]); - expect(resultSelectorCalledWith).to.deep.equal(expectedCalls); - done(); - }); - }); - - it('should mergeMapTo values to rejected promises with resultSelector', (done) => { - const source = Rx.Observable.from([4, 3, 2, 1]); - const inner = Observable.from(Promise.reject(42)); - const resultSelector = () => { - throw 'this should not be called'; - }; - - source.mergeMapTo(inner, resultSelector).subscribe( - (x) => { - done(new Error('Subscriber next handler not supposed to be called.')); - }, - (err) => { - expect(err).to.equal(42); - done(); - }, - () => { - done(new Error('Subscriber complete handler not supposed to be called.')); - }); - }); - it('should mergeMapTo many outer values to many inner values', () => { const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; const e1 = hot('-a-------b-------c-------d-------| '); @@ -264,42 +214,6 @@ describe('Observable.prototype.mergeMapTo', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMapTo many cold Observable, with parameter concurrency=1', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const inner = cold('----i---j---k---l---| ', values); - const innersubs = [' ^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; - - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMapTo(inner, resultSelector, 1); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(inner.subscriptions).toBe(innersubs); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should mergeMap to many cold Observable, with parameter concurrency=2', () => { - const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; - const e1 = hot('-a-------b-------c---| '); - const e1subs = '^ !'; - const inner = cold('----i---j---k---l---| ', values); - const innersubs = [' ^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; - - function resultSelector(oV: string, iV: string, oI: number, iI: number) { return iV; } - const result = e1.mergeMapTo(inner, resultSelector, 2); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(inner.subscriptions).toBe(innersubs); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMapTo many cold Observable, with parameter concurrency=1, without resultSelector', () => { const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; const e1 = hot('-a-------b-------c---| '); @@ -345,18 +259,6 @@ describe('Observable.prototype.mergeMapTo', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMapTo many outer to inner arrays, using resultSelector', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ !'; - const expected = '(2345)(4567)---(3456)---(2345)--|'; - - const source = e1.mergeMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMapTo many outer to inner arrays, and outer throws', () => { const e1 = hot('2-----4--------3--------2-------#'); const e1subs = '^ !'; @@ -368,18 +270,6 @@ describe('Observable.prototype.mergeMapTo', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMapTo many outer to inner arrays, resultSelector, outer throws', () => { - const e1 = hot('2-----4--------3--------2-------#'); - const e1subs = '^ !'; - const expected = '(2345)(4567)---(3456)---(2345)--#'; - - const source = e1.mergeMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should mergeMapTo many outer to inner arrays, outer gets unsubscribed', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ !'; @@ -392,35 +282,6 @@ describe('Observable.prototype.mergeMapTo', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should mergeMapTo many outer to inner arrays, resultSelector, outer unsubscribed', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ !'; - const unsub = ' !'; - const expected = '(2345)(4567)--'; - - const source = e1.mergeMapTo(['0', '1', '2', '3'], - (x, y) => String(parseInt(x) + parseInt(y))); - - expectObservable(source, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - - it('should mergeMapTo many outer to inner arrays, resultSelector throws', () => { - const e1 = hot('2-----4--------3--------2-------|'); - const e1subs = '^ !'; - const expected = '(2345)(4567)---#'; - - const source = e1.mergeMapTo(['0', '1', '2', '3'], (outer, inner) => { - if (outer === '3') { - throw 'error'; - } - return String(parseInt(outer) + parseInt(inner)); - }); - - expectObservable(source).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - it('should map and flatten', () => { const source = Observable.of(1, 2, 3, 4).mergeMapTo(Observable.of('!')); @@ -460,8 +321,6 @@ describe('Observable.prototype.mergeMapTo', () => { /* tslint:disable:no-unused-variable */ let a1: Rx.Observable = o.mergeMapTo(m); let a2: Rx.Observable = o.mergeMapTo(m, 3); - let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i })); - let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i }), 3); /* tslint:enable:no-unused-variable */ }); }); diff --git a/src/internal/operators/mergeMapTo.ts b/src/internal/operators/mergeMapTo.ts index a93fd3ca8d..5ed20749e8 100644 --- a/src/internal/operators/mergeMapTo.ts +++ b/src/internal/operators/mergeMapTo.ts @@ -1,16 +1,6 @@ -import { Observable } from '../Observable'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; -import { ObservableInput, OperatorFunction, PartialObserver } from '../types'; - -/* tslint:disable:max-line-length */ -export function mergeMapTo(observable: ObservableInput, concurrent?: number): OperatorFunction; -export function mergeMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction; -/* tslint:enable:max-line-length */ +import { Observable, ObservableInput } from '../Observable'; +import { OperatorFunction } from '../../internal/types'; +import { mergeMap } from './mergeMap'; /** * Projects each source value to the same Observable which is merged multiple @@ -39,129 +29,14 @@ export function mergeMapTo(observable: ObservableInput, resultSelect * * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @return {Observable} An Observable that emits items from the given - * `innerObservable` (and optionally transformed through `resultSelector`) every - * time a value is emitted on the source Observable. + * `innerObservable` * @method mergeMapTo * @owner Observable */ -export function mergeMapTo(innerObservable: ObservableInput, - resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, - concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { - if (typeof resultSelector === 'number') { - concurrent = resultSelector; - resultSelector = null; - } - return (source: Observable) => source.lift(new MergeMapToOperator(innerObservable, resultSelector, concurrent)); -} - -// TODO: Figure out correct signature here: an Operator, R> -// needs to implement call(observer: Subscriber): Subscriber> -export class MergeMapToOperator implements Operator, R> { - constructor(private ish: ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, - private concurrent: number = Number.POSITIVE_INFINITY) { - } - - call(observer: Subscriber, source: any): any { - return source.subscribe(new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class MergeMapToSubscriber extends OuterSubscriber { - private hasCompleted: boolean = false; - private buffer: T[] = []; - private active: number = 0; - protected index: number = 0; - - constructor(destination: Subscriber, - private ish: ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, - private concurrent: number = Number.POSITIVE_INFINITY) { - super(destination); - } - - protected _next(value: T): void { - if (this.active < this.concurrent) { - const resultSelector = this.resultSelector; - const index = this.index++; - const ish = this.ish; - const destination = this.destination; - - this.active++; - this._innerSub(ish, destination, resultSelector, value, index); - } else { - this.buffer.push(value); - } - } - - private _innerSub(ish: ObservableInput, - destination: PartialObserver, - resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, - value: T, - index: number): void { - this.add(subscribeToResult(this, ish, value, index)); - } - - protected _complete(): void { - this.hasCompleted = true; - if (this.active === 0 && this.buffer.length === 0) { - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const { resultSelector, destination } = this; - if (resultSelector) { - this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex); - } else { - destination.next(innerValue); - } - } - - private trySelectResult(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number): void { - const { resultSelector, destination } = this; - let result: R; - try { - result = resultSelector(outerValue, innerValue, outerIndex, innerIndex); - } catch (err) { - destination.error(err); - return; - } - - destination.next(result); - } - - notifyError(err: any): void { - this.destination.error(err); - } - - notifyComplete(innerSub: Subscription): void { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if (buffer.length > 0) { - this._next(buffer.shift()); - } else if (this.active === 0 && this.hasCompleted) { - this.destination.complete(); - } - } +export function mergeMapTo(innerObservable: ObservableInput, + concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction { + return mergeMap(() => innerObservable, concurrent); } diff --git a/src/internal/patching/operator/mergeMapTo.ts b/src/internal/patching/operator/mergeMapTo.ts index 6b8b7d7d6b..e66fad61b5 100644 --- a/src/internal/patching/operator/mergeMapTo.ts +++ b/src/internal/patching/operator/mergeMapTo.ts @@ -2,11 +2,6 @@ import { Observable } from '../../Observable'; import { ObservableInput } from '../../types'; import { mergeMapTo as higherOrder } from '../../operators/mergeMapTo'; -/* tslint:disable:max-line-length */ -export function mergeMapTo(this: Observable, observable: ObservableInput, concurrent?: number): Observable; -export function mergeMapTo(this: Observable, observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): Observable; -/* tslint:enable:max-line-length */ - /** * Projects each source value to the same Observable which is merged multiple * times in the output Observable. @@ -34,24 +29,14 @@ export function mergeMapTo(this: Observable, observable: ObservableI * * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. - * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] - * A function to produce the value on the output Observable based on the values - * and the indices of the source (outer) emission and the inner Observable - * emission. The arguments passed to this function are: - * - `outerValue`: the value that came from the source - * - `innerValue`: the value that came from the projected Observable - * - `outerIndex`: the "index" of the value that came from the source - * - `innerIndex`: the "index" of the value from the projected Observable * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @return {Observable} An Observable that emits items from the given - * `innerObservable` (and optionally transformed through `resultSelector`) every - * time a value is emitted on the source Observable. + * `innerObservable`. * @method mergeMapTo * @owner Observable */ -export function mergeMapTo(this: Observable, innerObservable: ObservableInput, - resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, - concurrent: number = Number.POSITIVE_INFINITY): Observable { - return higherOrder(innerObservable, resultSelector as any, concurrent)(this) as Observable; +export function mergeMapTo(this: Observable, innerObservable: ObservableInput, + concurrent: number = Number.POSITIVE_INFINITY): Observable { + return higherOrder(innerObservable, concurrent)(this) as Observable; }