Skip to content

Commit

Permalink
Merge branch 'main' of github.com:koush/scrypted
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Mar 15, 2023
2 parents a4f37bd + e2e1c7b commit 9f2fabf
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
50 changes: 38 additions & 12 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
import subprocess
import threading
import concurrent.futures
import time
import traceback
import zipfile
Expand Down Expand Up @@ -35,6 +36,21 @@ class SystemDeviceState(TypedDict):
value: any


class StreamPipeReader:
def __init__(self, conn: multiprocessing.connection.Connection) -> None:
self.conn = conn
self.executor = concurrent.futures.ThreadPoolExecutor()

def readBlocking(self, n):
b = bytes(0)
while len(b) < n:
self.conn.poll()
b += os.read(self.conn.fileno(), n - len(b))
return b

async def read(self, n):
return await asyncio.get_event_loop().run_in_executor(self.executor, lambda: self.readBlocking(n))

class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager):
def __init__(self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]) -> None:
super().__init__()
Expand Down Expand Up @@ -467,14 +483,17 @@ def exit_check():

async def getFork():
fd = os.dup(parent_conn.fileno())
forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, fd, fd)
reader = StreamPipeReader(parent_conn)
forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, reader = reader, writeFd = fd)
forkPeer.peerName = 'thread'
async def forkReadLoop():
try:
await readLoop()
except:
# traceback.print_exc()
print('fork read loop exited')
pass
finally:
reader.executor.shutdown()
asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop)
getRemote = await forkPeer.getParam('getRemote')
remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo)
Expand Down Expand Up @@ -563,13 +582,15 @@ async def createDeviceState(self, id, setState):
async def getServicePort(self, name):
pass

async def plugin_async_main(loop: AbstractEventLoop, readFd: int, writeFd: int):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd, writeFd)
async def plugin_async_main(loop: AbstractEventLoop, readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, readFd=readFd, writeFd=writeFd, reader=reader, writer=writer)
peer.params['print'] = print
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop)

async def get_update_stats():
update_stats = await peer.getParam('updateStats')
if not update_stats:
return

def stats_runner():
ptime = round(time.process_time() * 1000000)
Expand Down Expand Up @@ -601,21 +622,25 @@ def stats_runner():

asyncio.run_coroutine_threadsafe(get_update_stats(), loop)

await readLoop()

try:
await readLoop()
finally:
if reader and hasattr(reader, 'executor'):
r: StreamPipeReader = reader
r.executor.shutdown()

def main(readFd: int, writeFd: int):
def main(readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
loop = asyncio.new_event_loop()

def gc_runner():
gc.collect()
loop.call_later(10, gc_runner)
gc_runner()

loop.run_until_complete(plugin_async_main(loop, readFd, writeFd))
loop.run_until_complete(plugin_async_main(loop, readFd=readFd, writeFd=writeFd, reader=reader, writer=writer))
loop.close()

def plugin_main(readFd: int, writeFd: int):
def plugin_main(readFd: int = None, writeFd: int = None, reader: asyncio.StreamReader = None, writer: asyncio.StreamWriter = None):
try:
import gi
gi.require_version('Gst', '1.0')
Expand All @@ -624,17 +649,18 @@ def plugin_main(readFd: int, writeFd: int):

loop = GLib.MainLoop()

worker = threading.Thread(target=main, args=(readFd, writeFd), name="asyncio-main")
worker = threading.Thread(target=main, args=(readFd, writeFd, reader, writer), name="asyncio-main")
worker.start()

loop.run()
except:
main(readFd, writeFd)
main(readFd=readFd, writeFd=writeFd, reader=reader, writer=writer)


def plugin_fork(conn: multiprocessing.connection.Connection):
fd = os.dup(conn.fileno())
plugin_main(fd, fd)
reader = StreamPipeReader(conn)
plugin_main(reader=reader, writeFd=fd)

if __name__ == "__main__":
plugin_main(3, 4)
2 changes: 1 addition & 1 deletion server/src/plugin/media.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ export abstract class MediaManagerBase implements MediaManager {
node[candidateId] = inputWeight + outputWeight;
}
catch (e) {
console.warn(converter.name, 'skipping converter due to error', e)
console.warn(candidate.name, 'skipping converter due to error', e)
}
}

Expand Down

0 comments on commit 9f2fabf

Please sign in to comment.