Skip to content

Commit 5b46247

Browse files
authored
chore: split structure (#558)
1 parent db2226b commit 5b46247

17 files changed

+369
-278
lines changed

package.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
"name": "piscina",
33
"version": "4.5.0",
44
"description": "A fast, efficient Node.js Worker Thread Pool implementation",
5-
"main": "./dist/src/main.js",
5+
"main": "./dist/main.js",
6+
"types": "./dist/index.d.ts",
67
"exports": {
7-
"types": "./dist/src/index.d.ts",
8+
"types": "./dist/index.d.ts",
89
"import": "./dist/esm-wrapper.mjs",
9-
"require": "./dist/src/main.js"
10+
"require": "./dist/main.js"
1011
},
11-
"types": "./dist/src/index.d.ts",
1212
"scripts": {
1313
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
1414
"lint": "standardx \"**/*.{ts,mjs,js,cjs}\" | snazzy",

src/abort.ts

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
interface AbortSignalEventTargetAddOptions {
2+
once: boolean;
3+
}
4+
5+
export interface AbortSignalEventTarget {
6+
addEventListener: (
7+
name: 'abort',
8+
listener: () => void,
9+
options?: AbortSignalEventTargetAddOptions
10+
) => void;
11+
removeEventListener: (name: 'abort', listener: () => void) => void;
12+
aborted?: boolean;
13+
reason?: unknown;
14+
}
15+
16+
export interface AbortSignalEventEmitter {
17+
off: (name: 'abort', listener: () => void) => void;
18+
once: (name: 'abort', listener: () => void) => void;
19+
}
20+
21+
export type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;
22+
23+
export class AbortError extends Error {
24+
constructor (reason?: AbortSignalEventTarget['reason']) {
25+
// TS does not recognizes the cause clause
26+
// @ts-expect-error
27+
super('The task has been aborted', { cause: reason });
28+
}
29+
30+
get name () {
31+
return 'AbortError';
32+
}
33+
}
34+
35+
export function onabort (abortSignal: AbortSignalAny, listener: () => void) {
36+
if ('addEventListener' in abortSignal) {
37+
abortSignal.addEventListener('abort', listener, { once: true });
38+
} else {
39+
abortSignal.once('abort', listener);
40+
}
41+
}

src/common.ts

+67-70
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,42 @@
1-
import type { MessagePort } from 'worker_threads';
1+
import type { Histogram } from 'node:perf_hooks';
2+
import { fileURLToPath, URL } from 'node:url';
23

3-
export const READY = '_WORKER_READY';
4-
5-
export interface StartupMessage {
6-
filename : string | null;
7-
name : string;
8-
port : MessagePort;
9-
sharedBuffer : Int32Array;
10-
useAtomics : boolean;
11-
niceIncrement : number;
12-
}
4+
import type { HistogramSummary } from './types';
5+
import { kMovable, kTransferable, kValue } from './symbols';
136

14-
export interface RequestMessage {
15-
taskId : number;
16-
task : any;
17-
filename: string;
18-
name : string;
19-
}
20-
21-
export interface ReadyMessage {
22-
[READY]: true
23-
};
24-
25-
export interface ResponseMessage {
26-
taskId : number;
27-
result : any;
28-
error: Error | null;
29-
}
30-
export const commonState = {
31-
isWorkerThread: false,
32-
workerData: undefined
33-
};
34-
35-
// Internal symbol used to mark Transferable objects returned
36-
// by the Piscina.move() function
37-
const kMovable = Symbol('Piscina.kMovable');
38-
export const kTransferable = Symbol.for('Piscina.transferable');
39-
export const kValue = Symbol.for('Piscina.valueOf');
40-
export const kQueueOptions = Symbol.for('Piscina.queueOptions');
7+
// States wether the worker is ready to receive tasks
8+
export const READY = '_WORKER_READY';
419

42-
// True if the object implements the Transferable interface
43-
export function isTransferable (value : any) : boolean {
44-
return value != null &&
45-
typeof value === 'object' &&
46-
kTransferable in value &&
47-
kValue in value;
10+
/**
11+
* True if the object implements the Transferable interface
12+
*
13+
* @export
14+
* @param {unknown} value
15+
* @return {*} {boolean}
16+
*/
17+
export function isTransferable (value: unknown): boolean {
18+
return (
19+
value != null &&
20+
typeof value === 'object' &&
21+
kTransferable in value &&
22+
kValue in value
23+
);
4824
}
4925

50-
// True if object implements Transferable and has been returned
51-
// by the Piscina.move() function
52-
export function isMovable (value : any) : boolean {
26+
/**
27+
* True if object implements Transferable and has been returned
28+
* by the Piscina.move() function
29+
*
30+
* TODO: narrow down the type of value
31+
* @export
32+
* @param {(unknown & PiscinaMovable)} value
33+
* @return {*} {boolean}
34+
*/
35+
export function isMovable (value: any): boolean {
5336
return isTransferable(value) && value[kMovable] === true;
5437
}
5538

56-
export function markMovable (value : object) : void {
39+
export function markMovable (value: {}): void {
5740
Object.defineProperty(value, kMovable, {
5841
enumerable: false,
5942
configurable: true,
@@ -62,31 +45,45 @@ export function markMovable (value : object) : void {
6245
});
6346
}
6447

65-
export interface Transferable {
66-
readonly [kTransferable] : object;
67-
readonly [kValue] : object;
68-
}
48+
// State of Piscina pool
49+
export const commonState = {
50+
isWorkerThread: false,
51+
workerData: undefined
52+
};
6953

70-
export interface Task {
71-
readonly [kQueueOptions] : object | null;
72-
}
54+
export function createHistogramSummary (histogram: Histogram): HistogramSummary {
55+
const { mean, stddev, min, max } = histogram;
7356

74-
export interface TaskQueue {
75-
readonly size : number;
76-
shift () : Task | null;
77-
remove (task : Task) : void;
78-
push (task : Task) : void;
57+
return {
58+
average: mean / 1000,
59+
mean: mean / 1000,
60+
stddev,
61+
min: min / 1000,
62+
max: max / 1000,
63+
p0_001: histogram.percentile(0.001) / 1000,
64+
p0_01: histogram.percentile(0.01) / 1000,
65+
p0_1: histogram.percentile(0.1) / 1000,
66+
p1: histogram.percentile(1) / 1000,
67+
p2_5: histogram.percentile(2.5) / 1000,
68+
p10: histogram.percentile(10) / 1000,
69+
p25: histogram.percentile(25) / 1000,
70+
p50: histogram.percentile(50) / 1000,
71+
p75: histogram.percentile(75) / 1000,
72+
p90: histogram.percentile(90) / 1000,
73+
p97_5: histogram.percentile(97.5) / 1000,
74+
p99: histogram.percentile(99) / 1000,
75+
p99_9: histogram.percentile(99.9) / 1000,
76+
p99_99: histogram.percentile(99.99) / 1000,
77+
p99_999: histogram.percentile(99.999) / 1000
78+
};
7979
}
8080

81-
export function isTaskQueue (value : any) : boolean {
82-
return typeof value === 'object' &&
83-
value !== null &&
84-
'size' in value &&
85-
typeof value.shift === 'function' &&
86-
typeof value.remove === 'function' &&
87-
typeof value.push === 'function';
81+
export function toHistogramIntegerNano (milliseconds: number): number {
82+
return Math.max(1, Math.trunc(milliseconds * 1000));
8883
}
8984

90-
export const kRequestCountField = 0;
91-
export const kResponseCountField = 1;
92-
export const kFieldCount = 2;
85+
export function maybeFileURLToPath (filename : string) : string {
86+
return filename.startsWith('file:')
87+
? fileURLToPath(new URL(filename))
88+
: filename;
89+
}

src/fixed-queue.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
* Source: https://github.com/nodejs/node/blob/de7b37880f5a541d5f874c1c2362a65a4be76cd0/lib/internal/fixed_queue.js
55
*/
66
import assert from 'node:assert';
7-
import { TaskQueue, Task } from './common';
7+
import { TaskQueue } from './task_queue';
8+
import { Task } from './types';
89
// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two.
910
const kSize = 2048;
1011
const kMask = kSize - 1;

0 commit comments

Comments
 (0)