Skip to content

Commit

Permalink
feat(#305)!: allow to reset histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed Jan 5, 2025
1 parent 604ccc8 commit e5c3b18
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 126 deletions.
33 changes: 0 additions & 33 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import type { Histogram } from 'node:perf_hooks';
import { fileURLToPath, URL } from 'node:url';
import { availableParallelism } from 'node:os';

import type { HistogramSummary } from './types';
import { kMovable, kTransferable, kValue } from './symbols';

// States wether the worker is ready to receive tasks
Expand Down Expand Up @@ -52,37 +50,6 @@ export const commonState = {
workerData: undefined
};

export function createHistogramSummary (histogram: Histogram): HistogramSummary {
const { mean, stddev, min, max } = histogram;

return {
average: mean / 1000,
mean: mean / 1000,
stddev,
min: min / 1000,
max: max / 1000,
p0_001: histogram.percentile(0.001) / 1000,
p0_01: histogram.percentile(0.01) / 1000,
p0_1: histogram.percentile(0.1) / 1000,
p1: histogram.percentile(1) / 1000,
p2_5: histogram.percentile(2.5) / 1000,
p10: histogram.percentile(10) / 1000,
p25: histogram.percentile(25) / 1000,
p50: histogram.percentile(50) / 1000,
p75: histogram.percentile(75) / 1000,
p90: histogram.percentile(90) / 1000,
p97_5: histogram.percentile(97.5) / 1000,
p99: histogram.percentile(99) / 1000,
p99_9: histogram.percentile(99.9) / 1000,
p99_99: histogram.percentile(99.99) / 1000,
p99_999: histogram.percentile(99.999) / 1000
};
}

export function toHistogramIntegerNano (milliseconds: number): number {
return Math.max(1, Math.trunc(milliseconds * 1000));
}

export function maybeFileURLToPath (filename : string) : string {
return filename.startsWith('file:')
? fileURLToPath(new URL(filename))
Expand Down
108 changes: 108 additions & 0 deletions src/histogram.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { RecordableHistogram, createHistogram } from 'node:perf_hooks';

export type PiscinaHistogramSummary = {
average: number;
mean: number;
stddev: number;
min: number;
max: number;
p0_001: number;
p0_01: number;
p0_1: number;
p1: number;
p2_5: number;
p10: number;
p25: number;
p50: number;
p75: number;
p90: number;
p97_5: number;
p99: number;
p99_9: number;
p99_99: number;
p99_999: number;
};

export type PiscinaHistogram = {
runTime: PiscinaHistogramSummary;
waitTime: PiscinaHistogramSummary;
resetRunTime(): void;
resetWaitTime(): void;
};

export class PiscinaHistogramHandler {
#runTime: RecordableHistogram;
#waitTime: RecordableHistogram;

constructor() {
this.#runTime = createHistogram();
this.#waitTime = createHistogram();
}

get runTimeSummary(): PiscinaHistogramSummary {
return PiscinaHistogramHandler.createHistogramSummary(this.#runTime);
}

get waitTimeSummary(): PiscinaHistogramSummary {
return PiscinaHistogramHandler.createHistogramSummary(this.#waitTime);
}

get runTimeCount(): number {
return this.#runTime.count;
}

get waitTimeCount(): number {
return this.#waitTime.count;
}

recordRunTime(value: number) {
this.#runTime.record(PiscinaHistogramHandler.toHistogramIntegerNano(value));
}

recordWaitTime(value: number) {
this.#waitTime.record(
PiscinaHistogramHandler.toHistogramIntegerNano(value)
);
}

resetWaitTime(): void {
this.#waitTime.reset();
}

resetRunTime(): void {
this.#runTime.reset();
}

static createHistogramSummary(
histogram: RecordableHistogram
): PiscinaHistogramSummary {
const { mean, stddev, min, max } = histogram;

return {
average: mean / 1000,
mean: mean / 1000,
stddev,
min: min / 1000,
max: max / 1000,
p0_001: histogram.percentile(0.001) / 1000,
p0_01: histogram.percentile(0.01) / 1000,
p0_1: histogram.percentile(0.1) / 1000,
p1: histogram.percentile(1) / 1000,
p2_5: histogram.percentile(2.5) / 1000,
p10: histogram.percentile(10) / 1000,
p25: histogram.percentile(25) / 1000,
p50: histogram.percentile(50) / 1000,
p75: histogram.percentile(75) / 1000,
p90: histogram.percentile(90) / 1000,
p97_5: histogram.percentile(97.5) / 1000,
p99: histogram.percentile(99) / 1000,
p99_9: histogram.percentile(99.9) / 1000,
p99_99: histogram.percentile(99.99) / 1000,
p99_999: histogram.percentile(99.999) / 1000,
};
}

static toHistogramIntegerNano(milliseconds: number): number {
return Math.max(1, Math.trunc(milliseconds * 1000));
}
}
71 changes: 41 additions & 30 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Worker, MessageChannel, MessagePort } from 'node:worker_threads';
import { once, EventEmitterAsyncResource } from 'node:events';
import { resolve } from 'node:path';
import { inspect, types } from 'node:util';
import { RecordableHistogram, createHistogram, performance } from 'node:perf_hooks';
import { performance } from 'node:perf_hooks';
import { setTimeout as sleep } from 'node:timers/promises';
import assert from 'node:assert';

Expand All @@ -13,7 +13,6 @@ import type {
Transferable,
ResourceLimits,
EnvSpecifier,
HistogramSummary
} from './types';
import {
kQueueOptions,
Expand Down Expand Up @@ -44,14 +43,16 @@ import {
AbortError,
onabort
} from './abort';
import {
PiscinaHistogram,
PiscinaHistogramHandler,
} from './histogram';
import { Errors } from './errors';
import {
READY,
commonState,
isTransferable,
markMovable,
createHistogramSummary,
toHistogramIntegerNano,
getAvailableParallelism,
maybeFileURLToPath
} from './common';
Expand Down Expand Up @@ -171,8 +172,7 @@ class ThreadPool {
taskQueue : TaskQueue;
skipQueue : TaskInfo[] = [];
completed : number = 0;
runTime? : RecordableHistogram;
waitTime? : RecordableHistogram;
histogram: PiscinaHistogramHandler | null = null;
_needsDrain : boolean;
start : number = performance.now();
inProcessPendingMessages : boolean = false;
Expand All @@ -192,8 +192,7 @@ class ThreadPool {
this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 };

if (this.options.recordTiming) {
this.runTime = createHistogram();
this.waitTime = createHistogram();
this.histogram = new PiscinaHistogramHandler();
}

// The >= and <= could be > and < but this way we get 100 % coverage 🙃
Expand Down Expand Up @@ -458,7 +457,7 @@ class ThreadPool {
// Seeking for a real worker instead of customized one
if (candidate != null && candidate[kWorkerData] != null) {
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - task.created));
this.histogram?.recordWaitTime(now - task.created)
task.started = now;
candidate[kWorkerData].postTask(task);
this._maybeDrain();
Expand Down Expand Up @@ -518,7 +517,7 @@ class ThreadPool {
(err : Error | null, result : any) => {
this.completed++;
if (taskInfo.started) {
this.runTime?.record(toHistogramIntegerNano(performance.now() - taskInfo.started));
this.histogram?.recordRunTime(performance.now() - taskInfo.started);
}
if (err !== null) {
reject(err);
Expand Down Expand Up @@ -718,6 +717,7 @@ class ThreadPool {

export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource {
#pool : ThreadPool;
#histogram: PiscinaHistogram | null = null;

constructor (options : Options = {}) {
super({ ...options, name: 'Piscina' });
Expand Down Expand Up @@ -867,34 +867,45 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
return this.#pool.completed;
}

get waitTime () : HistogramSummary | null {
if (!this.#pool.waitTime) {
return null;
}

return createHistogramSummary(this.#pool.waitTime);
}

get runTime () : any {
if (!this.#pool.runTime) {
return null;
}
get histogram () : PiscinaHistogram {
if (this.#histogram == null) {
const piscinahistogram = {
// @ts-expect-error
get runTime() { return this.histogram?.runTimeSummary! },
// @ts-expect-error
get waitTime() { return this.histogram?.waitTimeSummary! },
resetRunTime() {
// @ts-expect-error
this.histogram?.resetRunTime()
},
resetWaitTime() {
// @ts-expect-error
this.histogram?.resetWaitTime()
},
}

Object.defineProperty(piscinahistogram, 'histogram', {
value: this.#pool.histogram,
writable: false,
enumerable: false,
configurable: false,
})

this.#histogram = piscinahistogram;
};

return createHistogramSummary(this.#pool.runTime);
return this.#histogram;
}

get utilization () : number {
if (!this.#pool.runTime) {
if (this.#pool.histogram == null) {
return 0;
}

// count is available as of Node.js v16.14.0 but not present in the types
const count = (this.#pool.runTime as RecordableHistogram & { count: number }).count;
if (count === 0) {
return 0;
}
const count = this.#pool.histogram.runTimeCount;

if (!this.#pool.runTime) {
if (count === 0) {
return 0;
}

Expand All @@ -903,7 +914,7 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
// of time the pool has been running multiplied by the
// maximum number of threads.
const capacity = this.duration * this.#pool.options.maxThreads;
const totalMeanRuntime = (this.#pool.runTime.mean / 1000) * count;
const totalMeanRuntime = (this.#pool.histogram.runTimeSummary.mean / 1000) * count;

// We calculate the appoximate pool utilization by multiplying
// the mean run time of all tasks by the number of runtime
Expand Down
23 changes: 0 additions & 23 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,6 @@ export interface Transferable {
readonly [kValue]: object;
}

export interface HistogramSummary {
average: number;
mean: number;
stddev: number;
min: number;
max: number;
p0_001: number;
p0_01: number;
p0_1: number;
p1: number;
p2_5: number;
p10: number;
p25: number;
p50: number;
p75: number;
p90: number;
p97_5: number;
p99: number;
p99_9: number;
p99_99: number;
p99_999: number;
}

export type ResourceLimits = Worker extends {
resourceLimits?: infer T;
}
Expand Down
10 changes: 5 additions & 5 deletions src/worker_pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { Worker, MessagePort, receiveMessageOnPort } from 'node:worker_threads';
import { createHistogram, RecordableHistogram } from 'node:perf_hooks';
import assert from 'node:assert';

import { HistogramSummary, RequestMessage, ResponseMessage } from '../types';
import { RequestMessage, ResponseMessage } from '../types';
import { Errors } from '../errors';

import { TaskInfo } from '../task_queue';
import { kFieldCount, kRequestCountField, kResponseCountField, kWorkerData } from '../symbols';
import { createHistogramSummary, toHistogramIntegerNano } from '../common';
import { PiscinaHistogramHandler, PiscinaHistogramSummary } from '../histogram';

import { AsynchronouslyCreatedResource, AsynchronouslyCreatedResourcePool } from './base';
export * from './balancer';
Expand All @@ -18,7 +18,7 @@ export type PiscinaWorker = {
id: number;
currentUsage: number;
isRunningAbortableTask: boolean;
histogram: HistogramSummary | null;
histogram: PiscinaHistogramSummary | null;
terminating: boolean;
destroyed: boolean;
[kWorkerData]: WorkerInfo;
Expand Down Expand Up @@ -96,7 +96,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {

_handleResponse (message : ResponseMessage) : void {
if (message.time != null) {
this.histogram?.record(toHistogramIntegerNano(message.time));
this.histogram?.record(PiscinaHistogramHandler.toHistogramIntegerNano(message.time));
}

this.onMessage(message);
Expand Down Expand Up @@ -184,7 +184,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {
return worker.isRunningAbortableTask();
},
get histogram () {
return worker.histogram != null ? createHistogramSummary(worker.histogram) : null;
return worker.histogram != null ? PiscinaHistogramHandler.createHistogramSummary(worker.histogram) : null;
},
get terminating () {
return worker.terminating;
Expand Down
Loading

0 comments on commit e5c3b18

Please sign in to comment.