From f24c04eb9903946f27570e758ac285c94eff465e Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 11:35:12 +0200 Subject: [PATCH 1/6] TCP: Fix shutdown All "on shutdown" checks expect this to be None and not absent. Closes: https://github.com/chrysn/aiocoap/issues/279 --- aiocoap/transports/tcp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiocoap/transports/tcp.py b/aiocoap/transports/tcp.py index e26a133d..eb97560d 100644 --- a/aiocoap/transports/tcp.py +++ b/aiocoap/transports/tcp.py @@ -403,4 +403,4 @@ async def shutdown(self): for c in self._pool.values(): # FIXME: it would be nicer to release them c.abort("Server shutdown") - del self._tokenmanager + self._tokenmanager = None From 0f26d838c8c5bc9704372357922731b5faedf198 Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 11:49:57 +0200 Subject: [PATCH 2/6] protocol: Document and parametrize shutdown timeout --- aiocoap/numbers/constants.py | 3 +++ aiocoap/protocol.py | 11 +++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/aiocoap/numbers/constants.py b/aiocoap/numbers/constants.py index 5b27fe57..93376823 100644 --- a/aiocoap/numbers/constants.py +++ b/aiocoap/numbers/constants.py @@ -117,4 +117,7 @@ This number is not explicitly named in RFC7641. """ +SHUTDOWN_TIMEOUT = 3 +"""Maximum time, in seconds, for which the process is kept around during shutdown""" + __all__ = [k for k in dir() if not k.startswith('_')] diff --git a/aiocoap/protocol.py b/aiocoap/protocol.py index ac922fef..961cc0e1 100644 --- a/aiocoap/protocol.py +++ b/aiocoap/protocol.py @@ -33,7 +33,7 @@ from . import interfaces from . import error from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND, - CONTINUE, OBSERVATION_RESET_TIME) + CONTINUE, OBSERVATION_RESET_TIME, SHUTDOWN_TIMEOUT) from .util.asyncio import py38args import warnings @@ -297,8 +297,11 @@ async def shutdown(self): After this coroutine terminates, and once all external references to the object are dropped, it should be garbage-collectable. - This method may take the time to inform communications partners of - stopped observations (but currently does not).""" + This method takes up to + :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing + transports to perform any cleanup implemented in them (such as orderly + connection shutdown and cancelling observations, where the latter is + currently not implemented).""" self.log.debug("Shutting down context") @@ -309,7 +312,7 @@ async def shutdown(self): ) for ri in self.request_interfaces], - timeout=3) + timeout=SHUTDOWN_TIMEOUT) for item in done: await item From e152f90a647a1b5eaeaf7427d993cff1a9cad5b0 Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 12:38:26 +0200 Subject: [PATCH 3/6] TCP, WS: Server closes connection on Abort and Release --- aiocoap/transports/rfc8323common.py | 17 +++++++++++++++-- aiocoap/transports/tcp.py | 5 ++++- aiocoap/transports/ws.py | 5 ++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/aiocoap/transports/rfc8323common.py b/aiocoap/transports/rfc8323common.py index 02ada2b0..8b528ee9 100644 --- a/aiocoap/transports/rfc8323common.py +++ b/aiocoap/transports/rfc8323common.py @@ -17,6 +17,14 @@ from aiocoap.numbers.codes import CSM, PING, PONG, RELEASE, ABORT from aiocoap import error +class CloseConnection(Exception): + """Raised in RFC8323 common processing to trigger a connection shutdown on + the TCP / WebSocket side. + + The TCP / WebSocket side does not need to do anything further in terms of + shutdown (including logging), it can just silently close the connectgion, + as the common code takes care of everything else.""" + class RFC8323Remote: """Mixin for Remotes for all the common RFC8323 processing @@ -144,9 +152,14 @@ def _process_signaling(self, msg): elif msg.code == PONG: pass elif msg.code == RELEASE: - raise NotImplementedError + # The behavior SHOULD be enhanced to answer outstanding + # requests, but it is unclear to which extent this side may + # still use the connection. + self.log.info("Received Release, closing on this end (options: %s)", msg.opt) + raise CloseConnection elif msg.code == ABORT: - raise NotImplementedError + self.log.warning("Received Abort (options: %s)", msg.opt) + raise CloseConnection else: self.abort("Unknown signalling code") diff --git a/aiocoap/transports/tcp.py b/aiocoap/transports/tcp.py index eb97560d..050cbef6 100644 --- a/aiocoap/transports/tcp.py +++ b/aiocoap/transports/tcp.py @@ -200,7 +200,10 @@ def data_received(self, data): self._spool = self._spool[msglen:] if msg.code.is_signalling(): - self._process_signaling(msg) + try: + self._process_signaling(msg) + except rfc8323common.CloseConnection: + self._transport.close() continue if self._remote_settings is None: diff --git a/aiocoap/transports/ws.py b/aiocoap/transports/ws.py index 2d0170a2..87ce69c5 100644 --- a/aiocoap/transports/ws.py +++ b/aiocoap/transports/ws.py @@ -366,7 +366,10 @@ async def _run_recv_loop(self, remote): msg.remote = remote if msg.code.is_signalling(): - remote._process_signaling(msg) + try: + remote._process_signaling(msg) + except rfc8323common.CloseConnection: + await remote._connection.close() continue if remote._remote_settings is None: From 6c2e944e408e3a3ea87fcd31513a5bba6b72e45f Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 13:36:43 +0200 Subject: [PATCH 4/6] WS: Fix matching of multiple requests to a single connection --- aiocoap/transports/ws.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/aiocoap/transports/ws.py b/aiocoap/transports/ws.py index 87ce69c5..91e95e5a 100644 --- a/aiocoap/transports/ws.py +++ b/aiocoap/transports/ws.py @@ -131,6 +131,11 @@ def __init__(self, pool, connection, loop, log, *, scheme, local_hostinfo=None, self.scheme = scheme + # Goes both for client and for server ends; on the server end, it + # ensures that role reversal URIs can be used even when passed as URIs + # and not as remotes (although that's of course only possible locally). + self._poolkey = PoolKey(self.scheme, self.hostinfo) + # Necessary for RFC8323Remote def _abort_with(self, msg, *, close_code=1002): @@ -236,6 +241,7 @@ async def _new_connection(self, websocket, path=None, *, scheme): local_hostinfo = util.hostportsplit(hostheader) remote = WSRemote(self, websocket, self.loop, self.log, scheme=scheme, local_hostinfo=local_hostinfo) + self._pool[remote._poolkey] = remote await self._run_recv_loop(remote) @@ -272,7 +278,8 @@ async def _connect_task(self, key: PoolKey): ) remote = WSRemote(self, websocket, self.loop, self.log, scheme=key.scheme, remote_hostinfo=hostinfo_split) - self._pool[remote] = remote + assert remote._poolkey == key, "Pool key construction is inconsistent" + self._pool[key] = remote self.loop.create_task( self._run_recv_loop(remote), From 01f951ba527f346d87e9c77c0cf684952556234c Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 15:55:37 +0200 Subject: [PATCH 5/6] TCP, WS: Go through release shutdown --- aiocoap/tokenmanager.py | 2 +- aiocoap/transports/rfc8323common.py | 35 +++++++++++++++--- aiocoap/transports/tcp.py | 45 ++++++++++++++++++----- aiocoap/transports/ws.py | 56 +++++++++++++++++++++++++---- 4 files changed, 116 insertions(+), 22 deletions(-) diff --git a/aiocoap/tokenmanager.py b/aiocoap/tokenmanager.py index 6d2e1217..1f3bec03 100644 --- a/aiocoap/tokenmanager.py +++ b/aiocoap/tokenmanager.py @@ -79,7 +79,7 @@ def dispatch_error(self, exception, remote): # Not entirely sure where it is so far; better just raise a warning # than an exception later, nothing terminally bad should come of # this error. - self.log.warning("Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutown") + self.log.warning("Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown") return # NetworkError is what we promise users to raise from request etc; if diff --git a/aiocoap/transports/rfc8323common.py b/aiocoap/transports/rfc8323common.py index 8b528ee9..5e3ad826 100644 --- a/aiocoap/transports/rfc8323common.py +++ b/aiocoap/transports/rfc8323common.py @@ -11,6 +11,7 @@ (asyncio stream vs. websockets module) far enough that they only share small portions of their code""" +import asyncio from typing import Optional from aiocoap import Message from aiocoap import optiontypes, util @@ -21,9 +22,9 @@ class CloseConnection(Exception): """Raised in RFC8323 common processing to trigger a connection shutdown on the TCP / WebSocket side. - The TCP / WebSocket side does not need to do anything further in terms of - shutdown (including logging), it can just silently close the connectgion, - as the common code takes care of everything else.""" + The TCP / WebSocket side should send the exception's argument on to the + token manager, close the connection, and does not need to perform further + logging.""" class RFC8323Remote: """Mixin for Remotes for all the common RFC8323 processing @@ -156,10 +157,10 @@ def _process_signaling(self, msg): # requests, but it is unclear to which extent this side may # still use the connection. self.log.info("Received Release, closing on this end (options: %s)", msg.opt) - raise CloseConnection + raise CloseConnection(error.RemoteServerShutdown("Peer released connection")) elif msg.code == ABORT: self.log.warning("Received Abort (options: %s)", msg.opt) - raise CloseConnection + raise CloseConnection(error.RemoteServerShutdown("Peer aborted connection")) else: self.abort("Unknown signalling code") @@ -172,3 +173,27 @@ def abort(self, errormessage=None, bad_csm_option=None): bad_csm_option_option = optiontypes.UintOption(2, bad_csm_option) abort_msg.opt.add_option(bad_csm_option_option) self._abort_with(abort_msg) + + async def release(self): + """Send Release message, (not implemented:) wait for connection to be + actually closed by the peer. + + Subclasses should extend this to await closing of the connection, + especially if they'd get into lock-up states otherwise (was would + WebSockets). + """ + self.log.info("Releasing connection %s", self) + release_msg = Message(code=RELEASE) + self._send_message(release_msg) + + try: + # FIXME: we could wait for the peer to close the connection, but a) + # that'll need some work on the interface between this module and + # ws/tcp, and b) we have no peers to test this with that would + # produce any sensible data (as aiocoap on release just closes). + pass + except asyncio.CancelledError: + self.log.warning( + "Connection %s was not closed by peer in time after release", + self + ) diff --git a/aiocoap/transports/tcp.py b/aiocoap/transports/tcp.py index 050cbef6..4c00b18a 100644 --- a/aiocoap/transports/tcp.py +++ b/aiocoap/transports/tcp.py @@ -13,6 +13,7 @@ from aiocoap import interfaces, error, util from aiocoap import COAP_PORT, Message from aiocoap import defaults +from ..util.asyncio import py38args def _extract_message_size(data: bytes): """Read out the full length of a CoAP messsage represented by data. @@ -202,7 +203,8 @@ def data_received(self, data): if msg.code.is_signalling(): try: self._process_signaling(msg) - except rfc8323common.CloseConnection: + except rfc8323common.CloseConnection as e: + self._ctx._dispatch_error(self, e.args[0]) self._transport.close() continue @@ -225,6 +227,9 @@ def resume_writing(self): # FIXME: do something ;-) pass + # RFC8323Remote.release recommends subclassing this, but there's no easy + # awaitable here yet, and no important business to finish, timeout-wise. + class _TCPPooling: # implementing TokenInterface @@ -301,7 +306,10 @@ def new_connection(): return self def _evict_from_pool(self, connection): - self._pool.remove(connection) + # May easily happen twice, once when an error comes in and once when + # the connection is (subsequently) closed. + if connection in self._pool: + self._pool.remove(connection) # implementing TokenInterface @@ -314,12 +322,24 @@ async def fill_or_recognize_remote(self, message): return False async def shutdown(self): + self.log.debug("Shutting down server %r", self) + self._tokenmanager = None self.server.close() - for c in self._pool: - # FIXME: it would be nicer to release them - c.abort("Server shutdown") + # This should be quick, no need to make a task of it -- and now all + # previously accepted connections should be in to receive their proper + # release await self.server.wait_closed() - self._tokenmanager = None + shutdowns = [ + asyncio.create_task( + c.release(), + **py38args(name="Close client %s" % c)) + for c + in self._pool + ] + if not shutdowns: + # wait is documented to require a non-empty set + return + await asyncio.wait(shutdowns) class TCPClient(_TCPPooling, interfaces.TokenInterface): def __init__(self): @@ -403,7 +423,14 @@ async def fill_or_recognize_remote(self, message): return False async def shutdown(self): - for c in self._pool.values(): - # FIXME: it would be nicer to release them - c.abort("Server shutdown") + self.log.debug("Shutting down any outgoing connections on on %r", self) self._tokenmanager = None + + shutdowns = [asyncio.create_task( + c.release(), + **py38args(name="Close client %s" % c)) + for c in self._pool.values()] + if not shutdowns: + # wait is documented to require a non-empty set + return + await asyncio.wait(shutdowns) diff --git a/aiocoap/transports/ws.py b/aiocoap/transports/ws.py index 91e95e5a..3ae13611 100644 --- a/aiocoap/transports/ws.py +++ b/aiocoap/transports/ws.py @@ -169,6 +169,16 @@ async def send(): **py38args(name="WebSocket sending of %r" % msg) ) + async def release(self): + await super().release() + try: + await self._connection.wait_closed() + except asyncio.CancelledError: + self.log.warning( + "Connection %s was not closed by peer in time after release", + self + ) + class WSPool(interfaces.TokenInterface): _outgoing_starting: Dict[PoolKey, asyncio.Task] _pool: Dict[PoolKey, WSRemote] @@ -183,6 +193,9 @@ def __init__(self, tman, log, loop): self._servers = [] + # See where it is used for documentation, remove when not needed any more + self._in_shutdown = False + self._tokenmanager = tman self.log = log @@ -332,8 +345,18 @@ def send_message(self, message, messageerror_monitor): message.remote._send_message(message) async def shutdown(self): + self._in_shutdown = True + self.log.debug("Shutting down any connections on %r", self) + + client_shutdowns = [ + asyncio.create_task( + c.release(), + **py38args(name="Close connection %s" % c) + ) + for c in self._pool.values()] + + server_shutdowns = [] while self._servers: - # could be parallelized, but what are the chances there'll actually be multiple s = self._servers.pop() # We could do something like # >>> for websocket in s.websockets: @@ -343,9 +366,18 @@ async def shutdown(self): # tests actually do run a GC collection once and that gets broken # up, it's not worth adding fragilty here s.close() - await s.wait_closed() - - # FIXME any handling needed for outgoing connections? + server_shutdowns.append(asyncio.create_task( + s.wait_closed(), + **py38args(name="Close server %s" % s))) + + # Placing client shutdowns before server shutdowns to give them a + # chance to send out Abort messages; the .close() method could be more + # helpful here by stopping new connections but letting us finish off + # the old ones + shutdowns = client_shutdowns + server_shutdowns + if shutdowns: + # wait is documented to require a non-empty set + await asyncio.wait(shutdowns) # Incoming message processing @@ -356,8 +388,16 @@ async def _run_recv_loop(self, remote): try: received = await remote._connection.recv() except websockets.exceptions.ConnectionClosed: - # FIXME if deposited somewhere, mark that as stale? - self._tokenmanager.dispatch_error(error.RemoteServerShutdown(), remote) + # This check is purely needed to silence the warning printed + # from tokenmanager, "Internal shutdown sequence msismatch: + # error dispatched through tokenmanager after shutdown" -- and + # is a symptom of https://github.com/chrysn/aiocoap/issues/284 + # and of the odd circumstance that we can't easily cancel the + # _run_recv_loop tasks (as we should while that issue is + # unresolved) in the shutdown handler. + if not self._in_shutdown: + # FIXME if deposited somewhere, mark that as stale? + self._tokenmanager.dispatch_error(error.RemoteServerShutdown("Peer closed connection"), remote) return if not isinstance(received, bytes): @@ -375,7 +415,9 @@ async def _run_recv_loop(self, remote): if msg.code.is_signalling(): try: remote._process_signaling(msg) - except rfc8323common.CloseConnection: + except rfc8323common.CloseConnection as e: + self._tokenmanager.dispatch_error(e.args[0], msg.remote) + self._pool.pop(remote._poolkey) await remote._connection.close() continue From 757ebad34905e2ac1ed4a89fa9bbfebcf2da9d25 Mon Sep 17 00:00:00 2001 From: chrysn Date: Wed, 17 Aug 2022 16:25:57 +0200 Subject: [PATCH 6/6] tests: Remove odd reference cycle that doesn't seem to match up with the current setup any more No hint was found at the introduction time (across moves) as to why that loop needs to be present to prove any better behavior. It is admittedly odd that that very test would start failing with the most recent tests; still treating it as a GC regression at an irrelevant test because the test's documentation failed to show its relevance. --- tests/test_proxy.py | 6 ------ tests/test_reverseproxy.py | 6 ------ 2 files changed, 12 deletions(-) diff --git a/tests/test_proxy.py b/tests/test_proxy.py index c65ba805..28428563 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -27,12 +27,6 @@ def tearDown(self): super(WithProxyServer, self).tearDown() self.loop.run_until_complete(self.forwardproxy.shutdown()) - # creating a reference loop between the cli instance and its contexts, - # so that the cli instance's gc-ing is linked o the contexts'. - # TODO how can we handle this more smoothly? - self.forwardproxy.outgoing_context._cli = self.reverseproxy - self.forwardproxy.proxy_context._cli = self.reverseproxy - self._del_to_be_sure('forwardproxy') self.loop.run_until_complete(asyncio.sleep(CLEANUPTIME)) diff --git a/tests/test_reverseproxy.py b/tests/test_reverseproxy.py index c93c3521..dfdc1bbb 100644 --- a/tests/test_reverseproxy.py +++ b/tests/test_reverseproxy.py @@ -29,12 +29,6 @@ def tearDown(self): super(WithReverseProxy, self).tearDown() self.loop.run_until_complete(self.reverseproxy.shutdown()) - # creating a reference loop between the cli instance and its contexts, - # so that the cli instance's gc-ing is linked o the contexts'. - # TODO how can we handle this more smoothly? - self.reverseproxy.outgoing_context._cli = self.reverseproxy - self.reverseproxy.proxy_context._cli = self.reverseproxy - self._del_to_be_sure('reverseproxy') self.loop.run_until_complete(asyncio.sleep(CLEANUPTIME))