diff --git a/server/src/plugin/plugin-console.ts b/server/src/plugin/plugin-console.ts index dd83342989..0f6925a8da 100644 --- a/server/src/plugin/plugin-console.ts +++ b/server/src/plugin/plugin-console.ts @@ -1,10 +1,9 @@ -import { ScryptedNativeId } from '@scrypted/types' -import { listenZero } from '../listen-zero'; -import { Server } from 'net'; -import { once } from 'events'; -import net from 'net' -import { Readable, PassThrough } from 'stream'; +import { DeviceManager, ScryptedNativeId, SystemManager } from '@scrypted/types'; import { Console } from 'console'; +import { once } from 'events'; +import net, { Server } from 'net'; +import { PassThrough, Readable } from 'stream'; +import { listenZero } from '../listen-zero'; export interface ConsoleServer { pluginConsole: Console; @@ -20,6 +19,155 @@ export interface StdPassThroughs { buffers: Buffer[]; } +export function getConsole(hook: (stdout: PassThrough, stderr: PassThrough) => Promise, + also?: Console, alsoPrefix?: string) { + + const stdout = new PassThrough(); + const stderr = new PassThrough(); + + hook(stdout, stderr); + + const ret = new Console(stdout, stderr); + + const methods = [ + 'log', 'warn', + 'dir', 'timeLog', + 'trace', 'assert', + 'clear', 'count', + 'countReset', 'group', + 'groupEnd', 'table', + 'debug', 'info', + 'dirxml', 'error', + 'groupCollapsed', + ]; + + const printers = ['log', 'info', 'debug', 'trace', 'warn', 'error']; + for (const m of methods) { + const old = (ret as any)[m].bind(ret); + (ret as any)[m] = (...args: any[]) => { + // prefer the mixin version for local/remote console dump. + if (also && alsoPrefix && printers.includes(m)) { + (also as any)[m](alsoPrefix, ...args); + } + else { + (console as any)[m](...args); + } + // call through to old method to ensure it gets written + // to log buffer. + old(...args); + } + } + + return ret; +} + +export function prepareConsoles(consoleName: string, systemManager: () => SystemManager, deviceManager: () => DeviceManager, getPlugins: () => Promise) { + const deviceConsoles = new Map(); + function getDeviceConsole (nativeId?: ScryptedNativeId) { + // the the plugin console is simply the default console + // and gets read from stderr/stdout. + if (!nativeId) + return console; + + let ret = deviceConsoles.get(nativeId); + if (ret) + return ret; + + ret = getConsole(async (stdout, stderr) => { + const connect = async () => { + const plugins = await getPlugins(); + const port = await plugins.getRemoteServicePort(consoleName, 'console-writer'); + const socket = net.connect(port); + socket.write(nativeId + '\n'); + const writer = (data: Buffer) => { + socket.write(data); + }; + stdout.on('data', writer); + stderr.on('data', writer); + socket.on('error', () => { + stdout.removeAllListeners(); + stderr.removeAllListeners(); + stdout.pause(); + stderr.pause(); + setTimeout(connect, 10000); + }); + }; + connect(); + }, undefined, undefined); + + deviceConsoles.set(nativeId, ret); + return ret; + } + + const mixinConsoles = new Map>(); + + function getMixinConsole(mixinId: string, nativeId: ScryptedNativeId) { + let nativeIdConsoles = mixinConsoles.get(nativeId); + if (!nativeIdConsoles) { + nativeIdConsoles = new Map(); + mixinConsoles.set(nativeId, nativeIdConsoles); + } + + let ret = nativeIdConsoles.get(mixinId); + if (ret) + return ret; + + ret = getConsole(async (stdout, stderr) => { + if (!mixinId) { + return; + } + const reconnect = () => { + stdout.removeAllListeners(); + stderr.removeAllListeners(); + stdout.pause(); + stderr.pause(); + setTimeout(tryConnect, 10000); + }; + + const connect = async () => { + const ds = deviceManager().getDeviceState(nativeId); + if (!ds) { + // deleted? + return; + } + + const plugins = await getPlugins(); + const { pluginId, nativeId: mixinNativeId } = await plugins.getDeviceInfo(mixinId); + const port = await plugins.getRemoteServicePort(pluginId, 'console-writer'); + const socket = net.connect(port); + socket.write(mixinNativeId + '\n'); + const writer = (data: Buffer) => { + let str = data.toString().trim(); + str = str.replaceAll('\n', `\n[${ds.name}]: `); + str = `[${ds.name}]: ` + str + '\n'; + socket.write(str); + }; + stdout.on('data', writer); + stderr.on('data', writer); + socket.on('close', reconnect); + }; + + const tryConnect = async () => { + try { + await connect(); + } + catch (e) { + reconnect(); + } + } + tryConnect(); + }, getDeviceConsole(nativeId), `[${systemManager().getDeviceById(mixinId)?.name}]`); + + nativeIdConsoles.set(mixinId, ret); + return ret; + } + + return { + getDeviceConsole, + getMixinConsole, + } +} + export async function createConsoleServer(remoteStdout: Readable, remoteStderr: Readable, header: string) { const outputs = new Map(); diff --git a/server/src/plugin/plugin-remote-stats.ts b/server/src/plugin/plugin-remote-stats.ts new file mode 100644 index 0000000000..cf1350a3d0 --- /dev/null +++ b/server/src/plugin/plugin-remote-stats.ts @@ -0,0 +1,36 @@ +import { NodeThreadWorker } from "./runtime/node-thread-worker"; + +export interface PluginStats { + type: 'stats', + cpu: NodeJS.CpuUsage; + memoryUsage: NodeJS.MemoryUsage; +} + +export function startStatsUpdater(allMemoryStats: Map, updateStats: (stats: PluginStats) => void) { + setInterval(() => { + const cpuUsage = process.cpuUsage(); + allMemoryStats.set(undefined, process.memoryUsage()); + + const memoryUsage: NodeJS.MemoryUsage = { + rss: 0, + heapTotal: 0, + heapUsed: 0, + external: 0, + arrayBuffers: 0, + } + + for (const mu of allMemoryStats.values()) { + memoryUsage.rss += mu.rss; + memoryUsage.heapTotal += mu.heapTotal; + memoryUsage.heapUsed += mu.heapUsed; + memoryUsage.external += mu.external; + memoryUsage.arrayBuffers += mu.arrayBuffers; + } + + updateStats({ + type: 'stats', + cpu: cpuUsage, + memoryUsage, + }); + }, 10000); +} \ No newline at end of file diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 42e613c6fd..e58684f9b0 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -1,27 +1,20 @@ -import { DeviceManager, ScryptedNativeId, ScryptedStatic, SystemManager } from '@scrypted/types'; +import { ScryptedStatic, SystemManager } from '@scrypted/types'; import AdmZip from 'adm-zip'; -import { Console } from 'console'; import fs from 'fs'; import { Volume } from 'memfs'; -import net from 'net'; import path from 'path'; import { install as installSourceMapSupport } from 'source-map-support'; -import { PassThrough } from 'stream'; import { RpcMessage, RpcPeer } from '../rpc'; import { MediaManagerImpl } from './media'; import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; +import { prepareConsoles } from './plugin-console'; import { installOptionalDependencies } from './plugin-npm-dependencies'; import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote } from './plugin-remote'; +import { PluginStats, startStatsUpdater } from './plugin-remote-stats'; import { createREPLServer } from './plugin-repl'; import { NodeThreadWorker } from './runtime/node-thread-worker'; const { link } = require('linkfs'); -interface PluginStats { - type: 'stats', - cpu: NodeJS.CpuUsage; - memoryUsage: NodeJS.MemoryUsage; -} - const serverVersion = require('../../package.json').version; export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessage, reject?: (e: Error) => void, serializationContext?: any) => void) { @@ -31,47 +24,6 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa let deviceManager: DeviceManagerImpl; let api: PluginAPI; - const getConsole = (hook: (stdout: PassThrough, stderr: PassThrough) => Promise, - also?: Console, alsoPrefix?: string) => { - - const stdout = new PassThrough(); - const stderr = new PassThrough(); - - hook(stdout, stderr); - - const ret = new Console(stdout, stderr); - - const methods = [ - 'log', 'warn', - 'dir', 'timeLog', - 'trace', 'assert', - 'clear', 'count', - 'countReset', 'group', - 'groupEnd', 'table', - 'debug', 'info', - 'dirxml', 'error', - 'groupCollapsed', - ]; - - const printers = ['log', 'info', 'debug', 'trace', 'warn', 'error']; - for (const m of methods) { - const old = (ret as any)[m].bind(ret); - (ret as any)[m] = (...args: any[]) => { - // prefer the mixin version for local/remote console dump. - if (also && alsoPrefix && printers.includes(m)) { - (also as any)[m](alsoPrefix, ...args); - } - else { - (console as any)[m](...args); - } - // call through to old method to ensure it gets written - // to log buffer. - old(...args); - } - } - - return ret; - } let pluginsPromise: Promise; function getPlugins() { @@ -80,138 +32,13 @@ export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessa return pluginsPromise; } - const deviceConsoles = new Map(); - const getDeviceConsole = (nativeId?: ScryptedNativeId) => { - // the the plugin console is simply the default console - // and gets read from stderr/stdout. - if (!nativeId) - return console; - - let ret = deviceConsoles.get(nativeId); - if (ret) - return ret; - - ret = getConsole(async (stdout, stderr) => { - const connect = async () => { - const plugins = await getPlugins(); - const port = await plugins.getRemoteServicePort(peer.selfName, 'console-writer'); - const socket = net.connect(port); - socket.write(nativeId + '\n'); - const writer = (data: Buffer) => { - socket.write(data); - }; - stdout.on('data', writer); - stderr.on('data', writer); - socket.on('error', () => { - stdout.removeAllListeners(); - stderr.removeAllListeners(); - stdout.pause(); - stderr.pause(); - setTimeout(connect, 10000); - }); - }; - connect(); - }, undefined, undefined); - - deviceConsoles.set(nativeId, ret); - return ret; - } - - const mixinConsoles = new Map>(); - - const getMixinConsole = (mixinId: string, nativeId: ScryptedNativeId) => { - let nativeIdConsoles = mixinConsoles.get(nativeId); - if (!nativeIdConsoles) { - nativeIdConsoles = new Map(); - mixinConsoles.set(nativeId, nativeIdConsoles); - } - - let ret = nativeIdConsoles.get(mixinId); - if (ret) - return ret; - - ret = getConsole(async (stdout, stderr) => { - if (!mixinId) { - return; - } - const reconnect = () => { - stdout.removeAllListeners(); - stderr.removeAllListeners(); - stdout.pause(); - stderr.pause(); - setTimeout(tryConnect, 10000); - }; - - const connect = async () => { - const ds = deviceManager.getDeviceState(nativeId); - if (!ds) { - // deleted? - return; - } - - const plugins = await getPlugins(); - const { pluginId, nativeId: mixinNativeId } = await plugins.getDeviceInfo(mixinId); - const port = await plugins.getRemoteServicePort(pluginId, 'console-writer'); - const socket = net.connect(port); - socket.write(mixinNativeId + '\n'); - const writer = (data: Buffer) => { - let str = data.toString().trim(); - str = str.replaceAll('\n', `\n[${ds.name}]: `); - str = `[${ds.name}]: ` + str + '\n'; - socket.write(str); - }; - stdout.on('data', writer); - stderr.on('data', writer); - socket.on('close', reconnect); - }; - - const tryConnect = async () => { - try { - await connect(); - } - catch (e) { - reconnect(); - } - } - tryConnect(); - }, getDeviceConsole(nativeId), `[${systemManager.getDeviceById(mixinId)?.name}]`); - - nativeIdConsoles.set(mixinId, ret); - return ret; - } + const { getDeviceConsole, getMixinConsole } = prepareConsoles(peer.selfName, () => systemManager, () => deviceManager, getPlugins); // process.cpuUsage is for the entire process. // process.memoryUsage is per thread. const allMemoryStats = new Map(); - peer.getParam('updateStats').then((updateStats: (stats: PluginStats) => void) => { - setInterval(() => { - const cpuUsage = process.cpuUsage(); - allMemoryStats.set(undefined, process.memoryUsage()); - - const memoryUsage: NodeJS.MemoryUsage = { - rss: 0, - heapTotal: 0, - heapUsed: 0, - external: 0, - arrayBuffers: 0, - } - - for (const mu of allMemoryStats.values()) { - memoryUsage.rss += mu.rss; - memoryUsage.heapTotal += mu.heapTotal; - memoryUsage.heapUsed += mu.heapUsed; - memoryUsage.external += mu.external; - memoryUsage.arrayBuffers += mu.arrayBuffers; - } - - updateStats({ - type: 'stats', - cpu: cpuUsage, - memoryUsage, - }); - }, 10000); - }); + peer.getParam('updateStats').then(updateStats => startStatsUpdater(allMemoryStats, updateStats)); let replPort: Promise;