From 28d08834cb361665ce62f38058c7987e8903c997 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Mon, 3 Apr 2017 21:50:43 +0200 Subject: [PATCH] perf(bufferCount): optimize bufferCount operator (#2359) * perf(bufferCount): add perf tests for buffercount with skip param * perf(bufferCount): optimize bufferCount operator * refactor(bufferCount): fix types * refactor(bufferCount): refactored as requested by @jayphelps * refactor(bufferCount): define destination as const --- .../operators/buffercount-skip.js | 18 ++++++ .../operators/buffercount-skip.js | 18 ++++++ src/operator/bufferCount.ts | 62 +++++++++++++++---- 3 files changed, 87 insertions(+), 11 deletions(-) create mode 100644 perf/micro/current-thread-scheduler/operators/buffercount-skip.js create mode 100644 perf/micro/immediate-scheduler/operators/buffercount-skip.js diff --git a/perf/micro/current-thread-scheduler/operators/buffercount-skip.js b/perf/micro/current-thread-scheduler/operators/buffercount-skip.js new file mode 100644 index 0000000000..1b621d7f00 --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/buffercount-skip.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldBufferCountWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread).bufferWithCount(5, 3); + var newBufferCountWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue).bufferCount(5, 3); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old bufferCount with current thread scheduler', function () { + oldBufferCountWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }) + .add('new bufferCount with current thread scheduler', function () { + newBufferCountWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }); +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/buffercount-skip.js b/perf/micro/immediate-scheduler/operators/buffercount-skip.js new file mode 100644 index 0000000000..ac78df077b --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/buffercount-skip.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldBufferCountWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).bufferWithCount(5, 3); + var newBufferCountWithImmediateScheduler = RxNew.Observable.range(0, 25).bufferCount(5, 3); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old bufferCount with immediate scheduler', function () { + oldBufferCountWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new bufferCount with immediate scheduler', function () { + newBufferCountWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; \ No newline at end of file diff --git a/src/operator/bufferCount.ts b/src/operator/bufferCount.ts index 7b4f2e4be8..f2a79089d5 100644 --- a/src/operator/bufferCount.ts +++ b/src/operator/bufferCount.ts @@ -1,6 +1,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; /** * Buffers the source Observable values until the size hits the maximum @@ -48,11 +49,18 @@ export function bufferCount(this: Observable, bufferSize: number, startBuf } class BufferCountOperator implements Operator { + private subscriberClass: any; + constructor(private bufferSize: number, private startBufferEvery: number) { + if (!startBufferEvery || bufferSize === startBufferEvery) { + this.subscriberClass = BufferCountSubscriber; + } else { + this.subscriberClass = BufferSkipCountSubscriber; + } } - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferCountSubscriber(subscriber, this.bufferSize, this.startBufferEvery)); + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery)); } } @@ -62,6 +70,38 @@ class BufferCountOperator implements Operator { * @extends {Ignored} */ class BufferCountSubscriber extends Subscriber { + private buffer: T[] = []; + + constructor(destination: Subscriber, private bufferSize: number) { + super(destination); + } + + protected _next(value: T): void { + const buffer = this.buffer; + + buffer.push(value); + + if (buffer.length == this.bufferSize) { + this.destination.next(buffer); + this.buffer = []; + } + } + + protected _complete(): void { + const buffer = this.buffer; + if (buffer.length > 0) { + this.destination.next(buffer); + } + super._complete(); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class BufferSkipCountSubscriber extends Subscriber { private buffers: Array = []; private count: number = 0; @@ -69,12 +109,11 @@ class BufferCountSubscriber extends Subscriber { super(destination); } - protected _next(value: T) { - const count = this.count++; - const { destination, bufferSize, startBufferEvery, buffers } = this; - const startOn = (startBufferEvery == null) ? bufferSize : startBufferEvery; + protected _next(value: T): void { + const { bufferSize, startBufferEvery, buffers, count } = this; - if (count % startOn === 0) { + this.count++; + if (count % startBufferEvery === 0) { buffers.push([]); } @@ -83,14 +122,14 @@ class BufferCountSubscriber extends Subscriber { buffer.push(value); if (buffer.length === bufferSize) { buffers.splice(i, 1); - destination.next(buffer); + this.destination.next(buffer); } } } - protected _complete() { - const destination = this.destination; - const buffers = this.buffers; + protected _complete(): void { + const { buffers, destination } = this; + while (buffers.length > 0) { let buffer = buffers.shift(); if (buffer.length > 0) { @@ -99,4 +138,5 @@ class BufferCountSubscriber extends Subscriber { } super._complete(); } + }