Cleanup for #8495 (#8541)

Merged 13 commits on Jul 29, 2024
1 change: 1 addition & 0 deletions CHANGES/8495.breaking.rst
@@ -0,0 +1 @@
TBD
1 change: 1 addition & 0 deletions aiohttp/web_server.py
@@ -74,6 +74,7 @@ def pre_shutdown(self) -> None:
    async def shutdown(self, timeout: Optional[float] = None) -> None:
        coros = (conn.shutdown(timeout) for conn in self._connections)
        await asyncio.gather(*coros)
+        print("LENGTH", len(self._connections))
        self._connections.clear()

    def __call__(self) -> RequestHandler:
58 changes: 31 additions & 27 deletions docs/web_advanced.rst
@@ -74,31 +74,41 @@ Sometimes it is a desirable behavior: on processing ``GET`` request the
code might fetch data from a database or other web resource, the
fetching is potentially slow.

-Canceling this fetch is a good idea: the peer dropped connection
+Canceling this fetch is a good idea: the client dropped the connection
already, so there is no reason to waste time and resources (memory etc)
-by getting data from a DB without any chance to send it back to peer.
+by getting data from a DB without any chance to send it back to the client.

-But sometimes the cancellation is bad: on ``POST`` request very often
-it is needed to save data to a DB regardless of peer closing.
+But sometimes the cancellation is bad: on ``POST`` requests very often
+it is needed to save data to a DB regardless of connection closing.
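
In recent aiohttp releases this cancellation behaviour is opt-in rather than the
default; a minimal sketch of enabling it (``fetch_from_db`` is a hypothetical
stand-in for the slow query discussed above)::

    import asyncio

    from aiohttp import web


    async def fetch_from_db() -> str:
        await asyncio.sleep(5)  # placeholder for a slow database query
        return "result"


    async def handler(request: web.Request) -> web.Response:
        # With cancellation enabled, this coroutine is cancelled as soon as
        # the client drops the connection, so the slow query is not wasted.
        return web.Response(text=await fetch_from_db())


    app = web.Application()
    app.router.add_get("/", handler)
    web.run_app(app, handler_cancellation=True)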

Cancellation prevention could be implemented in several ways:

-* Applying :func:`asyncio.shield` to a coroutine that saves data.
-* Using aiojobs_ or another third party library.
+* Applying :func:`aiojobs.aiohttp.shield` to a coroutine that saves data.
+* Using aiojobs_ or another third party library to run a task in the background.

-:func:`asyncio.shield` can work well. The only disadvantage is you
-need to split web handler into exactly two async functions: one
-for handler itself and other for protected code.
+:func:`aiojobs.aiohttp.shield` can work well. The only disadvantage is you
+need to split the web handler into two async functions: one for the handler
+itself and another for protected code.

+.. warning::
+
+   We don't recommend using :func:`asyncio.shield` for this because the shielded
+   task cannot be tracked by the application and therefore there is a risk that
+   the task will get cancelled during application shutdown. The function provided
+   by aiojobs_ operates in the same way except the inner task will be tracked
+   by the Scheduler and will get waited on during the cleanup phase.
+
For example the following snippet is not safe::

+    from aiojobs.aiohttp import shield
+
    async def handler(request):
-        await asyncio.shield(write_to_redis(request))
-        await asyncio.shield(write_to_postgres(request))
+        await shield(request, write_to_redis(request))
+        await shield(request, write_to_postgres(request))
        return web.Response(text="OK")

-Cancellation might occur while saving data in REDIS, so
-``write_to_postgres`` will not be called, potentially
+Cancellation might occur while saving data in REDIS, so the
+``write_to_postgres`` function will not be called, potentially
leaving your data in an inconsistent state.

Instead, you would need to write something like::
@@ -108,7 +118,7 @@ Instead, you would need to write something like::
        await write_to_postgres(request)

    async def handler(request):
-        await asyncio.shield(write_data(request))
+        await shield(request, write_data(request))
        return web.Response(text="OK")
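
For :func:`aiojobs.aiohttp.shield` to work, aiojobs also has to be wired into the
application via :func:`aiojobs.aiohttp.setup`. A rough, self-contained sketch
(``write_to_redis`` and ``write_to_postgres`` are stand-ins for the real writes)::

    import asyncio

    from aiohttp import web
    from aiojobs.aiohttp import setup, shield


    async def write_to_redis(request):
        await asyncio.sleep(1)  # stand-in for the real Redis write


    async def write_to_postgres(request):
        await asyncio.sleep(1)  # stand-in for the real Postgres write


    async def write_data(request):
        await write_to_redis(request)
        await write_to_postgres(request)


    async def handler(request):
        # The shielded coroutine runs as a job tracked by the aiojobs Scheduler,
        # so it is waited on during cleanup instead of being dropped on shutdown.
        await shield(request, write_data(request))
        return web.Response(text="OK")


    app = web.Application()
    setup(app)  # install the aiojobs Scheduler on the application
    app.router.add_post("/", handler)
    web.run_app(app)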

Alternatively, if you want to spawn a task without waiting for
@@ -940,30 +950,24 @@ always satisfactory.
When aiohttp is run with :func:`run_app`, it will attempt a graceful shutdown
by following these steps (if using a :ref:`runner <aiohttp-web-app-runners>`,
then calling :meth:`AppRunner.cleanup` will perform these steps, excluding
-steps 4 and 7).
+step 7).

1. Stop each site listening on sockets, so new connections will be rejected.
2. Close idle keep-alive connections (and set active ones to close upon completion).
3. Call the :attr:`Application.on_shutdown` signal. This should be used to shutdown
long-lived connections, such as websockets (see below).
-4. Wait a short time for running tasks to complete. This allows any pending handlers
-   or background tasks to complete successfully. The timeout can be adjusted with
-   ``shutdown_timeout`` in :func:`run_app`.
+4. Wait a short time for running handlers to complete. This allows any pending handlers
+   to complete successfully. The timeout can be adjusted with ``shutdown_timeout``
+   in :func:`run_app`.
5. Close any remaining connections and cancel their handlers. It will wait on the
canceling handlers for a short time, again adjustable with ``shutdown_timeout``.
6. Call the :attr:`Application.on_cleanup` signal. This should be used to cleanup any
resources (such as DB connections). This includes completing the
-   :ref:`cleanup contexts<aiohttp-web-cleanup-ctx>`.
+   :ref:`cleanup contexts<aiohttp-web-cleanup-ctx>` which may be used to ensure
+   background tasks are completed successfully (see
+   :ref:`handler cancellation<web-handler-cancellation>` or aiojobs_ for example).
7. Cancel any remaining tasks and wait on them to complete.

-.. note::
-
-   When creating new tasks in a handler which _should_ be cancelled on server shutdown,
-   then it is important to keep track of those tasks and explicitly cancel them in a
-   :attr:`Application.on_shutdown` callback. As we can see from the above steps,
-   without this the server will wait on those new tasks to complete before it continues
-   with server shutdown.
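
As a rough illustration of step 6, a :ref:`cleanup context<aiohttp-web-cleanup-ctx>`
can own a background task so that shutdown finishes it deterministically
(``poll_something`` is a hypothetical periodic job)::

    import asyncio

    from aiohttp import web


    async def poll_something():
        while True:
            await asyncio.sleep(60)  # stand-in for periodic work


    async def background_jobs(app: web.Application):
        # Runs before the application starts serving requests.
        task = asyncio.create_task(poll_something())
        yield
        # Runs during the cleanup phase (step 6): stop the task explicitly
        # instead of leaving it for step 7 to cancel.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


    app = web.Application()
    app.cleanup_ctx.append(background_jobs)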

Websocket shutdown
^^^^^^^^^^^^^^^^^^
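
In short, the usual pattern is to track open websockets on the application and
close them from an :attr:`Application.on_shutdown` callback (step 3 above); a
sketch, assuming a recent aiohttp that provides ``web.AppKey``::

    import weakref

    from aiohttp import WSCloseCode, web

    websockets = web.AppKey("websockets", weakref.WeakSet)


    async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        request.app[websockets].add(ws)  # remember every open websocket
        try:
            async for msg in ws:
                pass  # message handling would go here
        finally:
            request.app[websockets].discard(ws)
        return ws


    async def on_shutdown(app: web.Application) -> None:
        # Close long-lived connections so graceful shutdown does not stall.
        for ws in set(app[websockets]):
            await ws.close(code=WSCloseCode.GOING_AWAY, message=b"Server shutdown")


    app = web.Application()
    app[websockets] = weakref.WeakSet()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/ws", websocket_handler)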

51 changes: 51 additions & 0 deletions tests/test_run_app.py
@@ -1183,3 +1183,54 @@
assert time.time() - start < 5
assert client_finished
assert server_finished

    def test_shutdown_handler_cancellation_suppressed(
        self, aiohttp_unused_port: Callable[[], int]
    ) -> None:
        port = aiohttp_unused_port()
        actions = []

        async def test() -> None:
            async def test_resp(sess):
                t = ClientTimeout(total=0.4)
                with pytest.raises(asyncio.TimeoutError):
                    async with sess.get(f"http://localhost:{port}/", timeout=t) as resp:
                        assert await resp.text() == "FOO"
                actions.append("CANCELLED")

            async with ClientSession() as sess:
                t = asyncio.create_task(test_resp(sess))
                await asyncio.sleep(0.5)
                # Handler is in-progress while we trigger server shutdown.
                actions.append("PRESTOP")
                async with sess.get(f"http://localhost:{port}/stop"):
                    pass

                actions.append("STOPPING")
                # Handler should still complete and produce a response.
                await t

        async def run_test(app: web.Application) -> None:
            nonlocal t
            t = asyncio.create_task(test())
            yield
            await t

        async def handler(request: web.Request) -> web.Response:
            try:
                await asyncio.sleep(5)
            except asyncio.CancelledError:
                actions.append("SUPPRESSED")
                await asyncio.sleep(2)
                actions.append("DONE")
            return web.Response(text="FOO")

        t = None
        app = web.Application()
        app.cleanup_ctx.append(run_test)
        app.router.add_get("/", handler)
        app.router.add_get("/stop", self.stop)

        web.run_app(app, port=port, shutdown_timeout=2, handler_cancellation=True)
        assert t.exception() is None
        assert actions == ["CANCELLED", "SUPPRESSED", "PRESTOP", "STOPPING", "DONE"]