From e64685163ef5f6a9bae704cefaf501c74129acdb Mon Sep 17 00:00:00 2001 From: Ben Lesh <ben@benlesh.com> Date: Fri, 16 Jun 2017 02:01:35 -0400 Subject: [PATCH] feat(race): add higher-order lettable version of race Refactors race static to live under `observables\/race` --- src/add/observable/race.ts | 2 +- src/observable/race.ts | 100 +++++++++++++++++++++++++++++++++++++ src/operator/race.ts | 9 +--- src/operators/index.ts | 1 + src/operators/race.ts | 31 ++++++++++++ 5 files changed, 135 insertions(+), 8 deletions(-) create mode 100644 src/observable/race.ts create mode 100644 src/operators/race.ts diff --git a/src/add/observable/race.ts b/src/add/observable/race.ts index e871cf5113..68c58130d1 100644 --- a/src/add/observable/race.ts +++ b/src/add/observable/race.ts @@ -1,5 +1,5 @@ import { Observable } from '../../Observable'; -import { raceStatic } from '../../operator/race'; +import { race as raceStatic } from '../../observable/race'; Observable.race = raceStatic; diff --git a/src/observable/race.ts b/src/observable/race.ts new file mode 100644 index 0000000000..ef1b2feb00 --- /dev/null +++ b/src/observable/race.ts @@ -0,0 +1,100 @@ +import { Observable } from '../Observable'; +import { isArray } from '../util/isArray'; +import { ArrayObservable } from '../observable/ArrayObservable'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; + +/** + * Returns an Observable that mirrors the first source Observable to emit an item. + * @param {...Observables} ...observables sources used to race for which Observable emits first. + * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item. + * @static true + * @name race + * @owner Observable + */ +export function race<T>(observables: Array<Observable<T>>): Observable<T>; +export function race<T>(observables: Array<Observable<any>>): Observable<T>; +export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): Observable<T>; +export function race<T>(...observables: Array<Observable<any> | Array<Observable<any>>>): Observable<T> { + // if the only argument is an array, it was most likely called with + // `race([obs1, obs2, ...])` + if (observables.length === 1) { + if (isArray(observables[0])) { + observables = <Array<Observable<any>>>observables[0]; + } else { + return <Observable<any>>observables[0]; + } + } + + return new ArrayObservable<T>(<any>observables).lift(new RaceOperator<T>()); +} + +export class RaceOperator<T> implements Operator<T, T> { + call(subscriber: Subscriber<T>, source: any): TeardownLogic { + return source.subscribe(new RaceSubscriber(subscriber)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class RaceSubscriber<T> extends OuterSubscriber<T, T> { + private hasFirst: boolean = false; + private observables: Observable<any>[] = []; + private subscriptions: Subscription[] = []; + + constructor(destination: Subscriber<T>) { + super(destination); + } + + protected _next(observable: any): void { + this.observables.push(observable); + } + + protected _complete() { + const observables = this.observables; + const len = observables.length; + + if (len === 0) { + this.destination.complete(); + } else { + for (let i = 0; i < len && !this.hasFirst; i++) { + let observable = observables[i]; + let subscription = subscribeToResult(this, observable, observable, i); + + if (this.subscriptions) { + this.subscriptions.push(subscription); + } + this.add(subscription); + } + this.observables = null; + } + } + + notifyNext(outerValue: T, innerValue: T, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber<T, T>): void { + if (!this.hasFirst) { + this.hasFirst = true; + + for (let i = 0; i < this.subscriptions.length; i++) { + if (i !== outerIndex) { + let subscription = this.subscriptions[i]; + + subscription.unsubscribe(); + this.remove(subscription); + } + } + + this.subscriptions = null; + } + + this.destination.next(innerValue); + } +} diff --git a/src/operator/race.ts b/src/operator/race.ts index a5675f7c7d..72326bab94 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -7,6 +7,7 @@ import { Subscription, TeardownLogic } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { race as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function race<T>(this: Observable<T>, observables: Array<Observable<T>>): Observable<T>; @@ -24,13 +25,7 @@ export function race<T, R>(this: Observable<T>, ...observables: Array<Observable * @owner Observable */ export function race<T>(this: Observable<T>, ...observables: Array<Observable<T> | Array<Observable<T>>>): Observable<T> { - // if the only argument is an array, it was most likely called with - // `pair([obs1, obs2, ...])` - if (observables.length === 1 && isArray(observables[0])) { - observables = <Array<Observable<T>>>observables[0]; - } - - return this.lift.call(raceStatic<T>(this, ...observables)); + return higherOrder(...observables)(this); } /** diff --git a/src/operators/index.ts b/src/operators/index.ts index 412226844a..1f6149c95a 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -9,6 +9,7 @@ export { mergeMap } from './mergeMap'; export { min } from './min'; export { multicast } from './multicast'; export { publish } from './publish'; +export { race } from './race'; export { reduce } from './reduce'; export { refCount } from './refCount'; export { scan } from './scan'; diff --git a/src/operators/race.ts b/src/operators/race.ts new file mode 100644 index 0000000000..a6efcdf5a2 --- /dev/null +++ b/src/operators/race.ts @@ -0,0 +1,31 @@ +import { Observable } from '../Observable'; +import { isArray } from '../util/isArray'; +import { MonoTypeOperatorFunction, OperatorFunction } from '../interfaces'; +import { race as raceStatic } from '../observable/race'; + +/* tslint:disable:max-line-length */ +export function race<T>(observables: Array<Observable<T>>): MonoTypeOperatorFunction<T>; +export function race<T, R>(observables: Array<Observable<T>>): OperatorFunction<T, R>; +export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): MonoTypeOperatorFunction<T>; +export function race<T, R>(...observables: Array<Observable<any> | Array<Observable<any>>>): OperatorFunction<T, R>; +/* tslint:enable:max-line-length */ + +/** + * Returns an Observable that mirrors the first source Observable to emit an item + * from the combination of this Observable and supplied Observables. + * @param {...Observables} ...observables Sources used to race for which Observable emits first. + * @return {Observable} An Observable that mirrors the output of the first Observable to emit an item. + * @method race + * @owner Observable + */ +export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): MonoTypeOperatorFunction<T> { + return function raceOperatorFunction(source: Observable<T>) { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1 && isArray(observables[0])) { + observables = <Array<Observable<T>>>observables[0]; + } + + return source.lift.call(raceStatic<T>(source, ...observables)); + }; +} \ No newline at end of file