Skip to content

Commit

Permalink
perf(bufferCount): optimize bufferCount operator (#2359)
Browse files Browse the repository at this point in the history
* perf(bufferCount): add perf tests for buffercount with skip param

* perf(bufferCount): optimize bufferCount operator

* refactor(bufferCount): fix types

* refactor(bufferCount): refactored as requested by @jayphelps

* refactor(bufferCount): define destination as const
  • Loading branch information
martinsik authored and benlesh committed Apr 3, 2017
1 parent c60c387 commit 28d0883
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 11 deletions.
18 changes: 18 additions & 0 deletions perf/micro/current-thread-scheduler/operators/buffercount-skip.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldBufferCountWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread).bufferWithCount(5, 3);
var newBufferCountWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue).bufferCount(5, 3);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old bufferCount with current thread scheduler', function () {
oldBufferCountWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new bufferCount with current thread scheduler', function () {
newBufferCountWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
18 changes: 18 additions & 0 deletions perf/micro/immediate-scheduler/operators/buffercount-skip.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldBufferCountWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).bufferWithCount(5, 3);
var newBufferCountWithImmediateScheduler = RxNew.Observable.range(0, 25).bufferCount(5, 3);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old bufferCount with immediate scheduler', function () {
oldBufferCountWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new bufferCount with immediate scheduler', function () {
newBufferCountWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
62 changes: 51 additions & 11 deletions src/operator/bufferCount.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';

/**
* Buffers the source Observable values until the size hits the maximum
Expand Down Expand Up @@ -48,11 +49,18 @@ export function bufferCount<T>(this: Observable<T>, bufferSize: number, startBuf
}

class BufferCountOperator<T> implements Operator<T, T[]> {
private subscriberClass: any;

constructor(private bufferSize: number, private startBufferEvery: number) {
if (!startBufferEvery || bufferSize === startBufferEvery) {
this.subscriberClass = BufferCountSubscriber;
} else {
this.subscriberClass = BufferSkipCountSubscriber;
}
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source.subscribe(new BufferCountSubscriber(subscriber, this.bufferSize, this.startBufferEvery));
call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
}
}

Expand All @@ -62,19 +70,50 @@ class BufferCountOperator<T> implements Operator<T, T[]> {
* @extends {Ignored}
*/
class BufferCountSubscriber<T> extends Subscriber<T> {
private buffer: T[] = [];

constructor(destination: Subscriber<T[]>, private bufferSize: number) {
super(destination);
}

protected _next(value: T): void {
const buffer = this.buffer;

buffer.push(value);

if (buffer.length == this.bufferSize) {
this.destination.next(buffer);
this.buffer = [];
}
}

protected _complete(): void {
const buffer = this.buffer;
if (buffer.length > 0) {
this.destination.next(buffer);
}
super._complete();
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferSkipCountSubscriber<T> extends Subscriber<T> {
private buffers: Array<T[]> = [];
private count: number = 0;

constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
super(destination);
}

protected _next(value: T) {
const count = this.count++;
const { destination, bufferSize, startBufferEvery, buffers } = this;
const startOn = (startBufferEvery == null) ? bufferSize : startBufferEvery;
protected _next(value: T): void {
const { bufferSize, startBufferEvery, buffers, count } = this;

if (count % startOn === 0) {
this.count++;
if (count % startBufferEvery === 0) {
buffers.push([]);
}

Expand All @@ -83,14 +122,14 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
buffer.push(value);
if (buffer.length === bufferSize) {
buffers.splice(i, 1);
destination.next(buffer);
this.destination.next(buffer);
}
}
}

protected _complete() {
const destination = this.destination;
const buffers = this.buffers;
protected _complete(): void {
const { buffers, destination } = this;

while (buffers.length > 0) {
let buffer = buffers.shift();
if (buffer.length > 0) {
Expand All @@ -99,4 +138,5 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
}
super._complete();
}

}

0 comments on commit 28d0883

Please sign in to comment.