Skip to content

Commit

Permalink
fix(windowCount): fix windowCount window opening times
Browse files Browse the repository at this point in the history
Fix bugs with windowCount. This commit reverts PR #273, in order to have
windowCount pass comprehensive marble tests. windowCount must open the
first window immediately, not when the first next() event arrives, to
comply with legacy windowWithCount and with RxJS Next window and
windowTime.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 13, 2015
1 parent ababa3d commit 908ae56
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions src/operators/windowCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,65 @@ import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function windowCount<T>(windowSize: number, startWindowEvery: number = 0): Observable<Observable<T>> {
export default function windowCount<T>(windowSize: number,
startWindowEvery: number = 0): Observable<Observable<T>> {
return this.lift(new WindowCountOperator(windowSize, startWindowEvery));
}

class WindowCountOperator<T, R> implements Operator<T, R> {

constructor(private windowSize: number, private startWindowEvery: number) {
constructor(private windowSize: number,
private startWindowEvery: number) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new WindowCountSubscriber(subscriber, this.windowSize, this.startWindowEvery);
}
}

interface WindowObject<T> {
count: number;
notified: boolean;
window: Subject<T>;
}

class WindowCountSubscriber<T> extends Subscriber<T> {
private windows: WindowObject<T>[] = [
{ count: 0, notified : false, window : new Subject<T>() }
];
private windows: Subject<T>[] = [ new Subject<T>() ];
private count: number = 0;

constructor(destination: Subscriber<T>, private windowSize: number, private startWindowEvery: number) {
constructor(destination: Subscriber<T>,
private windowSize: number,
private startWindowEvery: number) {
super(destination);
destination.next(this.windows[0]);
}

_next(value: T) {
const count = (this.count += 1);
const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize;
const windowSize = this.windowSize;
const windows = this.windows;
const len = windows.length;

if (count % startWindowEvery === 0) {
let window = new Subject<T>();
windows.push({ count: 0, notified : false, window : window });
}

for (let i = 0; i < len; i++) {
let w = windows[i];
const window = w.window;

if (!w.notified) {
w.notified = true;
this.destination.next(window);
}

window.next(value);
if (windowSize === (w.count += 1)) {
window.complete();
}
windows[i].next(value);
}
const c = this.count - windowSize + 1;
if (c >= 0 && c % startWindowEvery === 0) {
windows.shift().complete();
}
if (++this.count % startWindowEvery === 0) {
let window = new Subject<T>();
windows.push(window);
this.destination.next(window);
}
}

_error(err: any) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.error(err);
windows.shift().error(err);
}
this.destination.error(err);
}

_complete() {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.complete();
windows.shift().complete();
}
this.destination.complete();
}
Expand Down

0 comments on commit 908ae56

Please sign in to comment.