Skip to content

Commit

Permalink
Merge branch 'main' of github.com:koush/scrypted
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Nov 9, 2023
2 parents 9edc63b + 7dec399 commit 7eca7f6
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 50 deletions.
30 changes: 12 additions & 18 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import crypto from 'crypto';
import { MediaObjectOptions, RTCConnectionManagement, RTCSignalingSession, ScryptedStatic } from "@scrypted/types";
import axios, { AxiosRequestConfig, AxiosRequestHeaders } from 'axios';
import * as eio from 'engine.io-client';
Expand Down Expand Up @@ -711,9 +710,9 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
.map(id => systemManager.getDeviceById(id))
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);

const clusterPeers = new Map<number, Promise<{ clusterPeer: RpcPeer, clusterSecret: string }>>();
const ensureClusterPeer = (port: number) => {
let clusterPeerPromise = clusterPeers.get(port);
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const ensureClusterPeer = (clusterObject: ClusterObject) => {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
Expand All @@ -722,7 +721,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
path: eioEndpoint,
query: {
cacehBust,
port,
clusterObject: JSON.stringify(clusterObject),
},
withCredentials: true,
extraHeaders,
Expand All @@ -733,19 +732,15 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions);
let peerReady = false;
clusterPeerSocket.on('close', () => {
clusterPeers.delete(port);
clusterPeers.delete(clusterObject.port);
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
});

try {
const clusterSecretPromise = once(clusterPeerSocket, 'message');

await once(clusterPeerSocket, 'open');

const clusterSecret = await clusterSecretPromise as any as string;

const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
Expand All @@ -762,21 +757,21 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
return { clusterPeer, clusterSecret };
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
})();
clusterPeers.set(port, clusterPeerPromise);
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
return clusterPeerPromise;
};

const resolveObject = async (proxyId: string, sourcePeerPort: number) => {
const sourcePeer = (await clusterPeers.get(sourcePeerPort))?.clusterPeer;
const sourcePeer = await clusterPeers.get(sourcePeerPort);
if (sourcePeer?.remoteWeakProxies) {
return Object.values(sourcePeer.remoteWeakProxies).find(
v => v.deref()?.__cluster?.proxyId == proxyId
Expand All @@ -791,7 +786,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
return value;
}

const { port, proxyId, source } = clusterObject;
const { port, proxyId } = clusterObject;

// check if object is already connected
const resolved = await resolveObject(proxyId, port);
Expand All @@ -800,11 +795,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
}

try {
const clusterPeerPromise = ensureClusterPeer(port);
const { clusterPeer, clusterSecret } = await clusterPeerPromise;
const clusterPeerPromise = ensureClusterPeer(clusterObject);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret, source);
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
return newValue;
Expand Down
32 changes: 21 additions & 11 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import base64
import gc
import os
import platform
Expand Down Expand Up @@ -40,6 +41,14 @@
import rpc_reader


class ClusterObject(TypedDict):
id: str
port: int
proxyId: str
sourcePort: str
sha256: str


class SystemDeviceState(TypedDict):
lastEventTime: int
stateTime: int
Expand Down Expand Up @@ -389,16 +398,22 @@ async def loadZipWrapped(self, packageJson, zipData, options: dict = None):
clusterId = options['clusterId']
clusterSecret = options['clusterSecret']

def computeClusterObjectHash(o: ClusterObject) -> str:
m = hashlib.sha256()
m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort', '')}{o['proxyId']}{clusterSecret}", 'utf8'))
return base64.b64encode(m.digest()).decode('utf-8')

def onProxySerialization(value: Any, proxyId: str, source: int = None):
properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {}
clusterEntry = properties.get('__cluster', None)
if not properties.get('__cluster', None):
clusterEntry = {
clusterEntry: ClusterObject = {
'id': clusterId,
'proxyId': proxyId,
'port': clusterPort,
'source': source,
}
clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry)
properties['__cluster'] = clusterEntry

# clusterEntry['proxyId'] = proxyId
Expand Down Expand Up @@ -426,13 +441,11 @@ async def handleClusterClient(reader: asyncio.StreamReader, writer: asyncio.Stre
future.set_result(peer)
clusterPeers[clusterPeerPort] = future

async def connectRPCObject(id: str, secret: str, sourcePeerPort: int = None):
m = hashlib.sha256()
m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
if secret != portSecret:
async def connectRPCObject(o: ClusterObject):
sha256 = computeClusterObjectHash(o)
if sha256 != o['sha256']:
raise Exception('secret incorrect')
return await resolveObject(id, sourcePeerPort)
return await resolveObject(o['proxyId'], o.get('sourcePort'))

peer.params['connectRPCObject'] = connectRPCObject
try:
Expand Down Expand Up @@ -496,10 +509,7 @@ async def connectRPCObject(value):
if clusterPeer.tags.get('localPort') == source:
return value
c = await clusterPeer.getParam('connectRPCObject')
m = hashlib.sha256()
m.update(bytes('%s%s' % (port, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
newValue = await c(proxyId, portSecret, source)
newValue = await c(clusterObject)
if not newValue:
raise Exception('ipc object not found?')
return newValue
Expand Down
21 changes: 13 additions & 8 deletions server/src/plugin/connect-rpc-object.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import crypto from "crypto";
import net from "net";
import { Socket } from "engine.io";
import { IOSocket } from "../io";
Expand All @@ -15,17 +16,21 @@ export type ConnectRPCObject = (o: ClusterObject) => Promise<any>;
/*
* Handle incoming connections that will be
* proxied to a connectRPCObject socket.
*
* It is the responsibility of the caller of
* this function to verify the signature of
* clusterObject using the clusterSecret.
*/
export function setupConnectRPCObjectProxy(clusterSecret: string, port: number, connection: Socket & IOSocket) {
if (!port) {
throw new Error("invalid port");
}

connection.send(clusterSecret);

const socket = net.connect(port, '127.0.0.1');
export function setupConnectRPCObjectProxy(clusterObject: ClusterObject, connection: Socket & IOSocket) {
const socket = net.connect(clusterObject.port, '127.0.0.1');
socket.on('close', () => connection.close());
socket.on('data', data => connection.send(data));
connection.on('close', () => socket.destroy());
connection.on('message', message => socket.write(message));
};


export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) {
const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64');
return sha256;
}
13 changes: 4 additions & 9 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { install as installSourceMapSupport } from 'source-map-support';
import { listenZero } from '../listen-zero';
import { RpcMessage, RpcPeer } from '../rpc';
import { createDuplexRpcPeer } from '../rpc-serializer';
import { ClusterObject, ConnectRPCObject } from './connect-rpc-object';
import { ClusterObject, ConnectRPCObject, computeClusterObjectHash } from './connect-rpc-object';
import { MediaManagerImpl } from './media';
import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api';
import { prepareConsoles } from './plugin-console';
Expand All @@ -27,11 +27,6 @@ export interface StartPluginRemoteOptions {
onClusterPeer(peer: RpcPeer): void;
}

function computeClusterObjectHash(clusterId: string, clusterPort: number, sourcePeerPort: number, proxyId: string, clusterSecret: string) {
const sha256 = crypto.createHash('sha256').update(`${clusterId}${clusterPort}${sourcePeerPort}${proxyId}${clusterSecret}`).digest().toString('base64');
return sha256;
}

export function startPluginRemote(mainFilename: string, pluginId: string, peerSend: (message: RpcMessage, reject?: (e: Error) => void, serializationContext?: any) => void, startPluginRemoteOptions?: StartPluginRemoteOptions) {
const peer = new RpcPeer('unknown', 'host', peerSend);

Expand Down Expand Up @@ -90,14 +85,14 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe

// set the cluster identity if it does not exist.
if (!clusterEntry) {
const sha256 = computeClusterObjectHash(clusterId, clusterPort, sourcePeerPort, proxyId, clusterSecret);
clusterEntry = {
id: clusterId,
port: clusterPort,
proxyId,
sourcePort: sourcePeerPort,
sha256,
sha256: null,
};
clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret);
properties.__cluster = clusterEntry;
}
// always reassign the id and source.
Expand Down Expand Up @@ -126,7 +121,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer));
startPluginRemoteOptions?.onClusterPeer?.(clusterPeer);
const connectRPCObject: ConnectRPCObject = async (o) => {
const sha256 = computeClusterObjectHash(o.id, o.port, o.sourcePort, o.proxyId, clusterSecret);
const sha256 = computeClusterObjectHash(o, clusterSecret);
if (sha256 !== o.sha256)
throw new Error('secret incorrect');
return resolveObject(o.proxyId, o.sourcePort);
Expand Down
19 changes: 15 additions & 4 deletions server/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { getPluginVolume } from './plugin/plugin-volume';
import { NodeForkWorker } from './plugin/runtime/node-fork-worker';
import { PythonRuntimeWorker } from './plugin/runtime/python-worker';
import { RuntimeWorker, RuntimeWorkerOptions } from './plugin/runtime/runtime-worker';
import { setupConnectRPCObjectProxy } from './plugin/connect-rpc-object';
import { ClusterObject, computeClusterObjectHash, setupConnectRPCObjectProxy } from './plugin/connect-rpc-object';
import { getIpAddress, SCRYPTED_INSECURE_PORT, SCRYPTED_SECURE_PORT } from './server-settings';
import { AddressSettings } from './services/addresses';
import { Alerts } from './services/alerts';
Expand Down Expand Up @@ -163,7 +163,18 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
res.end();
return;
}
if (!req.query.port) {
if (!req.query.clusterObject) {
res.writeHead(404);
res.end();
return;
}
try {
const clusterObject: ClusterObject = JSON.parse(req.query.clusterObject as string);
const sha256 = computeClusterObjectHash(clusterObject, this.clusterSecret);
if (sha256 != clusterObject.sha256) {
throw Error("invalid signature");
}
} catch {
res.writeHead(404);
res.end();
return;
Expand All @@ -173,8 +184,8 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {

this.connectRPCObjectIO.on('connection', connection => {
try {
const clusterObjectPortHeader = (connection.request as Request).query.port as string;
setupConnectRPCObjectProxy(this.clusterSecret, parseInt(clusterObjectPortHeader), connection);
const clusterObject: ClusterObject = JSON.parse((connection.request as Request).query.clusterObject as string);
setupConnectRPCObjectProxy(clusterObject, connection);
} catch {
connection.close();
}
Expand Down

0 comments on commit 7eca7f6

Please sign in to comment.