-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(operator): add window operators: window, windowWhen, windowTime,…
… windowCount, windowToggle closes #195
- Loading branch information
Showing
14 changed files
with
602 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.window', function () { | ||
it('should emit windows that close and reopen', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.window(Observable.interval(320)) | ||
.take(3) | ||
.flatMap(function (x) { return x.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.windowCount', function () { | ||
it('should emit windows at intervals', function (done) { | ||
var expected = [ | ||
[0, 1], | ||
[1, 2], | ||
[2, 3], | ||
[3] | ||
]; | ||
Observable.range(0, 4) | ||
.windowCount(2, 1) | ||
.take(3) | ||
.flatMap(function (x) { return x.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.windowTime', function () { | ||
it('should emit windows at intervals', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.windowTime(320) | ||
.take(3) | ||
.flatMap(function (x) { return x.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
|
||
|
||
it('should emit windows that have been created at intervals and close after the specified delay', function (done) { | ||
var expected = [ | ||
[0, 1, 2, 3, 4], | ||
[2, 3, 4, 5, 6], | ||
[4, 5, 6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.windowTime(520, 220) | ||
.take(3) | ||
.flatMap(function (x) { return x.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.windowToggle', function () { | ||
it('should emit windows that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) { | ||
Observable.interval(100).take(10) | ||
.windowToggle(Observable.timer(320).mapTo('test'), function (n) { | ||
expect(n).toBe('test'); | ||
return Observable.timer(320); | ||
}) | ||
.flatMap(function (w) { return w.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual([3, 4, 5]) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.windowWhen', function () { | ||
it('should emit windows that close and reopen', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.windowWhen(function () { return Observable.timer(320); }) | ||
.take(3) | ||
.flatMap(function (x) { return x.toArray(); }) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Observable from '../Observable'; | ||
import Subject from '../Subject'; | ||
|
||
import tryCatch from '../util/tryCatch'; | ||
import {errorObject} from '../util/errorObject'; | ||
import bindCallback from '../util/bindCallback'; | ||
|
||
export default function window<T>(closingNotifier: Observable<any>) : Observable<Observable<T>> { | ||
return this.lift(new WindowOperator(closingNotifier)); | ||
} | ||
|
||
export class WindowOperator<T, R> implements Operator<T, R> { | ||
|
||
constructor(private closingNotifier: Observable<any>) { | ||
} | ||
|
||
call(observer: Observer<T>): Observer<T> { | ||
return new WindowSubscriber(observer, this.closingNotifier); | ||
} | ||
} | ||
|
||
export class WindowSubscriber<T> extends Subscriber<T> { | ||
private window: Subject<T> = new Subject<T>(); | ||
|
||
constructor(destination: Observer<T>, private closingNotifier: Observable<any>) { | ||
super(destination); | ||
this.add(closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this))); | ||
this.openWindow(); | ||
} | ||
|
||
_next(value: T) { | ||
this.window.next(value); | ||
} | ||
|
||
_error(err: any) { | ||
this.window.error(err); | ||
this.destination.error(err); | ||
} | ||
|
||
_complete() { | ||
this.window.complete(); | ||
this.destination.complete(); | ||
} | ||
|
||
openWindow() { | ||
const prevWindow = this.window; | ||
if (prevWindow) { | ||
prevWindow.complete(); | ||
} | ||
this.destination.next(this.window = new Subject<T>()); | ||
} | ||
} | ||
|
||
export class WindowClosingNotifierSubscriber<T> extends Subscriber<T> { | ||
constructor(private parent: WindowSubscriber<any>) { | ||
super(null); | ||
} | ||
|
||
_next() { | ||
this.parent.openWindow(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Observable from '../Observable'; | ||
import Subject from '../Subject'; | ||
import Subscription from '../Subscription'; | ||
import Scheduler from '../Scheduler'; | ||
|
||
import tryCatch from '../util/tryCatch'; | ||
import {errorObject} from '../util/errorObject'; | ||
import bindCallback from '../util/bindCallback'; | ||
|
||
export default function windowCount<T>(windowSize: number, startWindowEvery: number = 0) : Observable<Observable<T>> { | ||
return this.lift(new WindowCountOperator(windowSize, startWindowEvery)); | ||
} | ||
|
||
export class WindowCountOperator<T, R> implements Operator<T, R> { | ||
|
||
constructor(private windowSize: number, private startWindowEvery: number) { | ||
} | ||
|
||
call(observer: Observer<T>): Observer<T> { | ||
return new WindowCountSubscriber(observer, this.windowSize, this.startWindowEvery); | ||
} | ||
} | ||
|
||
export class WindowCountSubscriber<T> extends Subscriber<T> { | ||
private windows: { count: number, window: Subject<T> } [] = []; | ||
private count: number = 0; | ||
|
||
constructor(destination: Observer<T>, private windowSize: number, private startWindowEvery: number) { | ||
super(destination); | ||
} | ||
|
||
_next(value: T) { | ||
const count = (this.count += 1); | ||
const startWindowEvery = this.startWindowEvery; | ||
const windowSize = this.windowSize; | ||
const windows = this.windows; | ||
|
||
if (startWindowEvery && count % this.startWindowEvery === 0) { | ||
let window = new Subject<T>(); | ||
windows.push({ count: 0, window }); | ||
this.destination.next(window); | ||
} | ||
|
||
const len = windows.length; | ||
for (let i = 0; i < len; i++) { | ||
let w = windows[i]; | ||
const window = w.window; | ||
window.next(value); | ||
if (windowSize === (w.count += 1)) { | ||
window.complete(); | ||
} | ||
} | ||
} | ||
|
||
_error(err: any) { | ||
const windows = this.windows; | ||
while (windows.length > 0) { | ||
windows.shift().window.error(err); | ||
} | ||
this.destination.error(err); | ||
} | ||
|
||
_complete() { | ||
const windows = this.windows; | ||
while (windows.length > 0) { | ||
windows.shift().window.complete(); | ||
} | ||
this.destination.complete(); | ||
} | ||
} |
Oops, something went wrong.