Skip to content

Commit

Permalink
perf(ReplaySubject): slightly improved performance (#2677)
Browse files Browse the repository at this point in the history
* perf(ReplaySubject): slightly improved performance

* perf(ReplaySubject): little performance adjustments

* refactor(ReplaySubject): switch back to using splice() instead of shift()

* refactor(ReplaySubject): wrap splice() with an "if" condition for performance reasons
  • Loading branch information
martinsik authored and benlesh committed Mar 30, 2018
1 parent e21c62b commit 9fea36d
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 13 deletions.
17 changes: 17 additions & 0 deletions perf/micro/current-thread-scheduler/subject/replaysubject.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRangeWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread);
var newRangeWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue);

return suite
.add('old ReplaySubject with immediate scheduler', function () {
var subject = new RxOld.ReplaySubject(5, Number.POSITIVE_INFINITY, RxOld.Scheduler.currentThread);
oldRangeWithCurrentThreadScheduler.subscribe(subject);
})
.add('new ReplaySubject with immediate scheduler', function () {
var subject = new RxNew.ReplaySubject(5, Number.POSITIVE_INFINITY, RxNew.Scheduler.queue);
newRangeWithCurrentThreadScheduler.subscribe(subject);
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRangeWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread);
var newRangeWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue);

return suite
.add('old ReplaySubject with immediate scheduler', function () {
var subject = new RxOld.ReplaySubject(5, 50, RxOld.Scheduler.currentThread);
oldRangeWithCurrentThreadScheduler.subscribe(subject);
})
.add('new ReplaySubject with immediate scheduler', function () {
var subject = new RxNew.ReplaySubject(5, 50, RxNew.Scheduler.queue);
newRangeWithCurrentThreadScheduler.subscribe(subject);
});
};
17 changes: 17 additions & 0 deletions perf/micro/immediate-scheduler/subject/replaysubject.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRangeWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate);
var newRangeWithImmediateScheduler = RxNew.Observable.range(0, 25);

return suite
.add('old ReplaySubject with immediate scheduler', function () {
var subject = new RxOld.ReplaySubject(5, Number.POSITIVE_INFINITY, RxOld.Scheduler.immediate);
oldRangeWithImmediateScheduler.subscribe(subject);
})
.add('new ReplaySubject with immediate scheduler', function () {
var subject = new RxNew.ReplaySubject(5, Number.POSITIVE_INFINITY);
newRangeWithImmediateScheduler.subscribe(subject);
});
};
17 changes: 17 additions & 0 deletions perf/micro/immediate-scheduler/subject/replaysubject_windowtime.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRangeWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate);
var newRangeWithImmediateScheduler = RxNew.Observable.range(0, 25);

return suite
.add('old ReplaySubject with immediate scheduler', function () {
var subject = new RxOld.ReplaySubject(5, 50, RxOld.Scheduler.immediate);
oldRangeWithImmediateScheduler.subscribe(subject);
})
.add('new ReplaySubject with immediate scheduler', function () {
var subject = new RxNew.ReplaySubject(5, 50);
newRangeWithImmediateScheduler.subscribe(subject);
});
};
53 changes: 40 additions & 13 deletions src/internal/ReplaySubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,56 @@ import { SubjectSubscription } from './SubjectSubscription';
* @class ReplaySubject<T>
*/
export class ReplaySubject<T> extends Subject<T> {
private _events: ReplayEvent<T>[] = [];
private _events: (ReplayEvent<T> | T)[] = [];
private _bufferSize: number;
private _windowTime: number;
private _infiniteTimeWindow: boolean = false;

constructor(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
private scheduler?: SchedulerLike) {
super();
this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
this._windowTime = windowTime < 1 ? 1 : windowTime;

if (windowTime === Number.POSITIVE_INFINITY) {
this._infiniteTimeWindow = true;
this.next = this.nextInfiniteTimeWindow;
} else {
this.next = this.nextTimeWindow;
}
}

next(value: T): void {
const now = this._getNow();
this._events.push(new ReplayEvent(now, value));
private nextInfiniteTimeWindow(value: T): void {
const _events = this._events;
_events.push(value);
// Since this method is invoked in every next() call than the buffer
// can overgrow the max size only by one item
if (_events.length > this._bufferSize) {
_events.shift();
}

super.next(value);
}

private nextTimeWindow(value: T): void {
this._events.push(new ReplayEvent(this._getNow(), value));
this._trimBufferThenGetEvents();

super.next(value);
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const _events = this._trimBufferThenGetEvents();
// When `_infiniteTimeWindow === true` then the buffer is already trimmed
const _infiniteTimeWindow = this._infiniteTimeWindow;
const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
const scheduler = this.scheduler;
const len = _events.length;
let subscription: Subscription;

if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscription = Subscription.EMPTY;
} else if (this.isStopped) {
} else if (this.isStopped || this.hasError) {
subscription = Subscription.EMPTY;
} else {
this.observers.push(subscriber);
Expand All @@ -49,9 +70,14 @@ export class ReplaySubject<T> extends Subject<T> {
subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
}

const len = _events.length;
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(_events[i].value);
if (_infiniteTimeWindow) {
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(<T>_events[i]);
}
} else {
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next((<ReplayEvent<T>>_events[i]).value);
}
}

if (this.hasError) {
Expand All @@ -71,9 +97,9 @@ export class ReplaySubject<T> extends Subject<T> {
const now = this._getNow();
const _bufferSize = this._bufferSize;
const _windowTime = this._windowTime;
const _events = this._events;
const _events = <ReplayEvent<T>[]>this._events;

let eventsCount = _events.length;
const eventsCount = _events.length;
let spliceCount = 0;

// Trim events that fall out of the time window.
Expand All @@ -96,6 +122,7 @@ export class ReplaySubject<T> extends Subject<T> {

return _events;
}

}

class ReplayEvent<T> {
Expand Down

0 comments on commit 9fea36d

Please sign in to comment.