Skip to content

Commit

Permalink
async: Do more inside coroutines
Browse files Browse the repository at this point in the history
Notably, this als clarifies how a AsyncCLIDaemon can be used outside of
``sync_main``.
  • Loading branch information
chrysn committed Nov 29, 2021
1 parent 9c00b31 commit 5db1387
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 41 deletions.
69 changes: 48 additions & 21 deletions aiocoap/util/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,55 +51,82 @@ class AsyncCLIDaemon:
* Outside of an async context, run run ``MyClass.sync_main()``, typically
in the program's ``if __name__ == "__main__":`` section.
In this mode, the loop that is started is configured to safely shut down
the loop when SIGINT is received.
* To run a subclass of this in an existing loop, start it with
``MyClass(...)`` (possibly passing in the loop to run it on if not already
in an async context), and then awaiting its ``.initializing`` future. To
stop it, await its ``.shutdown()`` method.
This pattern is going to be deprecated or removed entirely when ported to
async context managers.
Note that with this usage pattern, the :meth:`.stop()` method has no
effect; servers that ``.stop()`` themselves need to signal their desire
to be shut down through other channels (but that is an atypical case).
"""

def __init__(self, *args, **kwargs):
self.__loop = kwargs.pop('loop', None)
if self.__loop is None:
self.__loop = asyncio.get_running_loop()
self.__exitcode = self.__loop.create_future()
self.initializing = self.__loop.create_task(self.start(*args, **kwargs))
loop = kwargs.pop('loop', None)
if loop is None:
loop = asyncio.get_running_loop()
self.__exitcode = loop.create_future()
self.initializing = loop.create_task(self.start(*args, **kwargs))

def stop(self, exitcode):
"""Stop the operation (and exit sync_main) at the next convenience."""
self.__exitcode.set_result(exitcode)

@classmethod
def sync_main(cls, *args, **kwargs):
async def _async_main(cls, *args, **kwargs):
"""Run the application in an AsyncIO main loop, shutting down cleanly
on keyboard interrupt."""
main = cls(*args, loop=asyncio.new_event_loop(), **kwargs)
on keyboard interrupt.
This is not exposed publicly as it messes with the loop, and we only do
that with loops created in sync_main.
"""
main = cls(*args, **kwargs)

try:
asyncio.get_running_loop().add_signal_handler(
signal.SIGTERM,
lambda: main.__exitcode.set_result(143),
)
except NotImplementedError:
# Impossible on win32 -- just won't make that clean of a shutdown.
pass

try:
main.__loop.run_until_complete(main.initializing)
await main.initializing
# This is the time when we'd signal setup completion by the parent
# exiting in case of a daemon setup, or to any other process
# management.
logging.info("Application ready.")
# Common options are 143 or 0
# (<https://github.com/go-task/task/issues/75#issuecomment-339466142> and
# <https://unix.stackexchange.com/questions/10231/when-does-the-system-send-a-sigterm-to-a-process>)
try:
main.__loop.add_signal_handler(signal.SIGTERM, lambda: main.__exitcode.set_result(143))
except NotImplementedError:
# Impossible on win32 -- just won't make that clean of a shutdown.
pass
exitcode = main.__loop.run_until_complete(main.__exitcode)
exitcode = await main.__exitcode
except KeyboardInterrupt:
logging.info("Keyboard interupt received, shutting down")
sys.exit(3)
else:
sys.exit(exitcode)
finally:
if main.initializing.done() and main.initializing.exception():
pass # will raise from run_until_complete
# The exception if initializing is what we are just watching
# fly by. No need to trigger it again, and running shutdown
# would be even weirder.
pass
else:
main.__loop.run_until_complete(main.initializing)
main.__loop.run_until_complete(main.shutdown())
main.__loop.stop()
# May be done, then it's a no-op, or we might have received a
# signal during startup in which case we better fetch the
# result and shut down cleanly again
await main.initializing

# And no matter whether that happened during initialization
# (which now has finished) or due to a regular signal...
await main.shutdown()

@classmethod
def sync_main(cls, *args, **kwargs):
"""Run the application in an AsyncIO main loop, shutting down cleanly
on keyboard interrupt."""
asyncio.run(cls._async_main(*args, **kwargs))
8 changes: 4 additions & 4 deletions contrib/deterministic-demo/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
logging.basicConfig(level=logging.INFO)
logging.getLogger("coap-server").setLevel(logging.DEBUG)

def main():
async def main():
# Resource tree creation
root = resource.Site()

Expand All @@ -28,7 +28,7 @@ def main():

root = oscore_sitewrapper.OscoreSiteWrapper(root, server_credentials)

protocol = asyncio.get_event_loop().run_until_complete(aiocoap.Context.create_server_context(root))
protocol = await aiocoap.Context.create_server_context(root)

# Keys from IETF109 plug test: Rikard Test 2 Entity 1
server_credentials[":a"] = \
Expand All @@ -48,7 +48,7 @@ def main():
},
)

asyncio.get_event_loop().run_forever()
await asyncio.get_running_loop().create_future()

if __name__ == "__main__":
main()
asyncio.run(main())
21 changes: 9 additions & 12 deletions contrib/oscore-plugtest/plugtest-client
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ class PlugtestClientProgram:
ctx = None

async def run_with_shutdown(self):
# Having SIGTERM cause a more graceful shutdown (even if it's not
# asynchronously awaiting the shutdown, which would be impractical
# since we're likely inside some unintended timeout already) allow for
# output buffers to be flushed when the unit test program instrumenting
# it terminates it.
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, loop.close)

try:
await self.run()
finally:
Expand All @@ -115,17 +123,6 @@ class PlugtestClientProgram:
else:
self.ctx.client_credentials["coap://%s/*" % self.host] = ":" + which

@classmethod
def sync_run(cls):
loop = asyncio.new_event_loop()
task = loop.create_task(cls().run_with_shutdown())
# Having SIGTERM cause a more graceful shutdown allow for output
# buffers to be flushed when the unit test program instrumenting it
# terminates it.
signal.signal(signal.SIGTERM,
lambda signo, frame: task.throw(KeyboardInterrupt))
loop.run_until_complete(task)

async def run_test(self, testno):
self.testno = testno
testfun = self.__methods[testno]
Expand Down Expand Up @@ -408,4 +405,4 @@ class PlugtestClientProgram:
# additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)

if __name__ == "__main__":
PlugtestClientProgram.sync_run()
asyncio.run(PlugtestClientProgram().run_with_shutdown())
9 changes: 5 additions & 4 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def render_get(self, request):
logging.basicConfig(level=logging.INFO)
logging.getLogger("coap-server").setLevel(logging.DEBUG)

def main():
async def main():
# Resource tree creation
root = resource.Site()

Expand All @@ -126,9 +126,10 @@ def main():
root.add_resource(['other', 'separate'], SeparateLargeResource())
root.add_resource(['whoami'], WhoAmI())

asyncio.Task(aiocoap.Context.create_server_context(root))
await aiocoap.Context.create_server_context(root)

asyncio.get_event_loop().run_forever()
# Run forever
await asyncio.get_running_loop().create_future()

if __name__ == "__main__":
main()
asyncio.run(main())

0 comments on commit 5db1387

Please sign in to comment.