diff --git a/src/add/observable/race.ts b/src/add/observable/race.ts index e871cf5113..68c58130d1 100644 --- a/src/add/observable/race.ts +++ b/src/add/observable/race.ts @@ -1,5 +1,5 @@ import { Observable } from '../../Observable'; -import { raceStatic } from '../../operator/race'; +import { race as raceStatic } from '../../observable/race'; Observable.race = raceStatic; diff --git a/src/observable/race.ts b/src/observable/race.ts new file mode 100644 index 0000000000..ef1b2feb00 --- /dev/null +++ b/src/observable/race.ts @@ -0,0 +1,100 @@ +import { Observable } from '../Observable'; +import { isArray } from '../util/isArray'; +import { ArrayObservable } from '../observable/ArrayObservable'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; + +/** + * Returns an Observable that mirrors the first source Observable to emit an item. + * @param {...Observables} ...observables sources used to race for which Observable emits first. + * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item. + * @static true + * @name race + * @owner Observable + */ +export function race(observables: Array>): Observable; +export function race(observables: Array>): Observable; +export function race(...observables: Array | Array>>): Observable; +export function race(...observables: Array | Array>>): Observable { + // if the only argument is an array, it was most likely called with + // `race([obs1, obs2, ...])` + if (observables.length === 1) { + if (isArray(observables[0])) { + observables = >>observables[0]; + } else { + return >observables[0]; + } + } + + return new ArrayObservable(observables).lift(new RaceOperator()); +} + +export class RaceOperator implements Operator { + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new RaceSubscriber(subscriber)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class RaceSubscriber extends OuterSubscriber { + private hasFirst: boolean = false; + private observables: Observable[] = []; + private subscriptions: Subscription[] = []; + + constructor(destination: Subscriber) { + super(destination); + } + + protected _next(observable: any): void { + this.observables.push(observable); + } + + protected _complete() { + const observables = this.observables; + const len = observables.length; + + if (len === 0) { + this.destination.complete(); + } else { + for (let i = 0; i < len && !this.hasFirst; i++) { + let observable = observables[i]; + let subscription = subscribeToResult(this, observable, observable, i); + + if (this.subscriptions) { + this.subscriptions.push(subscription); + } + this.add(subscription); + } + this.observables = null; + } + } + + notifyNext(outerValue: T, innerValue: T, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + if (!this.hasFirst) { + this.hasFirst = true; + + for (let i = 0; i < this.subscriptions.length; i++) { + if (i !== outerIndex) { + let subscription = this.subscriptions[i]; + + subscription.unsubscribe(); + this.remove(subscription); + } + } + + this.subscriptions = null; + } + + this.destination.next(innerValue); + } +} diff --git a/src/operator/race.ts b/src/operator/race.ts index a5675f7c7d..72326bab94 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -7,6 +7,7 @@ import { Subscription, TeardownLogic } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { race as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function race(this: Observable, observables: Array>): Observable; @@ -24,13 +25,7 @@ export function race(this: Observable, ...observables: Array(this: Observable, ...observables: Array | Array>>): Observable { - // if the only argument is an array, it was most likely called with - // `pair([obs1, obs2, ...])` - if (observables.length === 1 && isArray(observables[0])) { - observables = >>observables[0]; - } - - return this.lift.call(raceStatic(this, ...observables)); + return higherOrder(...observables)(this); } /** diff --git a/src/operators/index.ts b/src/operators/index.ts index 412226844a..1f6149c95a 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -9,6 +9,7 @@ export { mergeMap } from './mergeMap'; export { min } from './min'; export { multicast } from './multicast'; export { publish } from './publish'; +export { race } from './race'; export { reduce } from './reduce'; export { refCount } from './refCount'; export { scan } from './scan'; diff --git a/src/operators/race.ts b/src/operators/race.ts new file mode 100644 index 0000000000..a6efcdf5a2 --- /dev/null +++ b/src/operators/race.ts @@ -0,0 +1,31 @@ +import { Observable } from '../Observable'; +import { isArray } from '../util/isArray'; +import { MonoTypeOperatorFunction, OperatorFunction } from '../interfaces'; +import { race as raceStatic } from '../observable/race'; + +/* tslint:disable:max-line-length */ +export function race(observables: Array>): MonoTypeOperatorFunction; +export function race(observables: Array>): OperatorFunction; +export function race(...observables: Array | Array>>): MonoTypeOperatorFunction; +export function race(...observables: Array | Array>>): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Returns an Observable that mirrors the first source Observable to emit an item + * from the combination of this Observable and supplied Observables. + * @param {...Observables} ...observables Sources used to race for which Observable emits first. + * @return {Observable} An Observable that mirrors the output of the first Observable to emit an item. + * @method race + * @owner Observable + */ +export function race(...observables: Array | Array>>): MonoTypeOperatorFunction { + return function raceOperatorFunction(source: Observable) { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1 && isArray(observables[0])) { + observables = >>observables[0]; + } + + return source.lift.call(raceStatic(source, ...observables)); + }; +} \ No newline at end of file