From e27c5c5e3aa48c5e354821818dd543fb461e17eb Mon Sep 17 00:00:00 2001 From: Hugh Manning Date: Mon, 3 Oct 2022 17:41:00 +1300 Subject: [PATCH] Use the ProactorEventLoop APIs on Windows Potential solution to #3, based on the fact that we can access the Windows ReadFile/WriteFile API through the ProactorEventLoop by calling its sock_recv and sock_sendall methods, passing a file-ish object as the first argument instead of a socket object. --- serial_asyncio/__init__.py | 83 ++++++++++++++++++++++++++++++-------- test/test_asyncio.py | 2 - 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/serial_asyncio/__init__.py b/serial_asyncio/__init__.py index e173b56..62b3301 100644 --- a/serial_asyncio/__init__.py +++ b/serial_asyncio/__init__.py @@ -10,10 +10,7 @@ """\ Support asyncio with serial ports. -Posix platforms only, Python 3.5+ only. - -Windows event loops can not wait for serial ports with the current -implementation. It should be possible to get that working though. +Python 3.5+ only. """ import asyncio import os @@ -30,6 +27,21 @@ __version__ = '0.6' +class SerialSocketAdaptor: + """Wraps a Windows serial port file handle for use by the socket API. + + The ProactorEventLoop sock_recv() and sock_sendall() methods, when called + with something that isn't actually a socket, will dispatch to the ReadFile + and WriteFile APIs we want to use. We just need to pass a file-like object, + which in this case means something implementing a fileno() method. + """ + def __init__(self, handle): + self.handle = handle + + def fileno(self): + return self.handle + + class SerialTransport(asyncio.Transport): """An asyncio transport model of a serial communication channel. @@ -61,6 +73,10 @@ def __init__(self, loop, protocol, serial_instance): self._poll_wait_time = 0.0005 self._max_out_waiting = 1024 + # TODO: Ask pyserial to implement fileno() on win32 Serial objects + if os.name == "nt": + self._serial_handle = SerialSocketAdaptor(self._serial._port_handle) + # XXX how to support url handlers too # Asynchronous I/O requires non-blocking devices @@ -285,33 +301,66 @@ def _write_ready(self): assert self._has_writer if os.name == "nt": - def _poll_read(self): - if self._has_reader and not self._closing: + async def _await_read(self): + while not self._closing: try: - self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read) - if self.serial.in_waiting: - self._read_ready() + self.serial.timeout = None + first = await self._loop.sock_recv(self._serial_handle, 1) + if not first: + continue + self.serial.timeout = 0 + rest = await self._loop.sock_recv(self._serial_handle, + self._max_read_size - 1) + self._protocol.data_received(first + rest) except serial.SerialException as exc: - self._fatal_error(exc, 'Fatal write error on serial transport') + self._fatal_error(exc, 'Fatal read error on serial transport') + + def _cleanup_reader(self, reader): + self._has_reader = False + if not reader.cancelled(): + try: + reader.result() + except Exception: + raise def _ensure_reader(self): if not self._has_reader and not self._closing: - self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read) + self._has_reader = self._loop.create_task(self._await_read()) + self._has_reader.add_done_callback(self._cleanup_reader) def _remove_reader(self): if self._has_reader: self._has_reader.cancel() self._has_reader = False - def _poll_write(self): - if self._has_writer and not self._closing: - self._has_writer = self._loop.call_later(self._poll_wait_time, self._poll_write) - if self.serial.out_waiting < self._max_out_waiting: - self._write_ready() + async def _await_write(self): + while self._write_buffer and not self._closing: + data = b"".join(self._write_buffer) + self._write_buffer.clear() + try: + await self._loop.sock_sendall(self._serial_handle, data) + except BlockingIOError: + self._write_buffer.append(data) + except asyncio.CancelledError: + self._write_buffer.append(data) + raise + else: + self._maybe_resume_protocol() + if self._closing and self._flushed(): + self._close() + + def _cleanup_writer(self, writer): + self._has_writer = False + if not writer.cancelled(): + try: + writer.result() + except Exception: + raise def _ensure_writer(self): if not self._has_writer and not self._closing: - self._has_writer = self._loop.call_soon(self._poll_write) + self._has_writer = self._loop.create_task(self._await_write()) + self._has_writer.add_done_callback(self._cleanup_writer) def _remove_writer(self): if self._has_writer: diff --git a/test/test_asyncio.py b/test/test_asyncio.py index 6fa4fd9..f093f97 100644 --- a/test/test_asyncio.py +++ b/test/test_asyncio.py @@ -15,7 +15,6 @@ """ -import os import unittest import asyncio @@ -28,7 +27,6 @@ PORT = 'socket://%s:%s' % (HOST, _PORT) -@unittest.skipIf(os.name != 'posix', "asyncio not supported on platform") class Test_asyncio(unittest.TestCase): """Test asyncio related functionality"""