From c980d3dd8891b8e5e97c8e3db65f9699733dae25 Mon Sep 17 00:00:00 2001 From: xtne6f Date: Tue, 30 May 2023 19:33:39 +0900 Subject: [PATCH] =?UTF-8?q?Update:=20[Server][EDCBTuner]=20ProactorEventLo?= =?UTF-8?q?op.create=5Fpipe=5Fconnection()=20=E3=81=AE=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E3=82=92=E3=82=84=E3=82=81=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/app/streams/LiveEncodingTask.py | 6 +-- server/app/utils/EDCB.py | 59 +++++++++++++++++--------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/server/app/streams/LiveEncodingTask.py b/server/app/streams/LiveEncodingTask.py index 03a10336..9edd8c5c 100644 --- a/server/app/streams/LiveEncodingTask.py +++ b/server/app/streams/LiveEncodingTask.py @@ -12,7 +12,7 @@ from app.models import Channel from app.models import LiveStream from app.utils import Logging -from app.utils.EDCB import EDCBTuner +from app.utils.EDCB import EDCBTuner, PipeStreamReader class LiveEncodingTask: @@ -529,7 +529,7 @@ async def run(self) -> None: is_running: bool = True # 放送波の MPEG2-TS を受信する StreamReader - stream_reader: asyncio.StreamReader | aiohttp.StreamReader + stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader # EDCB のチューナーインスタンス (Mirakurun バックエンド利用時は常に None) tuner: EDCBTuner | None = None @@ -672,7 +672,7 @@ async def Reader(): # 受信した放送波が入るイテレータを作成 # R/W バッファ: 188B (TS Packet Size) * 256 = 48128B - async def GetIterator(stream_reader: asyncio.StreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]: + async def GetIterator(stream_reader: asyncio.StreamReader | PipeStreamReader | aiohttp.StreamReader, chunk_size: int = 48128) -> AsyncIterator[bytes]: while True: try: yield await stream_reader.readexactly(chunk_size) diff --git a/server/app/utils/EDCB.py b/server/app/utils/EDCB.py index 39c5241f..99b8f710 100644 --- a/server/app/utils/EDCB.py +++ b/server/app/utils/EDCB.py @@ -8,11 +8,35 @@ import sys import time import urllib.parse +from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, cast, ClassVar from app.constants import CONFIG +class PipeStreamReader: + """ + パイプのファイルオブジェクトを非同期で読み込むクラス + ProactorEventLoop のパイプサポートは未だ不十分で、ドキュメントされていない create_pipe_connection メソッドも + 内部で Win32API の CreateFile に渡すフラグが不適切で使い物にならないためつなぎとして用意したもの + """ + + def __init__(self, pipe: Any, executor: ThreadPoolExecutor, loop: Any): + self.__pipe = pipe + self.__executor = executor + self.__loop = loop + self.__buffer = bytearray() + + async def readexactly(self, n: int) -> bytes: + self.__buffer.clear() + while len(self.__buffer) < n: + data = await self.__loop.run_in_executor(self.__executor, lambda: self.__pipe.read(n - len(self.__buffer))) + if len(data) == 0: + raise asyncio.IncompleteReadError(bytes(self.__buffer), None) + self.__buffer += data + return bytes(self.__buffer) + + class EDCBTuner: """ EDCB バックエンドのチューナーを制御するクラス """ @@ -173,34 +197,37 @@ async def open(self) -> bool: return True - async def connect(self) -> asyncio.StreamReader | None: + async def connect(self) -> asyncio.StreamReader | PipeStreamReader | None: """ チューナーに接続し、放送波を受け取るための TCP ソケットまたは名前付きパイプを返す Returns: - asyncio.StreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す) + asyncio.StreamReader | PipeStreamReader | None: TCP ソケットまたは名前付きパイプの StreamReader (取得できなかった場合は None を返す) """ # プロセス ID が取得できている(チューナーが起動している)ことが前提 if self._edcb_process_id is None: return None + stream_reader: asyncio.StreamReader | PipeStreamReader | None = None + # チューナーに接続する if EDCBUtil.getEDCBHost() != 'edcb-namedpipe': ## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプの出力を、 ## EpgTimerSrv の CtrlCmd インターフェイス (TCP API) 経由で受信するための TCP ソケット (StreamReader / StreamWriter) result = await EDCBUtil.openViewStream(self._edcb_process_id) + stream_reader, stream_writer = (None, None) if result is None else result else: - ## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (StreamReader / StreamWriter) - result = await EDCBUtil.openPipeStream(self._edcb_process_id) + ## EpgDataCap_Bon で受信した放送波を受け取るための名前付きパイプ (PipeStreamReader) + stream_reader = await EDCBUtil.openPipeStream(self._edcb_process_id) + stream_writer = None # チューナーへの接続に失敗した ## チューナーを閉じてからエラーを返す - if result is None: + if stream_reader is None: await self.close() # チューナーを閉じる return None - stream_reader, stream_writer = result self._edcb_stream_writer = stream_writer return stream_reader @@ -402,8 +429,8 @@ async def openViewStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as return None @staticmethod - async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[asyncio.StreamReader, asyncio.StreamWriter] | None: - """ システムに存在する SrvPipe ストリームを開き、StreamReader / StreamWriter を返す """ + async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> PipeStreamReader | None: + """ システムに存在する SrvPipe ストリームを開く """ if sys.platform != 'win32': raise NotImplementedError('Windows Only') @@ -413,21 +440,11 @@ async def openPipeStream(process_id: int, timeout_sec: float = 10.0) -> tuple[as while time.monotonic() < to: # ポートは必ず 0 から 29 まで for port in range(30): - # asyncio.ProactorEventLoop.create_pipe_connection() を使う (Windows 専用のプライベート API) - # ref: https://github.com/qwertyquerty/pypresence/blob/4.2.1/pypresence/baseclient.py#L105-L123 - path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id) - reader = asyncio.StreamReader(loop=loop) - reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop) try: - transport, _ = await cast(asyncio.ProactorEventLoop, loop).create_pipe_connection(lambda: reader_protocol, path) - writer = asyncio.StreamWriter(transport, reader_protocol, reader, loop) - return (reader, writer) + path = '\\\\.\\pipe\\SendTSTCP_' + str(port) + '_' + str(process_id) + pipe = open(path, mode = 'rb') + return PipeStreamReader(pipe, ThreadPoolExecutor(), loop) except: - # TODO: エラーを解消できたら削除 - import traceback - from app.utils import Logging - Logging.error('openPipeStream: failed to connect to ' + path) - Logging.error(traceback.format_exc()) pass await asyncio.sleep(wait) # 初期に成功しなければ見込みは薄いので問い合わせを疎にしていく