Skip to content

Commit

Permalink
feat(publishReplay): add selector function to publishReplay (#2885)
Browse files Browse the repository at this point in the history
* feat(publishReplay): add selector function to publishReplay

2844

* fix(publishReplay): make the selector optional

* test(publishReplay): add more test for the selector function

* fix(publishReplay): fix publishReplay function signatures
  • Loading branch information
martinsik authored and benlesh committed Oct 18, 2017
1 parent 39b4af5 commit e0efd13
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 10 deletions.
71 changes: 71 additions & 0 deletions spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,75 @@ describe('Observable.prototype.publishReplay', () => {

published.connect();
});

it('should mirror a simple source Observable with selector', () => {
const values = {a: 2, b: 4, c: 6, d: 8};
const selector = observable => observable.map(v => 2 * v);
const source = cold('--1-2---3-4---|');
const sourceSubs = '^ !';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '--a-b---c-d---|';

expectObservable(published).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should emit an error when the selector throws an exception', () => {
const error = "It's broken";
const selector = () => {
throw error;
};
const source = cold('--1-2---3-4---|');
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);

// The exception is thrown outside Rx chain (not as an error notification).
expect(() => published.subscribe()).to.throw(error);
});

it('should emit an error when the selector returns an Observable that emits an error', () => {
const error = "It's broken";
const innerObservable = cold('--5-6----#', undefined, error);
const selector = observable => observable.mergeMapTo(innerObservable);
const source = cold('--1--2---3---|');
const sourceSubs = '^ !';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '----5-65-6-#';

expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should terminate immediately when the selector returns an empty Observable', () => {
const selector = () => Observable.empty();
const source = cold('--1--2---3---|');
const sourceSubs = '(^!)';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should not emit and should not complete/error when the selector returns never', () => {
const selector = () => Observable.never();
const source = cold('-');
const sourceSubs = '^';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '-';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should emit error when the selector returns Observable.throw', () => {
const error = "It's broken";
const selector = () => Observable.throw(error);
const source = cold('--1--2---3---|');
const sourceSubs = '(^!)';
const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector);
const expected = '#';

expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});
20 changes: 15 additions & 5 deletions src/operator/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@ import { Observable } from '../Observable';
import { IScheduler } from '../Scheduler';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { publishReplay as higherOrder } from '../operators/publishReplay';
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces';

/* tslint:disable:max-line-length */
export function publishReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: IScheduler): ConnectableObservable<T>;
export function publishReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction<T>, scheduler?: IScheduler): Observable<T>;
export function publishReplay<T, R>(this: Observable<T>, bufferSize?: number, windowTime?: number, selector?: OperatorFunction<T, R>): Observable<R>;
/* tslint:enable:max-line-length */

/**
* @param bufferSize
* @param windowTime
* @param selectorOrScheduler
* @param scheduler
* @return {ConnectableObservable<T>}
* @return {Observable<T> | ConnectableObservable<T>}
* @method publishReplay
* @owner Observable
*/
export function publishReplay<T>(this: Observable<T>, bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: IScheduler): ConnectableObservable<T> {
return higherOrder(bufferSize, windowTime, scheduler)(this) as ConnectableObservable<T>;
export function publishReplay<T, R>(this: Observable<T>, bufferSize?: number,
windowTime?: number,
selectorOrScheduler?: IScheduler | OperatorFunction<T, R>,
scheduler?: IScheduler): Observable<R> | ConnectableObservable<R> {

return higherOrder(bufferSize, windowTime, selectorOrScheduler as any, scheduler)(this);
}
25 changes: 20 additions & 5 deletions src/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,25 @@ import { ReplaySubject } from '../ReplaySubject';
import { IScheduler } from '../Scheduler';
import { multicast } from './multicast';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { UnaryFunction } from '../interfaces';
import { UnaryFunction, MonoTypeOperatorFunction, OperatorFunction } from '../interfaces';

export function publishReplay<T>(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler))(source) as ConnectableObservable<T>;
/* tslint:disable:max-line-length */
export function publishReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export function publishReplay<T>(bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction<T>, scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
export function publishReplay<T, R>(bufferSize?: number, windowTime?: number, selector?: OperatorFunction<T, R>, scheduler?: IScheduler): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

export function publishReplay<T, R>(bufferSize?: number,
windowTime?: number,
selectorOrScheduler?: IScheduler | OperatorFunction<T, R>,
scheduler?: IScheduler): UnaryFunction<Observable<T>, ConnectableObservable<R> | Observable<R>> {

if (selectorOrScheduler && typeof selectorOrScheduler !== 'function') {
scheduler = selectorOrScheduler;
}

const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);

return (source: Observable<T>) => multicast(() => subject, selector)(source) as Observable<R> | ConnectableObservable<R>;
}

0 comments on commit e0efd13

Please sign in to comment.