Skip to content

Commit

Permalink
feat(windowTime): maxWindowSize parameter in windowTime operator
Browse files Browse the repository at this point in the history
Adds new parameter in windowTime operator to control how much values given
window can emit.

Closes #1301
  • Loading branch information
mpodlasin committed Feb 15, 2017
1 parent de6a8f4 commit 381be3f
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 24 deletions.
59 changes: 48 additions & 11 deletions spec/operators/windowTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,51 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '--a--(b|) ');
const y = cold( '-d--e| ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(50, 100, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should close windows after max count is reached', () => {
const source = hot('--1--2--^--a--b--c--d--e--f--g-----|');
const subs = '^ !';
const timeSpan = time( '----------|');
// 100 frames 0---------1---------2------|
const expected = 'x---------y---------z------|';
const x = cold( '---a--(b|) ');
const y = cold( '--d--(e|) ');
const z = cold( '-g-----|');
const values = { x, y, z };

const result = source.windowTime(timeSpan, null, 2, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should close window after max count is reached with' +
'windowCreationInterval', () => {
const source = hot('--1--2--^-a--b--c--de-f---g--h--i-|');
const subs = '^ !';
// 100 frames 0---------1---------2-----|
// 50 ----|
// 50 ----|
// 50 ----|
const expected = 'x---------y---------z-----|';
const x = cold( '--a--(b|) ');
const y = cold( '-de-(f|) ');
const z = cold( '-h--i| ');
const values = { x, y, z };

const result = source.windowTime(50, 100, 3, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should emit windows given windowTimeSpan', () => {
const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|');
const subs = '^ !';
Expand All @@ -40,9 +77,9 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a--b--c| ');
const y = cold( '--d--e--f-| ');
const z = cold( '-g--h--|');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, null, rxTestScheduler);
const result = source.windowTime(timeSpan, rxTestScheduler);

expectObservable(result).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -61,7 +98,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d--(e|) ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -74,7 +111,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -89,7 +126,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w|)';
const w = cold('(a|)');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -110,7 +147,7 @@ describe('Observable.prototype.windowTime', () => {
const c = cold( '---| ');
const d = cold( '--');
const unsub = ' !';
const expectedValues = { a: a, b: b, c: c, d: d };
const expectedValues = { a, b, c, d };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -123,7 +160,7 @@ describe('Observable.prototype.windowTime', () => {
const subs = '(^!)';
const expected = '(w#)';
const w = cold('#');
const expectedValues = { w: w };
const expectedValues = { w };
const timeSpan = time('-----|');
const interval = time('----------|');

Expand All @@ -146,7 +183,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d--(e|) ');
const z = cold( '-g--h| ');
const values = { x: x, y: y, z: z };
const values = { x, y, z };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -168,7 +205,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '-- ');
const unsub = ' ! ';
const values = { x: x, y: y };
const values = { x, y };

const result = source.windowTime(timeSpan, interval, rxTestScheduler);

Expand All @@ -189,7 +226,7 @@ describe('Observable.prototype.windowTime', () => {
const x = cold( '---a-| ');
const y = cold( '--d-- ');
const unsub = ' ! ';
const values = { x: x, y: y };
const values = { x, y };

const result = source
.mergeMap((x: string) => Observable.of(x))
Expand Down
87 changes: 74 additions & 13 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { async } from '../scheduler/async';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { isNumeric } from '../util/isNumeric';
import { isScheduler } from '../util/isScheduler';

/**
* Branch out the source Observable values as a nested Observable periodically
Expand All @@ -24,7 +26,10 @@ import { Subscription } from '../Subscription';
* emits the current window and propagates the notification from the source
* Observable. If `windowCreationInterval` is not provided, the output
* Observable starts a new window when the previous window of duration
* `windowTimeSpan` completes.
* `windowTimeSpan` completes. If `maxWindowCount` is provided, each window
* will emit at most fixed number of values. Window will complete immediately
* after emitting last value and next one still will open as specified by
* `windowTimeSpan` and `windowCreationInterval` arguments.
*
* @example <caption>In every window of 1 second each, emit at most 2 click events</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
Expand All @@ -40,6 +45,12 @@ import { Subscription } from '../Subscription';
* .mergeAll(); // flatten the Observable-of-Observables
* result.subscribe(x => console.log(x));
*
* @example <caption>Same as example above but with maxWindowCount instead of take</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var result = clicks.windowTime(1000, 5000, 2) // each window has still at most 2 emissions
* .mergeAll(); // flatten the Observable-of-Observables
* result.subscribe(x => console.log(x));
* @see {@link window}
* @see {@link windowCount}
* @see {@link windowToggle}
Expand All @@ -49,6 +60,8 @@ import { Subscription } from '../Subscription';
* @param {number} windowTimeSpan The amount of time to fill each window.
* @param {number} [windowCreationInterval] The interval at which to start new
* windows.
* @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of
* values each window can emit before completion.
* @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
* intervals that determine window boundaries.
* @return {Observable<Observable<T>>} An observable of windows, which in turn
Expand All @@ -57,21 +70,52 @@ import { Subscription } from '../Subscription';
* @owner Observable
*/
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number = null,
scheduler: IScheduler = async): Observable<Observable<T>> {
return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, scheduler));
scheduler?: IScheduler): Observable<Observable<T>>;
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number,
scheduler?: IScheduler): Observable<Observable<T>>;
export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
windowCreationInterval: number,
maxWindowSize: number,
scheduler?: IScheduler): Observable<Observable<T>>;

export function windowTime<T>(this: Observable<T>,
windowTimeSpan: number): Observable<Observable<T>> {

let scheduler: IScheduler = async;
let windowCreationInterval: number = null;
let maxWindowSize: number = Number.POSITIVE_INFINITY;

if (isScheduler(arguments[3])) {
scheduler = arguments[3];
}

if (isScheduler(arguments[2])) {
scheduler = arguments[2];
} else if (isNumeric(arguments[2])) {
maxWindowSize = arguments[2];
}

if (isScheduler(arguments[1])) {
scheduler = arguments[1];
} else if (isNumeric(arguments[1])) {
windowCreationInterval = arguments[1];
}

return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
}

class WindowTimeOperator<T> implements Operator<T, Observable<T>> {

constructor(private windowTimeSpan: number,
private windowCreationInterval: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
}

call(subscriber: Subscriber<Observable<T>>, source: any): any {
return source.subscribe(new WindowTimeSubscriber(
subscriber, this.windowTimeSpan, this.windowCreationInterval, this.scheduler
subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler
));
}
}
Expand All @@ -84,7 +128,7 @@ interface CreationState<T> {
}

interface TimeSpanOnlyState<T> {
window: Subject<T>;
window: CountedSubject<T>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}
Expand All @@ -96,21 +140,35 @@ interface CloseWindowContext<T> {

interface CloseState<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
window: CountedSubject<T>;
context: CloseWindowContext<T>;
}

class CountedSubject<T> extends Subject<T> {
private _numberOfNextedValues: number = 0;

next(value?: T): void {
this._numberOfNextedValues++;
super.next(value);
}

get numberOfNextedValues(): number {
return this._numberOfNextedValues;
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class WindowTimeSubscriber<T> extends Subscriber<T> {
private windows: Array<Subject<T>> = [];
private windows: CountedSubject<T>[] = [];

constructor(protected destination: Subscriber<Observable<T>>,
private windowTimeSpan: number,
private windowCreationInterval: number,
private windowCreationInterval: number | null,
private maxWindowSize: number,
private scheduler: IScheduler) {
super(destination);

Expand All @@ -133,6 +191,9 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
const window = windows[i];
if (!window.closed) {
window.next(value);
if (window.numberOfNextedValues >= this.maxWindowSize) {
this.closeWindow(window);
}
}
}
}
Expand All @@ -156,15 +217,15 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
this.destination.complete();
}

public openWindow(): Subject<T> {
const window = new Subject<T>();
public openWindow(): CountedSubject<T> {
const window = new CountedSubject<T>();
this.windows.push(window);
const destination = this.destination;
destination.next(window);
return window;
}

public closeWindow(window: Subject<T>): void {
public closeWindow(window: CountedSubject<T>): void {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
Expand Down

0 comments on commit 381be3f

Please sign in to comment.