Skip to content

Commit

Permalink
fix(core): shutdown plugin workers when disconnected (#28857)
Browse files Browse the repository at this point in the history
Currently we send a message to the plugin to shutdown when the parent
process either dies or is ready for them to shutdown. This works, but
fails when the host process is killed with a sigterm as the cleanup
can't run. This PR shifts the strategy such that whenever the one and
only socket connection ends, the plugin automatically shuts down
  • Loading branch information
AgentEnder authored Nov 8, 2024
1 parent 7f39dc1 commit 0d7f226
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 53 deletions.
10 changes: 0 additions & 10 deletions packages/nx/src/project-graph/plugins/internal-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
isAggregateCreateNodesError,
} from '../error-types';
import { IS_WASM } from '../../native';
import { output } from '../../utils/output';

export class LoadedNxPlugin {
readonly name: string;
Expand Down Expand Up @@ -127,15 +126,6 @@ export type CreateNodesResultWithContext = CreateNodesResult & {
pluginName: string;
};

// Short lived cache (cleared between cmd runs)
// holding resolved nx plugin objects.
// Allows loaded plugins to not be reloaded when
// referenced multiple times.
export const nxPluginCache: Map<
unknown,
[Promise<LoadedNxPlugin>, () => void]
> = new Map();

function isIsolationEnabled() {
// Explicitly enabled, regardless of further conditions
if (process.env.NX_ISOLATE_PLUGINS === 'true') {
Expand Down
6 changes: 0 additions & 6 deletions packages/nx/src/project-graph/plugins/isolation/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,8 @@ export interface PluginCreateMetadataResult {
};
}

export interface PluginWorkerShutdownMessage {
type: 'shutdown';
payload: {};
}

export type PluginWorkerMessage =
| PluginWorkerLoadMessage
| PluginWorkerShutdownMessage
| PluginWorkerCreateNodesMessage
| PluginCreateDependenciesMessage
| PluginCreateMetadataMessage;
Expand Down
24 changes: 1 addition & 23 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import { PluginConfiguration } from '../../../config/nx-json';
// TODO (@AgentEnder): After scoped verbose logging is implemented, re-add verbose logs here.
// import { logger } from '../../utils/logger';

import { LoadedNxPlugin, nxPluginCache } from '../internal-api';
import { LoadedNxPlugin } from '../internal-api';
import { getPluginOsSocketPath } from '../../../daemon/socket-utils';
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';
import { signalToCode } from '../../../utils/exit-codes';

import {
consumeMessage,
Expand Down Expand Up @@ -61,7 +60,6 @@ export async function loadRemoteNxPlugin(

const cleanupFunction = () => {
worker.off('exit', exitHandler);
shutdownPluginWorker(socket);
socket.destroy();
nxPluginWorkerCache.delete(cacheKey);
};
Expand Down Expand Up @@ -108,10 +106,6 @@ export async function loadRemoteNxPlugin(
return [pluginPromise, cleanupFunction];
}

function shutdownPluginWorker(socket: Socket) {
sendMessageOverSocket(socket, { type: 'shutdown', payload: {} });
}

/**
* Creates a message handler for the given worker.
* @param worker Instance of plugin-worker
Expand Down Expand Up @@ -260,22 +254,6 @@ function createWorkerExitHandler(
};
}

let cleanedUp = false;
const exitHandler = () => {
nxPluginCache.clear();
for (const fn of cleanupFunctions) {
fn();
}
cleanedUp = true;
};

process.on('exit', exitHandler);
process.on('SIGINT', () => {
exitHandler();
process.exit(signalToCode('SIGINT'));
});
process.on('SIGTERM', exitHandler);

function registerPendingPromise(
tx: string,
pending: Map<string, PendingPromise>,
Expand Down
30 changes: 16 additions & 14 deletions packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ const server = createServer((socket) => {
};
}
},
shutdown: async () => {
// Stops accepting new connections, but existing connections are
// not closed immediately.
server.close(() => {
try {
unlinkSync(socketPath);
} catch (e) {}
process.exit(0);
});
// Closes existing connection.
socket.end();
// Destroys the socket once it's fully closed.
socket.destroySoon();
},
createNodes: async ({ configFiles, context, tx }) => {
try {
const result = await plugin.createNodes[1](configFiles, context);
Expand Down Expand Up @@ -129,6 +115,22 @@ const server = createServer((socket) => {
});
})
);

// There should only ever be one host -> worker connection
// since the worker is spawned per host process. As such,
// we can safely close the worker when the host disconnects.
socket.on('end', () => {
// Stops accepting new connections, but existing connections are
// not closed immediately.
server.close(() => {
try {
unlinkSync(socketPath);
} catch (e) {}
process.exit(0);
});
// Destroys the socket once it's fully closed.
socket.destroySoon();
});
});

server.listen(socketPath);
Expand Down

0 comments on commit 0d7f226

Please sign in to comment.