Skip to content

Commit

Permalink
server: cluster cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Sep 4, 2024
1 parent ebd56b8 commit 1e1755f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
37 changes: 22 additions & 15 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import scrypted_python.scrypted_sdk.types
from plugin_pip import install_with_pip, need_requirements, remove_pip_dirs
from scrypted_python.scrypted_sdk import PluginFork, ScryptedStatic
from scrypted_python.scrypted_sdk.types import (Device, DeviceManifest,
EventDetails,
ScryptedInterface,
ScryptedInterfaceMethods,
ScryptedInterfaceProperty,
Storage)
from scrypted_python.scrypted_sdk.types import (
Device,
DeviceManifest,
EventDetails,
ScryptedInterface,
ScryptedInterfaceMethods,
ScryptedInterfaceProperty,
Storage,
)

SCRYPTED_REQUIREMENTS = """
ptpython
Expand Down Expand Up @@ -509,19 +512,23 @@ def computeClusterObjectHash(o: ClusterObject) -> str:
)
return base64.b64encode(m.digest()).decode("utf-8")

def isClusterAddress(address: str):
return not address or address == SCRYPTED_CLUSTER_ADDRESS

def onProxySerialization(value: Any, sourceKey: str = None):
properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {}
clusterEntry = properties.get("__cluster", None)
proxyId: str = (
clusterEntry and clusterEntry.get("proxyId", None)
) or rpc.RpcPeer.generateId()

if (
clusterEntry
and clusterPort == clusterEntry["port"]
and sourceKey != clusterEntry.get("sourceKey", None)
):
clusterEntry = None
if clusterEntry:
if (
isClusterAddress(clusterEntry.get("address", None))
and clusterPort == clusterEntry["port"]
and sourceKey != clusterEntry.get("sourceKey", None)
):
clusterEntry = None

if not clusterEntry:
clusterEntry: ClusterObject = {
Expand Down Expand Up @@ -593,7 +600,7 @@ async def connectRPCObject(o: ClusterObject):
clusterPort = clusterRpcServer.sockets[0].getsockname()[1]

def ensureClusterPeer(address: str, port: int):
if not address or address == SCRYPTED_CLUSTER_ADDRESS:
if isClusterAddress(address):
address = "127.0.0.1"
clusterPeerKey = getClusterPeerKey(address, port)
clusterPeerPromise = clusterPeers.get(clusterPeerKey)
Expand All @@ -613,8 +620,8 @@ async def connectClusterPeer():
clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop(
self.loop, rpcTransport
)
clusterPeer.onProxySerialization = lambda value: onProxySerialization(
value, clusterPeerKey
clusterPeer.onProxySerialization = (
lambda value: onProxySerialization(value, clusterPeerKey)
)
except:
clusterPeers.pop(clusterPeerKey)
Expand Down
16 changes: 10 additions & 6 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe

const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS;

function isClusterAddress(address: string) {
return !address || address === SCRYPTED_CLUSTER_ADDRESS;
}

const onProxySerialization = (value: any, sourceKey?: string) => {
const properties = RpcPeer.prepareProxyProperties(value) || {};
let clusterEntry: ClusterObject = properties.__cluster;
Expand All @@ -121,8 +125,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
// if the cluster entry already exists, check if it belongs to this node.
// if it belongs to this node, the entry must also be for this peer.
// relying on the liveness/gc of a different peer may cause race conditions.
if (clusterEntry && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey)
clusterEntry = undefined;
if (clusterEntry) {
if (isClusterAddress(clusterEntry?.address) && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey)
clusterEntry = undefined;
}

if (!clusterEntry) {
clusterEntry = {
Expand Down Expand Up @@ -196,7 +202,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const clusterPort = await listenZero(clusterRpcServer, listenAddress);

const ensureClusterPeer = (address: string, connectPort: number) => {
if (!address || address === SCRYPTED_CLUSTER_ADDRESS)
if (isClusterAddress(address))
address = '127.0.0.1';

const clusterPeerKey = getClusterPeerKey(address, connectPort);
Expand Down Expand Up @@ -265,9 +271,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
clusterPeers.delete(threadPeerKey);
}
peerPromise = tidDeferred.promise.then(port => {
port.on('close', () => peerCleanup());
port.on('messageerror', () => peerCleanup());

const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, threadPeerKey, port);
threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey);

Expand All @@ -280,6 +283,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
threadPeer.params['connectRPCObject'] = connectRPCObject;

function cleanup(message: string) {
peerCleanup();
tidChannels.delete(tid);
tidPeers.delete(tid);
threadPeer.kill(message);
Expand Down

0 comments on commit 1e1755f

Please sign in to comment.