Skip to content

Commit

Permalink
server: plugin worker cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Mar 3, 2023
1 parent 096c036 commit 445581e
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 184 deletions.
160 changes: 154 additions & 6 deletions server/src/plugin/plugin-console.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { ScryptedNativeId } from '@scrypted/types'
import { listenZero } from '../listen-zero';
import { Server } from 'net';
import { once } from 'events';
import net from 'net'
import { Readable, PassThrough } from 'stream';
import { DeviceManager, ScryptedNativeId, SystemManager } from '@scrypted/types';
import { Console } from 'console';
import { once } from 'events';
import net, { Server } from 'net';
import { PassThrough, Readable } from 'stream';
import { listenZero } from '../listen-zero';

export interface ConsoleServer {
pluginConsole: Console;
Expand All @@ -20,6 +19,155 @@ export interface StdPassThroughs {
buffers: Buffer[];
}

export function getConsole(hook: (stdout: PassThrough, stderr: PassThrough) => Promise<void>,
also?: Console, alsoPrefix?: string) {

const stdout = new PassThrough();
const stderr = new PassThrough();

hook(stdout, stderr);

const ret = new Console(stdout, stderr);

const methods = [
'log', 'warn',
'dir', 'timeLog',
'trace', 'assert',
'clear', 'count',
'countReset', 'group',
'groupEnd', 'table',
'debug', 'info',
'dirxml', 'error',
'groupCollapsed',
];

const printers = ['log', 'info', 'debug', 'trace', 'warn', 'error'];
for (const m of methods) {
const old = (ret as any)[m].bind(ret);
(ret as any)[m] = (...args: any[]) => {
// prefer the mixin version for local/remote console dump.
if (also && alsoPrefix && printers.includes(m)) {
(also as any)[m](alsoPrefix, ...args);
}
else {
(console as any)[m](...args);
}
// call through to old method to ensure it gets written
// to log buffer.
old(...args);
}
}

return ret;
}

export function prepareConsoles(consoleName: string, systemManager: () => SystemManager, deviceManager: () => DeviceManager, getPlugins: () => Promise<any>) {
const deviceConsoles = new Map<string, Console>();
function getDeviceConsole (nativeId?: ScryptedNativeId) {
// the the plugin console is simply the default console
// and gets read from stderr/stdout.
if (!nativeId)
return console;

let ret = deviceConsoles.get(nativeId);
if (ret)
return ret;

ret = getConsole(async (stdout, stderr) => {
const connect = async () => {
const plugins = await getPlugins();
const port = await plugins.getRemoteServicePort(consoleName, 'console-writer');
const socket = net.connect(port);
socket.write(nativeId + '\n');
const writer = (data: Buffer) => {
socket.write(data);
};
stdout.on('data', writer);
stderr.on('data', writer);
socket.on('error', () => {
stdout.removeAllListeners();
stderr.removeAllListeners();
stdout.pause();
stderr.pause();
setTimeout(connect, 10000);
});
};
connect();
}, undefined, undefined);

deviceConsoles.set(nativeId, ret);
return ret;
}

const mixinConsoles = new Map<string, Map<string, Console>>();

function getMixinConsole(mixinId: string, nativeId: ScryptedNativeId) {
let nativeIdConsoles = mixinConsoles.get(nativeId);
if (!nativeIdConsoles) {
nativeIdConsoles = new Map();
mixinConsoles.set(nativeId, nativeIdConsoles);
}

let ret = nativeIdConsoles.get(mixinId);
if (ret)
return ret;

ret = getConsole(async (stdout, stderr) => {
if (!mixinId) {
return;
}
const reconnect = () => {
stdout.removeAllListeners();
stderr.removeAllListeners();
stdout.pause();
stderr.pause();
setTimeout(tryConnect, 10000);
};

const connect = async () => {
const ds = deviceManager().getDeviceState(nativeId);
if (!ds) {
// deleted?
return;
}

const plugins = await getPlugins();
const { pluginId, nativeId: mixinNativeId } = await plugins.getDeviceInfo(mixinId);
const port = await plugins.getRemoteServicePort(pluginId, 'console-writer');
const socket = net.connect(port);
socket.write(mixinNativeId + '\n');
const writer = (data: Buffer) => {
let str = data.toString().trim();
str = str.replaceAll('\n', `\n[${ds.name}]: `);
str = `[${ds.name}]: ` + str + '\n';
socket.write(str);
};
stdout.on('data', writer);
stderr.on('data', writer);
socket.on('close', reconnect);
};

const tryConnect = async () => {
try {
await connect();
}
catch (e) {
reconnect();
}
}
tryConnect();
}, getDeviceConsole(nativeId), `[${systemManager().getDeviceById(mixinId)?.name}]`);

nativeIdConsoles.set(mixinId, ret);
return ret;
}

return {
getDeviceConsole,
getMixinConsole,
}
}

export async function createConsoleServer(remoteStdout: Readable, remoteStderr: Readable, header: string) {
const outputs = new Map<string, StdPassThroughs>();

Expand Down
36 changes: 36 additions & 0 deletions server/src/plugin/plugin-remote-stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { NodeThreadWorker } from "./runtime/node-thread-worker";

export interface PluginStats {
type: 'stats',
cpu: NodeJS.CpuUsage;
memoryUsage: NodeJS.MemoryUsage;
}

export function startStatsUpdater(allMemoryStats: Map<NodeThreadWorker, NodeJS.MemoryUsage>, updateStats: (stats: PluginStats) => void) {
setInterval(() => {
const cpuUsage = process.cpuUsage();
allMemoryStats.set(undefined, process.memoryUsage());

const memoryUsage: NodeJS.MemoryUsage = {
rss: 0,
heapTotal: 0,
heapUsed: 0,
external: 0,
arrayBuffers: 0,
}

for (const mu of allMemoryStats.values()) {
memoryUsage.rss += mu.rss;
memoryUsage.heapTotal += mu.heapTotal;
memoryUsage.heapUsed += mu.heapUsed;
memoryUsage.external += mu.external;
memoryUsage.arrayBuffers += mu.arrayBuffers;
}

updateStats({
type: 'stats',
cpu: cpuUsage,
memoryUsage,
});
}, 10000);
}
Loading

0 comments on commit 445581e

Please sign in to comment.