Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve synchronization between Node.js parent process and worker thread #4603

Merged
merged 11 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cirrus/nodejs.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ${CIRRUS_AWS_ACCOUNT}.dkr.ecr.eu-central-1.amazonaws.com/base:j17-latest

USER root

ARG NODE_VERSION=16
ARG NODE_VERSION=20

RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt-get install -y nodejs=${NODE_VERSION}.* \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,5 @@ void test() throws Exception {
new String(Files.readAllBytes(Paths.get("target/differences")), StandardCharsets.UTF_8)
)
.isEmpty();
// assertPerfMonitoringAvailable(perfMonitoringDir);
}
}
62 changes: 38 additions & 24 deletions packages/bridge/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ export function start(
port = 0,
host = '127.0.0.1',
timeout = SHUTDOWN_TIMEOUT,
): Promise<http.Server> {
): Promise<{ server: http.Server; serverClosed: Promise<void>; worker: Worker }> {
const pendingCloseRequests: express.Response[] = [];
let resolveClosed: () => void;
const serverClosed: Promise<void> = new Promise(resolve => {
resolveClosed = resolve;
});

logMemoryConfiguration();
if (getContext().debugMemory) {
registerGarbageCollectionObserver();
Expand All @@ -103,23 +109,15 @@ export function start(

worker.on('exit', code => {
debug(`The worker thread exited with code ${code}`);
closeServer();
});

worker.on('error', err => {
debug(`The worker thread failed: ${err}`);

debug('Shutting down the bridge server due to failure');
logMemoryError(err);

/**
* At this point, the worker thread can no longer respond to any request from the plugin.
* However, existing requests are stalled until they time out. Since the bridge server is
* about to be shut down in an unexpected manner anyway, we can close all connections and
* avoid waiting unnecessarily for them to eventually close.
*/
server.closeAllConnections();

debug('Shutting down the bridge server due to failure');
shutdown();
closeServer();
});

const app = express();
Expand All @@ -130,9 +128,7 @@ export function start(
* in case the process becomes orphan.
*/
const orphanTimeout = timeoutMiddleware(() => {
if (server.listening) {
shutdown();
}
closeWorker();
}, timeout);

/**
Expand All @@ -145,15 +141,14 @@ export function start(
app.use(errorMiddleware);

app.post('/close', (_: express.Request, response: express.Response) => {
debug('Shutting down the bridge server');
response.end(() => {
shutdown();
});
pendingCloseRequests.push(response);
closeWorker();
});

server.on('close', () => {
debug('The bridge server shut down');
orphanTimeout.stop();
orphanTimeout.cancel();
resolveClosed();
});

server.on('error', err => {
Expand All @@ -166,17 +161,36 @@ export function start(
* which we get using server.address().
*/
debug(`The bridge server is listening on port ${(server.address() as AddressInfo)?.port}`);
resolve(server);
resolve({ server, serverClosed, worker });
});

server.listen(port, host);

/**
* Shutdown the server and the worker thread
*/
function shutdown() {
worker.terminate().catch(reason => debug(`Failed to terminate the worker thread: ${reason}`));
server.close();
function closeWorker() {
debug('Shutting down the worker');
worker.postMessage({ type: 'close' });
}

/**
* Shutdown the server and the worker thread
*/
function closeServer() {
if (server.listening) {
while (pendingCloseRequests.length) {
pendingCloseRequests.pop()?.end();
}
/**
* At this point, the worker thread can no longer respond to any request from the plugin.
* If we reached this due to worker failure, existing requests are stalled until they time out.
* Since the bridge server is about to be shut down in an unexpected manner anyway, we can
* close all connections and avoid waiting unnecessarily for them to eventually close.
*/
server.closeAllConnections();
server.close();
}
}
});
}
14 changes: 9 additions & 5 deletions packages/bridge/src/timeout/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@ import Timeout from './timeout';
export function timeoutMiddleware(f: () => void, delay: number) {
const timeout = new Timeout(f, delay);
timeout.start();
let cancelled = false;

return {
middleware(_request: express.Request, response: express.Response, next: express.NextFunction) {
timeout.stop();
if (!cancelled) {
timeout.stop();

response.on('finish', function () {
timeout.start();
});
response.on('finish', function () {
timeout.start();
});
}
next();
},
stop() {
cancel() {
cancelled = true;
timeout.stop();
},
};
Expand Down
3 changes: 3 additions & 0 deletions packages/bridge/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ if (parentPort) {
try {
const { type, data } = message;
switch (type) {
case 'close':
parentThread.close();
break;
case 'on-analyze-css': {
await readFileLazily(data);

Expand Down
11 changes: 6 additions & 5 deletions packages/bridge/tests/router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ import http from 'http';
import { createAndSaveProgram, ProjectAnalysisInput, RuleConfig } from '@sonar/jsts';
import path from 'path';
import { start } from '../src/server';
import { promisify } from 'util';
import { request } from './tools';
import * as fs from 'fs';

describe('router', () => {
const fixtures = path.join(__dirname, 'fixtures', 'router');
const port = 0;
let closePromise: Promise<void>;

let server: http.Server;
let close: () => Promise<void>;

beforeEach(async () => {
setContext({
Expand All @@ -41,12 +40,14 @@ describe('router', () => {
bundles: [],
});
jest.setTimeout(60 * 1000);
server = await start(port, '127.0.0.1', 60 * 60 * 1000);
close = promisify(server.close.bind(server));
const { server: serverInstance, serverClosed } = await start(port, '127.0.0.1', 60 * 60 * 1000);
server = serverInstance;
closePromise = serverClosed;
});

afterEach(async () => {
await close();
await request(server, '/close', 'POST');
await closePromise;
vdiez marked this conversation as resolved.
Show resolved Hide resolved
});

it('should route /analyze-project requests', async () => {
Expand Down
51 changes: 31 additions & 20 deletions packages/bridge/tests/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
import { start } from '../src/server';
import { promisify } from 'util';
import path from 'path';
import { setContext } from '@sonar/shared';
import { AddressInfo } from 'net';
Expand All @@ -42,8 +41,7 @@ describe('server', () => {

console.log = jest.fn();

const server = await start(undefined, undefined);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(undefined, undefined);

expect(server.listening).toBeTruthy();
expect(console.log).toHaveBeenCalledTimes(3);
Expand All @@ -57,14 +55,14 @@ describe('server', () => {
`DEBUG The bridge server is listening on port ${(server.address() as AddressInfo)?.port}`,
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should fail when linter is not initialized', async () => {
expect.assertions(3);

const server = await start(port);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(port);

const ruleId = 'no-extra-semi';
const fileType = 'MAIN';
Expand All @@ -86,15 +84,14 @@ describe('server', () => {
ruleId,
}),
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should route service requests', async () => {
expect.assertions(2);

const server = await start(port);
const close = promisify(server.close.bind(server));
const { server, serverClosed } = await start(port);

expect(server.listening).toBeTruthy();

Expand All @@ -113,27 +110,42 @@ describe('server', () => {
}),
);

await close();
await request(server, '/close', 'POST');
await serverClosed;
});

it('should shut down', async () => {
expect.assertions(2);
expect.assertions(3);

console.log = jest.fn();

const { server, serverClosed } = await start(port);
expect(server.listening).toBeTruthy();

await request(server, '/close', 'POST');

expect(server.listening).toBeFalsy();
expect(console.log).toHaveBeenCalledWith('DEBUG Shutting down the worker');
await serverClosed;
});

it('worker crashing should close server', async () => {
console.log = jest.fn();

const server = await start(port);
const { server, serverClosed, worker } = await start(port);
expect(server.listening).toBeTruthy();

const closeRequest = request(server, '/close', 'POST');
await closeRequest;
await worker.terminate();

expect(server.listening).toBeFalsy();
expect(console.log).toHaveBeenCalledWith('DEBUG Shutting down the bridge server');
expect(console.log).toHaveBeenCalledWith('DEBUG The bridge server shut down');
await serverClosed;
});

it('should timeout', async () => {
console.log = jest.fn();

const server = await start(port, '127.0.0.1', 500);
const { server, serverClosed } = await start(port, '127.0.0.1', 500);

await new Promise(r => setTimeout(r, 100));
expect(server.listening).toBeTruthy();
Expand All @@ -143,10 +155,9 @@ describe('server', () => {
expect(server.listening).toBeTruthy();
await request(server, '/status', 'GET');

await new Promise(r => setTimeout(r, 600));
expect(server.listening).toBeFalsy();

await serverClosed;
vdiez marked this conversation as resolved.
Show resolved Hide resolved
expect(console.log).toHaveBeenCalledWith('DEBUG The bridge server shut down');
expect(server.listening).toBeFalsy();
});
});

Expand Down
28 changes: 5 additions & 23 deletions packages/bridge/tests/tools/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,12 @@ import http from 'http';
/**
* Sends an HTTP request to a server's endpoint running on localhost.
*/
export function request(server: http.Server, path: string, method: string, data: any = {}) {
const options = {
host: '127.0.0.1',
path,
method,
port: (server.address() as AddressInfo).port,
export async function request(server: http.Server, path: string, method: string, body: any = {}) {
return await fetch(`http://127.0.0.1:${(server.address() as AddressInfo).port}${path}`, {
headers: {
'Content-Type': 'application/json',
},
timeout: 10000,
};

return new Promise((resolve, reject) => {
const request = http.request(options, res => {
let response = '';
res.on('data', chunk => {
response += chunk;
});

res.on('end', () => resolve(response));
});
request.on('error', reject);

request.write(JSON.stringify(data));
request.end();
});
method,
body: method !== 'GET' ? JSON.stringify(body) : undefined,
}).then(response => response.text());
}
2 changes: 1 addition & 1 deletion packages/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"target": "ES2021",
"module": "nodenext",
"moduleResolution": "nodenext",
"lib": ["ES2021"],
"lib": ["ES2021", "dom"],
vdiez marked this conversation as resolved.
Show resolved Hide resolved
"declaration": true,
"outDir": "../lib",
"strict": true,
Expand Down