Skip to content

Commit

Permalink
esm: have a single hooks thread for all workers
Browse files Browse the repository at this point in the history
  • Loading branch information
dygabo committed Apr 26, 2024
1 parent 4221631 commit 1025b03
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 55 deletions.
2 changes: 2 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ port.on('message', (message) => {
manifestSrc,
manifestURL,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -111,6 +112,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
111 changes: 70 additions & 41 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort } = require('worker_threads');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -481,6 +481,7 @@ class HooksProxy {
* The InternalWorker instance, which lets us communicate with the loader thread.
*/
#worker;
#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
Expand All @@ -499,26 +500,43 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker } = require('internal/worker');
const { InternalWorker, hooksPort } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
if (isMainThread) {
// main thread is the only one that creates the internal single hooks worker

Check failure on line 510 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Comments should not begin with a lowercase character
const { port1: portToHooksThread, port2: portFromHooksThread } = new MessageChannel;

Check failure on line 511 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

'portToHooksThread' is assigned a value but never used

Check failure on line 511 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Missing '()' invoking a constructor
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
registrationPort: portFromHooksThread,
lock,
},
transferList: [portFromHooksThread]

Check failure on line 521 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Missing trailing comma
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;

Check failure on line 527 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Trailing spaces not allowed
}
}

waitForWorker() {
// there is one Hooks instance for each worker thread. But only one of these Hooks instances

Check failure on line 532 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Comments should not begin with a lowercase character
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -535,6 +553,22 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, ...args) {
this.waitForWorker();
MessageChannel ??= require('internal/worker/io').MessageChannel;
const { port1: fromHooksThread, port2: toHooksThread } = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}
this.#portToHooksThread.postMessage({ __proto__: null, method, args, lock: this.#lock, port: toHooksThread }, usedTransferList);

Check failure on line 567 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

This line has a length of 132. Maximum allowed is 120

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -543,22 +577,7 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, ...args);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -567,7 +586,12 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
this.#worker.ref();
if (this.#worker) {
this.#worker.ref();
}

Check failure on line 591 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Closing curly brace does not appear on the same line as the subsequent block
else {
this.#portToHooksThread.ref();
}
}

let response;
Expand All @@ -576,18 +600,26 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(asyncCommChannel.port1);
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
this.#worker.unref();
if (this.#worker) {
this.#worker.unref();
}

Check failure on line 611 in lib/internal/modules/esm/hooks.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Closing curly brace does not appear on the same line as the subsequent block
else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
}

const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
fromHooksThread.close();
return this.#unwrapMessage(response);
}

/**
Expand All @@ -598,11 +630,7 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, ...args);

let response;
do {
Expand All @@ -611,14 +639,15 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = this.#worker.receiveMessageSync();
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}
fromHooksThread.close();
return this.#unwrapMessage(response);
}

Expand Down
19 changes: 16 additions & 3 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -594,10 +595,11 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register some loader specifier.
* Register a loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
Expand All @@ -608,7 +610,11 @@ class CustomizedModuleLoader {
* @returns {{ format: string, url: URL['href'] }}
*/
register(originalSpecifier, parentURL, data, transferList) {
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
if (isMainThread) {
// only the main thread has a Hooks instance with worker thread. All other Worker threads

Check warning on line 614 in lib/internal/modules/esm/loader.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

JSDoc @returns declaration present but return expression not available in function
// delegate thier hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
}

/**
Expand All @@ -617,7 +623,7 @@ class CustomizedModuleLoader {
* be resolved.
* @param {string} [parentURL] The URL path of the module's parent.
* @param {ImportAttributes} importAttributes Attributes from the import
* statement or expression.
* statement or exp-ression.
* @returns {{ format: string, url: URL['href'] }}

Check failure on line 627 in lib/internal/modules/esm/loader.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

Comments should not begin with a lowercase character
*/
resolve(originalSpecifier, parentURL, importAttributes) {
Expand Down Expand Up @@ -706,6 +712,12 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -767,6 +779,7 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading

0 comments on commit 1025b03

Please sign in to comment.