From fb6014deca5ea59bbc5745373f1144ff8aa4f4e8 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 15 Jun 2017 17:27:30 -0700 Subject: [PATCH] feat(multicast): add higher-order lettable variant of multicast Also adds MonoTypeOperatorFunction --- src/interfaces.ts | 4 +++ src/operator/multicast.ts | 45 ++++--------------------- src/operators/index.ts | 1 + src/operators/multicast.ts | 67 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 39 deletions(-) create mode 100644 src/operators/multicast.ts diff --git a/src/interfaces.ts b/src/interfaces.ts index c80aa235b2..5d825f231d 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -3,3 +3,7 @@ import { Observable } from './Observable'; export type UnaryFunction = (source: T) => R; export type OperatorFunction = UnaryFunction, Observable>; + +export type FactoryOrValue = T | (() => T); + +export type MonoTypeOperatorFunction = OperatorFunction; diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index 75a71cf0c5..5438f4bf9f 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -1,12 +1,12 @@ import { Subject } from '../Subject'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; +import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { multicast as higherOrder } from '../operators'; +import { FactoryOrValue, MonoTypeOperatorFunction } from '../interfaces'; /* tslint:disable:max-line-length */ -export function multicast(this: Observable, subjectOrSubjectFactory: factoryOrValue>): ConnectableObservable; -export function multicast(SubjectFactory: (this: Observable) => Subject, selector?: selector): Observable; +export function multicast(this: Observable, subjectOrSubjectFactory: FactoryOrValue>): ConnectableObservable; +export function multicast(SubjectFactory: (this: Observable) => Subject, selector?: MonoTypeOperatorFunction): Observable; /* tslint:enable:max-line-length */ /** @@ -30,38 +30,5 @@ export function multicast(SubjectFactory: (this: Observable) => Subject */ export function multicast(this: Observable, subjectOrSubjectFactory: Subject | (() => Subject), selector?: (source: Observable) => Observable): Observable | ConnectableObservable { - let subjectFactory: () => Subject; - if (typeof subjectOrSubjectFactory === 'function') { - subjectFactory = <() => Subject>subjectOrSubjectFactory; - } else { - subjectFactory = function subjectFactory() { - return >subjectOrSubjectFactory; - }; - } - - if (typeof selector === 'function') { - return this.lift(new MulticastOperator(subjectFactory, selector)); - } - - const connectable: any = Object.create(this, connectableObservableDescriptor); - connectable.source = this; - connectable.subjectFactory = subjectFactory; - - return > connectable; -} - -export type factoryOrValue = T | (() => T); -export type selector = (source: Observable) => Observable; - -export class MulticastOperator implements Operator { - constructor(private subjectFactory: () => Subject, - private selector: (source: Observable) => Observable) { - } - call(subscriber: Subscriber, source: any): any { - const { selector } = this; - const subject = this.subjectFactory(); - const subscription = selector(subject).subscribe(subscriber); - subscription.add(source.subscribe(subject)); - return subscription; - } + return higherOrder(subjectOrSubjectFactory, selector)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index c852eaaf21..e2052f5f7b 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -5,6 +5,7 @@ export { map } from './map'; export { max } from './max'; export { mergeMap } from './mergeMap'; export { min } from './min'; +export { multicast } from './multicast'; export { reduce } from './reduce'; export { scan } from './scan'; export { switchMap } from './switchMap'; diff --git a/src/operators/multicast.ts b/src/operators/multicast.ts new file mode 100644 index 0000000000..2c9666edef --- /dev/null +++ b/src/operators/multicast.ts @@ -0,0 +1,67 @@ +import { Subject } from '../Subject'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; +import { FactoryOrValue, MonoTypeOperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function multicast(subjectOrSubjectFactory: FactoryOrValue>): MonoTypeOperatorFunction; +export function multicast(SubjectFactory: (this: Observable) => Subject, selector?: MonoTypeOperatorFunction): MonoTypeOperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Returns an Observable that emits the results of invoking a specified selector on items + * emitted by a ConnectableObservable that shares a single subscription to the underlying stream. + * + * + * + * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through + * which the source sequence's elements will be multicast to the selector function + * or Subject to push source elements into. + * @param {Function} [selector] - Optional selector function that can use the multicasted source stream + * as many times as needed, without causing multiple subscriptions to the source stream. + * Subscribers to the given source will receive all notifications of the source from the + * time of the subscription forward. + * @return {Observable} An Observable that emits the results of invoking the selector + * on the items emitted by a `ConnectableObservable` that shares a single subscription to + * the underlying stream. + * @method multicast + * @owner Observable + */ +export function multicast(subjectOrSubjectFactory: Subject | (() => Subject), + selector?: (source: Observable) => Observable): MonoTypeOperatorFunction { + return function multicastOperatorFunction(source: Observable): Observable { + let subjectFactory: () => Subject; + if (typeof subjectOrSubjectFactory === 'function') { + subjectFactory = <() => Subject>subjectOrSubjectFactory; + } else { + subjectFactory = function subjectFactory() { + return >subjectOrSubjectFactory; + }; + } + + if (typeof selector === 'function') { + return source.lift(new MulticastOperator(subjectFactory, selector)); + } + + const connectable: any = Object.create(source, connectableObservableDescriptor); + connectable.source = source; + connectable.subjectFactory = subjectFactory; + + return > connectable; + }; +} + +export class MulticastOperator implements Operator { + constructor(private subjectFactory: () => Subject, + private selector: (source: Observable) => Observable) { + } + call(subscriber: Subscriber, source: any): any { + const { selector } = this; + const subject = this.subjectFactory(); + const subscription = selector(subject).subscribe(subscriber); + subscription.add(source.subscribe(subject)); + return subscription; + } +}