Skip to content

Commit

Permalink
Use the ProactorEventLoop APIs on Windows
Browse files Browse the repository at this point in the history
Potential solution to pyserial#3, based on the fact that we can access the
Windows ReadFile/WriteFile API through the ProactorEventLoop by calling
its sock_recv and sock_sendall methods, passing a file-ish object as the
first argument instead of a socket object.
  • Loading branch information
hugh-manning committed Nov 18, 2022
1 parent 13694c5 commit e27c5c5
Showing 2 changed files with 66 additions and 19 deletions.
83 changes: 66 additions & 17 deletions serial_asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -10,10 +10,7 @@
"""\
Support asyncio with serial ports.
Posix platforms only, Python 3.5+ only.
Windows event loops can not wait for serial ports with the current
implementation. It should be possible to get that working though.
Python 3.5+ only.
"""
import asyncio
import os
@@ -30,6 +27,21 @@
__version__ = '0.6'


class SerialSocketAdaptor:
"""Wraps a Windows serial port file handle for use by the socket API.
The ProactorEventLoop sock_recv() and sock_sendall() methods, when called
with something that isn't actually a socket, will dispatch to the ReadFile
and WriteFile APIs we want to use. We just need to pass a file-like object,
which in this case means something implementing a fileno() method.
"""
def __init__(self, handle):
self.handle = handle

def fileno(self):
return self.handle


class SerialTransport(asyncio.Transport):
"""An asyncio transport model of a serial communication channel.
@@ -61,6 +73,10 @@ def __init__(self, loop, protocol, serial_instance):
self._poll_wait_time = 0.0005
self._max_out_waiting = 1024

# TODO: Ask pyserial to implement fileno() on win32 Serial objects
if os.name == "nt":
self._serial_handle = SerialSocketAdaptor(self._serial._port_handle)

# XXX how to support url handlers too

# Asynchronous I/O requires non-blocking devices
@@ -285,33 +301,66 @@ def _write_ready(self):
assert self._has_writer

if os.name == "nt":
def _poll_read(self):
if self._has_reader and not self._closing:
async def _await_read(self):
while not self._closing:
try:
self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read)
if self.serial.in_waiting:
self._read_ready()
self.serial.timeout = None
first = await self._loop.sock_recv(self._serial_handle, 1)
if not first:
continue
self.serial.timeout = 0
rest = await self._loop.sock_recv(self._serial_handle,
self._max_read_size - 1)
self._protocol.data_received(first + rest)
except serial.SerialException as exc:
self._fatal_error(exc, 'Fatal write error on serial transport')
self._fatal_error(exc, 'Fatal read error on serial transport')

def _cleanup_reader(self, reader):
self._has_reader = False
if not reader.cancelled():
try:
reader.result()
except Exception:
raise

def _ensure_reader(self):
if not self._has_reader and not self._closing:
self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read)
self._has_reader = self._loop.create_task(self._await_read())
self._has_reader.add_done_callback(self._cleanup_reader)

def _remove_reader(self):
if self._has_reader:
self._has_reader.cancel()
self._has_reader = False

def _poll_write(self):
if self._has_writer and not self._closing:
self._has_writer = self._loop.call_later(self._poll_wait_time, self._poll_write)
if self.serial.out_waiting < self._max_out_waiting:
self._write_ready()
async def _await_write(self):
while self._write_buffer and not self._closing:
data = b"".join(self._write_buffer)
self._write_buffer.clear()
try:
await self._loop.sock_sendall(self._serial_handle, data)
except BlockingIOError:
self._write_buffer.append(data)
except asyncio.CancelledError:
self._write_buffer.append(data)
raise
else:
self._maybe_resume_protocol()
if self._closing and self._flushed():
self._close()

def _cleanup_writer(self, writer):
self._has_writer = False
if not writer.cancelled():
try:
writer.result()
except Exception:
raise

def _ensure_writer(self):
if not self._has_writer and not self._closing:
self._has_writer = self._loop.call_soon(self._poll_write)
self._has_writer = self._loop.create_task(self._await_write())
self._has_writer.add_done_callback(self._cleanup_writer)

def _remove_writer(self):
if self._has_writer:
2 changes: 0 additions & 2 deletions test/test_asyncio.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@
"""

import os
import unittest
import asyncio

@@ -28,7 +27,6 @@
PORT = 'socket://%s:%s' % (HOST, _PORT)


@unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
class Test_asyncio(unittest.TestCase):
"""Test asyncio related functionality"""

0 comments on commit e27c5c5

Please sign in to comment.