Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server, client: send full ClusterObject on new eio endpoint #1170

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import crypto from 'crypto';
import { MediaObjectOptions, RTCConnectionManagement, RTCSignalingSession, ScryptedStatic } from "@scrypted/types";
import axios, { AxiosRequestConfig, AxiosRequestHeaders } from 'axios';
import * as eio from 'engine.io-client';
Expand Down Expand Up @@ -711,9 +710,9 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
.map(id => systemManager.getDeviceById(id))
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);

const clusterPeers = new Map<number, Promise<{ clusterPeer: RpcPeer, clusterSecret: string }>>();
const ensureClusterPeer = (port: number) => {
let clusterPeerPromise = clusterPeers.get(port);
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const ensureClusterPeer = (clusterObject: ClusterObject) => {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
Expand All @@ -722,7 +721,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
path: eioEndpoint,
query: {
cacehBust,
port,
clusterObject: JSON.stringify(clusterObject),
},
withCredentials: true,
extraHeaders,
Expand All @@ -733,19 +732,15 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions);
let peerReady = false;
clusterPeerSocket.on('close', () => {
clusterPeers.delete(port);
clusterPeers.delete(clusterObject.port);
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
});

try {
const clusterSecretPromise = once(clusterPeerSocket, 'message');

await once(clusterPeerSocket, 'open');

const clusterSecret = await clusterSecretPromise as any as string;

const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
Expand All @@ -762,21 +757,21 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
return { clusterPeer, clusterSecret };
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
})();
clusterPeers.set(port, clusterPeerPromise);
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
return clusterPeerPromise;
};

const resolveObject = async (proxyId: string, sourcePeerPort: number) => {
const sourcePeer = (await clusterPeers.get(sourcePeerPort))?.clusterPeer;
const sourcePeer = await clusterPeers.get(sourcePeerPort);
if (sourcePeer?.remoteWeakProxies) {
return Object.values(sourcePeer.remoteWeakProxies).find(
v => v.deref()?.__cluster?.proxyId == proxyId
Expand All @@ -791,7 +786,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
return value;
}

const { port, proxyId, source } = clusterObject;
const { port, proxyId } = clusterObject;

// check if object is already connected
const resolved = await resolveObject(proxyId, port);
Expand All @@ -800,11 +795,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
}

try {
const clusterPeerPromise = ensureClusterPeer(port);
const { clusterPeer, clusterSecret } = await clusterPeerPromise;
const clusterPeerPromise = ensureClusterPeer(clusterObject);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret, source);
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
return newValue;
Expand Down
32 changes: 21 additions & 11 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import base64
import gc
import os
import platform
Expand Down Expand Up @@ -40,6 +41,14 @@
import rpc_reader


class ClusterObject(TypedDict):
id: str
port: int
proxyId: str
sourcePort: str
sha256: str


class SystemDeviceState(TypedDict):
lastEventTime: int
stateTime: int
Expand Down Expand Up @@ -389,16 +398,22 @@ async def loadZipWrapped(self, packageJson, zipData, options: dict = None):
clusterId = options['clusterId']
clusterSecret = options['clusterSecret']

def computeClusterObjectHash(o: ClusterObject) -> str:
m = hashlib.sha256()
m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort', '')}{o['proxyId']}{clusterSecret}", 'utf8'))
return base64.b64encode(m.digest()).decode('utf-8')

def onProxySerialization(value: Any, proxyId: str, source: int = None):
properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {}
clusterEntry = properties.get('__cluster', None)
if not properties.get('__cluster', None):
clusterEntry = {
clusterEntry: ClusterObject = {
'id': clusterId,
'proxyId': proxyId,
'port': clusterPort,
'source': source,
}
clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry)
properties['__cluster'] = clusterEntry

# clusterEntry['proxyId'] = proxyId
Expand Down Expand Up @@ -426,13 +441,11 @@ async def handleClusterClient(reader: asyncio.StreamReader, writer: asyncio.Stre
future.set_result(peer)
clusterPeers[clusterPeerPort] = future

async def connectRPCObject(id: str, secret: str, sourcePeerPort: int = None):
m = hashlib.sha256()
m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
if secret != portSecret:
async def connectRPCObject(o: ClusterObject):
sha256 = computeClusterObjectHash(o)
if sha256 != o['sha256']:
raise Exception('secret incorrect')
return await resolveObject(id, sourcePeerPort)
return await resolveObject(o['proxyId'], o.get('sourcePort'))

peer.params['connectRPCObject'] = connectRPCObject
try:
Expand Down Expand Up @@ -496,10 +509,7 @@ async def connectRPCObject(value):
if clusterPeer.tags.get('localPort') == source:
return value
c = await clusterPeer.getParam('connectRPCObject')
m = hashlib.sha256()
m.update(bytes('%s%s' % (port, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
newValue = await c(proxyId, portSecret, source)
newValue = await c(clusterObject)
if not newValue:
raise Exception('ipc object not found?')
return newValue
Expand Down
26 changes: 16 additions & 10 deletions server/src/plugin/connect-rpc-object.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import crypto from "crypto";
import net from "net";
import { Socket } from "engine.io";
import { IOSocket } from "../io";
Expand All @@ -6,25 +7,30 @@ export interface ClusterObject {
id: string;
port: number;
proxyId: string;
source: number;
sourcePort: number;
sha256: string;
}

export type ConnectRPCObject = (id: string, secret: string, sourcePeerPort: number) => Promise<any>;
export type ConnectRPCObject = (o: ClusterObject) => Promise<any>;

/*
* Handle incoming connections that will be
* proxied to a connectRPCObject socket.
*
* It is the responsibility of the caller of
* this function to verify the signature of
* clusterObject using the clusterSecret.
*/
export function setupConnectRPCObjectProxy(clusterSecret: string, port: number, connection: Socket & IOSocket) {
if (!port) {
throw new Error("invalid port");
}

connection.send(clusterSecret);

const socket = net.connect(port, '127.0.0.1');
export function setupConnectRPCObjectProxy(clusterObject: ClusterObject, connection: Socket & IOSocket) {
const socket = net.connect(clusterObject.port, '127.0.0.1');
socket.on('close', () => connection.close());
socket.on('data', data => connection.send(data));
connection.on('close', () => socket.destroy());
connection.on('message', message => socket.write(message));
};


export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) {
const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64');
return sha256;
}
58 changes: 32 additions & 26 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ScryptedStatic, SystemManager } from '@scrypted/types';
import AdmZip from 'adm-zip';
import crypto from 'crypto';
import { once } from 'events';
import fs from 'fs';
import { Volume } from 'memfs';
Expand All @@ -9,16 +10,15 @@ import { install as installSourceMapSupport } from 'source-map-support';
import { listenZero } from '../listen-zero';
import { RpcMessage, RpcPeer } from '../rpc';
import { createDuplexRpcPeer } from '../rpc-serializer';
import { ClusterObject, ConnectRPCObject, computeClusterObjectHash } from './connect-rpc-object';
import { MediaManagerImpl } from './media';
import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api';
import { prepareConsoles } from './plugin-console';
import { getPluginNodePath, installOptionalDependencies } from './plugin-npm-dependencies';
import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote } from './plugin-remote';
import { DeviceManagerImpl, PluginReader, attachPluginRemote, setupPluginRemote } from './plugin-remote';
import { PluginStats, startStatsUpdater } from './plugin-remote-stats';
import { createREPLServer } from './plugin-repl';
import { NodeThreadWorker } from './runtime/node-thread-worker';
import { ClusterObject, ConnectRPCObject } from './connect-rpc-object';
import crypto from 'crypto';
const { link } = require('linkfs');

const serverVersion = require('../../package.json').version;
Expand Down Expand Up @@ -79,7 +79,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, zipData: Buffer | string, zipOptions: PluginRemoteLoadZipOptions) {
const { clusterId, clusterSecret } = zipOptions;

const onProxySerialization = (value: any, proxyId: string, source?: number) => {
const onProxySerialization = (value: any, proxyId: string, sourcePeerPort?: number) => {
const properties = RpcPeer.prepareProxyProperties(value) || {};
let clusterEntry: ClusterObject = properties.__cluster;

Expand All @@ -89,8 +89,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
id: clusterId,
port: clusterPort,
proxyId,
source,
sourcePort: sourcePeerPort,
sha256: null,
};
clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret);
properties.__cluster = clusterEntry;
}
// always reassign the id and source.
Expand Down Expand Up @@ -118,34 +120,36 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort);
clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer));
startPluginRemoteOptions?.onClusterPeer?.(clusterPeer);
const portSecret = crypto.createHash('sha256').update(`${clusterPort}${clusterSecret}`).digest().toString('hex');
const connectRPCObject: ConnectRPCObject = async (id, secret, sourcePeerPort) => {
if (secret !== portSecret)
const connectRPCObject: ConnectRPCObject = async (o) => {
const sha256 = computeClusterObjectHash(o, clusterSecret);
if (sha256 !== o.sha256)
throw new Error('secret incorrect');
return resolveObject(id, sourcePeerPort);
return resolveObject(o.proxyId, o.sourcePort);
}
clusterPeer.params['connectRPCObject'] = connectRPCObject;
client.on('close', () => {
clusterPeers.delete(clusterPeerPort);
clusterPeer.kill('cluster socket closed');
});
})
const clusterPort = await listenZero(clusterRpcServer);
const clusterPort = await listenZero(clusterRpcServer, '127.0.0.1');

const ensureClusterPeer = (port: number) => {
let clusterPeerPromise = clusterPeers.get(port);
const ensureClusterPeer = (connectPort: number) => {
let clusterPeerPromise = clusterPeers.get(connectPort);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const socket = net.connect(port, '127.0.0.1');
socket.on('close', () => clusterPeers.delete(port));
const socket = net.connect(connectPort, '127.0.0.1');
socket.on('close', () => clusterPeers.delete(connectPort));

try {
await once(socket, 'connect');
const clusterPeerPort = (socket.address() as net.AddressInfo).port;
// the sourcePort will be added to all rpc objects created by this peer session and used by resolveObject for later
// resolution when trying to find the peer.
const sourcePort = (socket.address() as net.AddressInfo).port;

const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket);
clusterPeer.tags.localPort = clusterPeerPort;
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort);
clusterPeer.tags.localPort = sourcePort;
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, sourcePort);
return clusterPeer;
}
catch (e) {
Expand All @@ -154,7 +158,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
throw e;
}
})();
clusterPeers.set(port, clusterPeerPromise);
clusterPeers.set(connectPort, clusterPeerPromise);
}
return clusterPeerPromise;
};
Expand All @@ -163,25 +167,27 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const clusterObject: ClusterObject = value?.__cluster;
if (clusterObject?.id !== clusterId)
return value;
const { port, proxyId, source } = clusterObject;
const { port, proxyId, sourcePort } = clusterObject;
// handle the case when trying to connect to an object is on this cluster node,
// returning the actual object, rather than initiating a loopback connection.
if (port === clusterPort)
return resolveObject(proxyId, source);
return resolveObject(proxyId, sourcePort);

try {
const clusterPeerPromise = ensureClusterPeer(port);
const clusterPeer = await clusterPeerPromise;
// this object is already connected
if (clusterPeer.tags.localPort === source)
// if the localPort is the sourcePort, that means the rpc object already exists as it originated from this node.
// so return the existing proxy.
if (clusterPeer.tags.localPort === sourcePort)
return value;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret, source);
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
throw new Error('rpc object not found?');
return newValue;
}
catch (e) {
console.error('failure ipc', e);
console.error('failure rpc', e);
return value;
}
}
Expand Down
Loading