Skip to content

Commit

Permalink
Shutdown fixes (mainly) for TCP and WebSockets
Browse files Browse the repository at this point in the history
Both TCP and WebSockets now do the RFC8323 shutdown procedure of
notifying the peer of their intention to shut down, and letting them do
the actual shutdown.
  • Loading branch information
chrysn committed Aug 17, 2022
2 parents e076e74 + 757ebad commit 8fe5e12
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 37 deletions.
3 changes: 3 additions & 0 deletions aiocoap/numbers/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,7 @@
This number is not explicitly named in RFC7641.
"""

SHUTDOWN_TIMEOUT = 3
"""Maximum time, in seconds, for which the process is kept around during shutdown"""

__all__ = [k for k in dir() if not k.startswith('_')]
11 changes: 7 additions & 4 deletions aiocoap/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from . import interfaces
from . import error
from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND,
CONTINUE, OBSERVATION_RESET_TIME)
CONTINUE, OBSERVATION_RESET_TIME, SHUTDOWN_TIMEOUT)
from .util.asyncio import py38args

import warnings
Expand Down Expand Up @@ -297,8 +297,11 @@ async def shutdown(self):
After this coroutine terminates, and once all external references to
the object are dropped, it should be garbage-collectable.
This method may take the time to inform communications partners of
stopped observations (but currently does not)."""
This method takes up to
:const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
transports to perform any cleanup implemented in them (such as orderly
connection shutdown and cancelling observations, where the latter is
currently not implemented)."""

self.log.debug("Shutting down context")

Expand All @@ -309,7 +312,7 @@ async def shutdown(self):
)
for ri
in self.request_interfaces],
timeout=3)
timeout=SHUTDOWN_TIMEOUT)
for item in done:
await item

Expand Down
2 changes: 1 addition & 1 deletion aiocoap/tokenmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def dispatch_error(self, exception, remote):
# Not entirely sure where it is so far; better just raise a warning
# than an exception later, nothing terminally bad should come of
# this error.
self.log.warning("Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutown")
self.log.warning("Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown")
return

# NetworkError is what we promise users to raise from request etc; if
Expand Down
42 changes: 40 additions & 2 deletions aiocoap/transports/rfc8323common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,21 @@
(asyncio stream vs. websockets module) far enough that they only share small
portions of their code"""

import asyncio
from typing import Optional
from aiocoap import Message
from aiocoap import optiontypes, util
from aiocoap.numbers.codes import CSM, PING, PONG, RELEASE, ABORT
from aiocoap import error

class CloseConnection(Exception):
"""Raised in RFC8323 common processing to trigger a connection shutdown on
the TCP / WebSocket side.
The TCP / WebSocket side should send the exception's argument on to the
token manager, close the connection, and does not need to perform further
logging."""

class RFC8323Remote:
"""Mixin for Remotes for all the common RFC8323 processing
Expand Down Expand Up @@ -144,9 +153,14 @@ def _process_signaling(self, msg):
elif msg.code == PONG:
pass
elif msg.code == RELEASE:
raise NotImplementedError
# The behavior SHOULD be enhanced to answer outstanding
# requests, but it is unclear to which extent this side may
# still use the connection.
self.log.info("Received Release, closing on this end (options: %s)", msg.opt)
raise CloseConnection(error.RemoteServerShutdown("Peer released connection"))
elif msg.code == ABORT:
raise NotImplementedError
self.log.warning("Received Abort (options: %s)", msg.opt)
raise CloseConnection(error.RemoteServerShutdown("Peer aborted connection"))
else:
self.abort("Unknown signalling code")

Expand All @@ -159,3 +173,27 @@ def abort(self, errormessage=None, bad_csm_option=None):
bad_csm_option_option = optiontypes.UintOption(2, bad_csm_option)
abort_msg.opt.add_option(bad_csm_option_option)
self._abort_with(abort_msg)

async def release(self):
"""Send Release message, (not implemented:) wait for connection to be
actually closed by the peer.
Subclasses should extend this to await closing of the connection,
especially if they'd get into lock-up states otherwise (was would
WebSockets).
"""
self.log.info("Releasing connection %s", self)
release_msg = Message(code=RELEASE)
self._send_message(release_msg)

try:
# FIXME: we could wait for the peer to close the connection, but a)
# that'll need some work on the interface between this module and
# ws/tcp, and b) we have no peers to test this with that would
# produce any sensible data (as aiocoap on release just closes).
pass
except asyncio.CancelledError:
self.log.warning(
"Connection %s was not closed by peer in time after release",
self
)
50 changes: 40 additions & 10 deletions aiocoap/transports/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from aiocoap import interfaces, error, util
from aiocoap import COAP_PORT, Message
from aiocoap import defaults
from ..util.asyncio import py38args

def _extract_message_size(data: bytes):
"""Read out the full length of a CoAP messsage represented by data.
Expand Down Expand Up @@ -200,7 +201,11 @@ def data_received(self, data):
self._spool = self._spool[msglen:]

if msg.code.is_signalling():
self._process_signaling(msg)
try:
self._process_signaling(msg)
except rfc8323common.CloseConnection as e:
self._ctx._dispatch_error(self, e.args[0])
self._transport.close()
continue

if self._remote_settings is None:
Expand All @@ -222,6 +227,9 @@ def resume_writing(self):
# FIXME: do something ;-)
pass

# RFC8323Remote.release recommends subclassing this, but there's no easy
# awaitable here yet, and no important business to finish, timeout-wise.

class _TCPPooling:
# implementing TokenInterface

Expand Down Expand Up @@ -298,7 +306,10 @@ def new_connection():
return self

def _evict_from_pool(self, connection):
self._pool.remove(connection)
# May easily happen twice, once when an error comes in and once when
# the connection is (subsequently) closed.
if connection in self._pool:
self._pool.remove(connection)

# implementing TokenInterface

Expand All @@ -311,12 +322,24 @@ async def fill_or_recognize_remote(self, message):
return False

async def shutdown(self):
self.log.debug("Shutting down server %r", self)
self._tokenmanager = None
self.server.close()
for c in self._pool:
# FIXME: it would be nicer to release them
c.abort("Server shutdown")
# This should be quick, no need to make a task of it -- and now all
# previously accepted connections should be in to receive their proper
# release
await self.server.wait_closed()
self._tokenmanager = None
shutdowns = [
asyncio.create_task(
c.release(),
**py38args(name="Close client %s" % c))
for c
in self._pool
]
if not shutdowns:
# wait is documented to require a non-empty set
return
await asyncio.wait(shutdowns)

class TCPClient(_TCPPooling, interfaces.TokenInterface):
def __init__(self):
Expand Down Expand Up @@ -400,7 +423,14 @@ async def fill_or_recognize_remote(self, message):
return False

async def shutdown(self):
for c in self._pool.values():
# FIXME: it would be nicer to release them
c.abort("Server shutdown")
del self._tokenmanager
self.log.debug("Shutting down any outgoing connections on on %r", self)
self._tokenmanager = None

shutdowns = [asyncio.create_task(
c.release(),
**py38args(name="Close client %s" % c))
for c in self._pool.values()]
if not shutdowns:
# wait is documented to require a non-empty set
return
await asyncio.wait(shutdowns)
68 changes: 60 additions & 8 deletions aiocoap/transports/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ def __init__(self, pool, connection, loop, log, *, scheme, local_hostinfo=None,

self.scheme = scheme

# Goes both for client and for server ends; on the server end, it
# ensures that role reversal URIs can be used even when passed as URIs
# and not as remotes (although that's of course only possible locally).
self._poolkey = PoolKey(self.scheme, self.hostinfo)

# Necessary for RFC8323Remote

def _abort_with(self, msg, *, close_code=1002):
Expand Down Expand Up @@ -164,6 +169,16 @@ async def send():
**py38args(name="WebSocket sending of %r" % msg)
)

async def release(self):
await super().release()
try:
await self._connection.wait_closed()
except asyncio.CancelledError:
self.log.warning(
"Connection %s was not closed by peer in time after release",
self
)

class WSPool(interfaces.TokenInterface):
_outgoing_starting: Dict[PoolKey, asyncio.Task]
_pool: Dict[PoolKey, WSRemote]
Expand All @@ -178,6 +193,9 @@ def __init__(self, tman, log, loop):

self._servers = []

# See where it is used for documentation, remove when not needed any more
self._in_shutdown = False

self._tokenmanager = tman
self.log = log

Expand Down Expand Up @@ -236,6 +254,7 @@ async def _new_connection(self, websocket, path=None, *, scheme):
local_hostinfo = util.hostportsplit(hostheader)

remote = WSRemote(self, websocket, self.loop, self.log, scheme=scheme, local_hostinfo=local_hostinfo)
self._pool[remote._poolkey] = remote

await self._run_recv_loop(remote)

Expand Down Expand Up @@ -272,7 +291,8 @@ async def _connect_task(self, key: PoolKey):
)

remote = WSRemote(self, websocket, self.loop, self.log, scheme=key.scheme, remote_hostinfo=hostinfo_split)
self._pool[remote] = remote
assert remote._poolkey == key, "Pool key construction is inconsistent"
self._pool[key] = remote

self.loop.create_task(
self._run_recv_loop(remote),
Expand Down Expand Up @@ -325,8 +345,18 @@ def send_message(self, message, messageerror_monitor):
message.remote._send_message(message)

async def shutdown(self):
self._in_shutdown = True
self.log.debug("Shutting down any connections on %r", self)

client_shutdowns = [
asyncio.create_task(
c.release(),
**py38args(name="Close connection %s" % c)
)
for c in self._pool.values()]

server_shutdowns = []
while self._servers:
# could be parallelized, but what are the chances there'll actually be multiple
s = self._servers.pop()
# We could do something like
# >>> for websocket in s.websockets:
Expand All @@ -336,9 +366,18 @@ async def shutdown(self):
# tests actually do run a GC collection once and that gets broken
# up, it's not worth adding fragilty here
s.close()
await s.wait_closed()

# FIXME any handling needed for outgoing connections?
server_shutdowns.append(asyncio.create_task(
s.wait_closed(),
**py38args(name="Close server %s" % s)))

# Placing client shutdowns before server shutdowns to give them a
# chance to send out Abort messages; the .close() method could be more
# helpful here by stopping new connections but letting us finish off
# the old ones
shutdowns = client_shutdowns + server_shutdowns
if shutdowns:
# wait is documented to require a non-empty set
await asyncio.wait(shutdowns)

# Incoming message processing

Expand All @@ -349,8 +388,16 @@ async def _run_recv_loop(self, remote):
try:
received = await remote._connection.recv()
except websockets.exceptions.ConnectionClosed:
# FIXME if deposited somewhere, mark that as stale?
self._tokenmanager.dispatch_error(error.RemoteServerShutdown(), remote)
# This check is purely needed to silence the warning printed
# from tokenmanager, "Internal shutdown sequence msismatch:
# error dispatched through tokenmanager after shutdown" -- and
# is a symptom of https://github.com/chrysn/aiocoap/issues/284
# and of the odd circumstance that we can't easily cancel the
# _run_recv_loop tasks (as we should while that issue is
# unresolved) in the shutdown handler.
if not self._in_shutdown:
# FIXME if deposited somewhere, mark that as stale?
self._tokenmanager.dispatch_error(error.RemoteServerShutdown("Peer closed connection"), remote)
return

if not isinstance(received, bytes):
Expand All @@ -366,7 +413,12 @@ async def _run_recv_loop(self, remote):
msg.remote = remote

if msg.code.is_signalling():
remote._process_signaling(msg)
try:
remote._process_signaling(msg)
except rfc8323common.CloseConnection as e:
self._tokenmanager.dispatch_error(e.args[0], msg.remote)
self._pool.pop(remote._poolkey)
await remote._connection.close()
continue

if remote._remote_settings is None:
Expand Down
6 changes: 0 additions & 6 deletions tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ def tearDown(self):
super(WithProxyServer, self).tearDown()
self.loop.run_until_complete(self.forwardproxy.shutdown())

# creating a reference loop between the cli instance and its contexts,
# so that the cli instance's gc-ing is linked o the contexts'.
# TODO how can we handle this more smoothly?
self.forwardproxy.outgoing_context._cli = self.reverseproxy
self.forwardproxy.proxy_context._cli = self.reverseproxy

self._del_to_be_sure('forwardproxy')

self.loop.run_until_complete(asyncio.sleep(CLEANUPTIME))
Expand Down
6 changes: 0 additions & 6 deletions tests/test_reverseproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ def tearDown(self):
super(WithReverseProxy, self).tearDown()
self.loop.run_until_complete(self.reverseproxy.shutdown())

# creating a reference loop between the cli instance and its contexts,
# so that the cli instance's gc-ing is linked o the contexts'.
# TODO how can we handle this more smoothly?
self.reverseproxy.outgoing_context._cli = self.reverseproxy
self.reverseproxy.proxy_context._cli = self.reverseproxy

self._del_to_be_sure('reverseproxy')

self.loop.run_until_complete(asyncio.sleep(CLEANUPTIME))
Expand Down

0 comments on commit 8fe5e12

Please sign in to comment.