diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 70133b983..10c1ddfdc 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -393,6 +393,10 @@ Wait queue abstraction .. autoclass:: ParkingLotStatistics :members: +.. autofunction:: add_parking_lot_breaker + +.. autofunction:: remove_parking_lot_breaker + Low-level checkpoint functions ------------------------------ diff --git a/newsfragments/3035.feature.rst b/newsfragments/3035.feature.rst new file mode 100644 index 000000000..c25841c47 --- /dev/null +++ b/newsfragments/3035.feature.rst @@ -0,0 +1 @@ +:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock having exited without releasing the lock. diff --git a/newsfragments/3081.feature.rst b/newsfragments/3081.feature.rst new file mode 100644 index 000000000..34a073b26 --- /dev/null +++ b/newsfragments/3081.feature.rst @@ -0,0 +1 @@ +Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally. diff --git a/src/trio/_core/__init__.py b/src/trio/_core/__init__.py index 71f5f17eb..fdef90292 100644 --- a/src/trio/_core/__init__.py +++ b/src/trio/_core/__init__.py @@ -20,7 +20,12 @@ from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection from ._local import RunVar, RunVarToken from ._mock_clock import MockClock -from ._parking_lot import ParkingLot, ParkingLotStatistics +from ._parking_lot import ( + ParkingLot, + ParkingLotStatistics, + add_parking_lot_breaker, + remove_parking_lot_breaker, +) # Imports that always exist from ._run import ( diff --git a/src/trio/_core/_parking_lot.py b/src/trio/_core/_parking_lot.py index 916e6a6e9..e7be7913e 100644 --- a/src/trio/_core/_parking_lot.py +++ b/src/trio/_core/_parking_lot.py @@ -72,10 +72,12 @@ from __future__ import annotations import math +import warnings from collections import OrderedDict from typing import TYPE_CHECKING import attrs +import outcome from .. import _core from .._util import final @@ -86,6 +88,33 @@ from ._run import Task +GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {} + + +def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: + """Register a task as a breaker for a lot. If this task exits without being removed + as a breaker, the lot will break. This will cause an error to be raised for all + tasks currently parked in the lot, as well as any future tasks that attempt to + park in it. + """ + if task not in GLOBAL_PARKING_LOT_BREAKER: + GLOBAL_PARKING_LOT_BREAKER[task] = [lot] + else: + GLOBAL_PARKING_LOT_BREAKER[task].append(lot) + + +def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: + """Deregister a task as a breaker for a lot. See :func:`add_parking_lot_breaker`.""" + try: + GLOBAL_PARKING_LOT_BREAKER[task].remove(lot) + except (KeyError, ValueError): + raise RuntimeError( + "Attempted to remove task as breaker for a lot it is not registered for", + ) from None + if not GLOBAL_PARKING_LOT_BREAKER[task]: + del GLOBAL_PARKING_LOT_BREAKER[task] + + @attrs.frozen class ParkingLotStatistics: """An object containing debugging information for a ParkingLot. @@ -118,6 +147,7 @@ class ParkingLot: # {task: None}, we just want a deque where we can quickly delete random # items _parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False) + broken_by: Task | None = None def __len__(self) -> int: """Returns the number of parked tasks.""" @@ -136,7 +166,15 @@ async def park(self) -> None: """Park the current task until woken by a call to :meth:`unpark` or :meth:`unpark_all`. + Raises: + BrokenResourceError: if attempting to park in a broken lot, or the lot + breaks before we get to unpark. + """ + if self.broken_by is not None: + raise _core.BrokenResourceError( + f"Attempted to park in parking lot broken by {self.broken_by}", + ) task = _core.current_task() self._parked[task] = None task.custom_sleep_data = self @@ -234,6 +272,34 @@ def repark_all(self, new_lot: ParkingLot) -> None: """ return self.repark(new_lot, count=len(self)) + def break_lot(self, task: Task | None = None) -> None: + """Break this lot, causing all parked tasks to raise an error, and any + future tasks attempting to park to error. Unpark & repark become no-ops as the + parking lot is empty. + The error raised contains a reference to the task sent as a parameter. + """ + if task is None: + task = _core.current_task() + if self.broken_by is not None: + if self.broken_by != task: + warnings.warn( + RuntimeWarning( + f"{task} attempted to break parking lot {self} already broken by {self.broken_by}", + ), + stacklevel=2, + ) + return + self.broken_by = task + + for parked_task in self._parked: + _core.reschedule( + parked_task, + outcome.Error( + _core.BrokenResourceError(f"Parking lot broken by {task}"), + ), + ) + self._parked.clear() + def statistics(self) -> ParkingLotStatistics: """Return an object containing debugging information. diff --git a/src/trio/_core/_run.py b/src/trio/_core/_run.py index 8921ed7f1..3ea7c269b 100644 --- a/src/trio/_core/_run.py +++ b/src/trio/_core/_run.py @@ -40,6 +40,7 @@ from ._exceptions import Cancelled, RunFinishedError, TrioInternalError from ._instrumentation import Instruments from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection +from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER from ._thread_cache import start_thread_soon from ._traps import ( Abort, @@ -1820,6 +1821,11 @@ async def python_wrapper(orig_coro: Awaitable[RetT]) -> RetT: return task def task_exited(self, task: Task, outcome: Outcome[Any]) -> None: + if task in GLOBAL_PARKING_LOT_BREAKER: + for lot in GLOBAL_PARKING_LOT_BREAKER[task]: + lot.break_lot(task) + del GLOBAL_PARKING_LOT_BREAKER[task] + if ( task._cancel_status is not None and task._cancel_status.abandoned_by_misnesting diff --git a/src/trio/_core/_tests/test_parking_lot.py b/src/trio/_core/_tests/test_parking_lot.py index ed6a17012..59c4a1a4c 100644 --- a/src/trio/_core/_tests/test_parking_lot.py +++ b/src/trio/_core/_tests/test_parking_lot.py @@ -4,6 +4,9 @@ import pytest +import trio.lowlevel +from trio.testing import Matcher, RaisesGroup + from ... import _core from ...testing import wait_all_tasks_blocked from .._parking_lot import ParkingLot @@ -215,3 +218,99 @@ async def test_parking_lot_repark_with_count() -> None: "wake 2", ] lot1.unpark_all() + + +async def test_parking_lot_breaker_basic() -> None: + lot = ParkingLot() + task = trio.lowlevel.current_task() + + with pytest.raises( + RuntimeError, + match="Attempted to remove task as breaker for a lot it is not registered for", + ): + trio.lowlevel.remove_parking_lot_breaker(task, lot) + + # check that a task can be registered as breaker for the same lot multiple times + trio.lowlevel.add_parking_lot_breaker(task, lot) + trio.lowlevel.add_parking_lot_breaker(task, lot) + trio.lowlevel.remove_parking_lot_breaker(task, lot) + trio.lowlevel.remove_parking_lot_breaker(task, lot) + + with pytest.raises( + RuntimeError, + match="Attempted to remove task as breaker for a lot it is not registered for", + ): + trio.lowlevel.remove_parking_lot_breaker(task, lot) + + # defaults to current task + lot.break_lot() + assert lot.broken_by == task + + # breaking the lot again with the same task is a no-op + lot.break_lot() + + # but with a different task it gives a warning + async def dummy_task( + task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, + ) -> None: + task_status.started(_core.current_task()) + + # The nursery is only to create a task we can pass to lot.break_lot + # and has no effect on the test otherwise. + async with trio.open_nursery() as nursery: + child_task = await nursery.start(dummy_task) + with pytest.warns( + RuntimeWarning, + match="attempted to break parking .* already broken by .*", + ): + lot.break_lot(child_task) + nursery.cancel_scope.cancel() + + # and doesn't change broken_by + assert lot.broken_by == task + + +async def test_parking_lot_breaker() -> None: + async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None: + trio.lowlevel.add_parking_lot_breaker(trio.lowlevel.current_task(), lot) + with scope: + await trio.sleep_forever() + + lot = ParkingLot() + cs = _core.CancelScope() + + # check that parked task errors + with RaisesGroup( + Matcher(_core.BrokenResourceError, match="^Parking lot broken by"), + ): + async with _core.open_nursery() as nursery: + nursery.start_soon(bad_parker, lot, cs) + await wait_all_tasks_blocked() + + nursery.start_soon(lot.park) + await wait_all_tasks_blocked() + + cs.cancel() + + # check that trying to park in broken lot errors + with pytest.raises(_core.BrokenResourceError): + await lot.park() + + +async def test_parking_lot_weird() -> None: + """break a parking lot, where the breakee is parked. Doing this is weird, but should probably be supported?? + Although the message makes less sense""" + + async def return_me_and_park( + lot: ParkingLot, + *, + task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, + ) -> None: + task_status.started(_core.current_task()) + await lot.park() + + lot = ParkingLot() + with RaisesGroup(Matcher(_core.BrokenResourceError, match="Parking lot broken by")): + async with _core.open_nursery() as nursery: + task = await nursery.start(return_me_and_park, lot) + lot.break_lot(task) diff --git a/src/trio/_sync.py b/src/trio/_sync.py index 698716ea3..1b72d7ec7 100644 --- a/src/trio/_sync.py +++ b/src/trio/_sync.py @@ -9,6 +9,7 @@ from . import _core from ._core import Abort, ParkingLot, RaiseCancelT, enable_ki_protection +from ._core._parking_lot import add_parking_lot_breaker, remove_parking_lot_breaker from ._util import final if TYPE_CHECKING: @@ -576,20 +577,30 @@ def acquire_nowait(self) -> None: elif self._owner is None and not self._lot: # No-one owns it self._owner = task + add_parking_lot_breaker(task, self._lot) else: raise trio.WouldBlock @enable_ki_protection async def acquire(self) -> None: - """Acquire the lock, blocking if necessary.""" + """Acquire the lock, blocking if necessary. + + Raises: + BrokenResourceError: if the owner of the lock exits without releasing. + """ await trio.lowlevel.checkpoint_if_cancelled() try: self.acquire_nowait() except trio.WouldBlock: - # NOTE: it's important that the contended acquire path is just - # "_lot.park()", because that's how Condition.wait() acquires the - # lock as well. - await self._lot.park() + try: + # NOTE: it's important that the contended acquire path is just + # "_lot.park()", because that's how Condition.wait() acquires the + # lock as well. + await self._lot.park() + except trio.BrokenResourceError: + raise trio.BrokenResourceError( + "Owner of this lock exited without releasing: {self._owner}", + ) from None else: await trio.lowlevel.cancel_shielded_checkpoint() @@ -604,8 +615,10 @@ def release(self) -> None: task = trio.lowlevel.current_task() if task is not self._owner: raise RuntimeError("can't release a Lock you don't own") + remove_parking_lot_breaker(self._owner, self._lot) if self._lot: (self._owner,) = self._lot.unpark(count=1) + add_parking_lot_breaker(self._owner, self._lot) else: self._owner = None @@ -767,7 +780,11 @@ def acquire_nowait(self) -> None: return self._lock.acquire_nowait() async def acquire(self) -> None: - """Acquire the underlying lock, blocking if necessary.""" + """Acquire the underlying lock, blocking if necessary. + + Raises: + BrokenResourceError: if the owner of the underlying lock exits without releasing. + """ await self._lock.acquire() def release(self) -> None: @@ -796,6 +813,7 @@ async def wait(self) -> None: Raises: RuntimeError: if the calling task does not hold the lock. + BrokenResourceError: if the owner of the lock exits without releasing, when attempting to re-acquire. """ if trio.lowlevel.current_task() is not self._lock._owner: diff --git a/src/trio/_tests/test_sync.py b/src/trio/_tests/test_sync.py index caf3f04f5..401ab0f55 100644 --- a/src/trio/_tests/test_sync.py +++ b/src/trio/_tests/test_sync.py @@ -5,7 +5,10 @@ import pytest +from trio.testing import Matcher, RaisesGroup + from .. import _core +from .._core._parking_lot import GLOBAL_PARKING_LOT_BREAKER from .._sync import * from .._timeouts import sleep_forever from ..testing import assert_checkpoints, wait_all_tasks_blocked @@ -586,3 +589,59 @@ async def lock_taker() -> None: await wait_all_tasks_blocked() assert record == ["started"] lock_like.release() + + +async def test_lock_acquire_unowned_lock() -> None: + """Test that trying to acquire a lock whose owner has exited raises an error. + Partial fix for https://github.com/python-trio/trio/issues/3035 + """ + assert not GLOBAL_PARKING_LOT_BREAKER + lock = trio.Lock() + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + with pytest.raises( + trio.BrokenResourceError, + match="^Owner of this lock exited without releasing", + ): + await lock.acquire() + assert not GLOBAL_PARKING_LOT_BREAKER + + +async def test_lock_multiple_acquire() -> None: + assert not GLOBAL_PARKING_LOT_BREAKER + lock = trio.Lock() + with RaisesGroup( + Matcher( + trio.BrokenResourceError, + match="^Owner of this lock exited without releasing", + ), + ): + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + nursery.start_soon(lock.acquire) + assert not GLOBAL_PARKING_LOT_BREAKER + + +async def test_lock_handover() -> None: + assert not GLOBAL_PARKING_LOT_BREAKER + lock = trio.Lock() + lock.acquire_nowait() + child_task: Task | None = None + assert GLOBAL_PARKING_LOT_BREAKER == { + _core.current_task(): [ + lock._lot, + ], + } + + async with trio.open_nursery() as nursery: + nursery.start_soon(lock.acquire) + await wait_all_tasks_blocked() + + lock.release() + + assert len(GLOBAL_PARKING_LOT_BREAKER) == 1 + child_task = next(iter(GLOBAL_PARKING_LOT_BREAKER)) + assert GLOBAL_PARKING_LOT_BREAKER[child_task] == [lock._lot] + + assert lock._lot.broken_by == child_task + assert not GLOBAL_PARKING_LOT_BREAKER diff --git a/src/trio/lowlevel.py b/src/trio/lowlevel.py index 1df701963..9e385a004 100644 --- a/src/trio/lowlevel.py +++ b/src/trio/lowlevel.py @@ -25,6 +25,7 @@ UnboundedQueue as UnboundedQueue, UnboundedQueueStatistics as UnboundedQueueStatistics, add_instrument as add_instrument, + add_parking_lot_breaker as add_parking_lot_breaker, cancel_shielded_checkpoint as cancel_shielded_checkpoint, checkpoint as checkpoint, checkpoint_if_cancelled as checkpoint_if_cancelled, @@ -40,6 +41,7 @@ permanently_detach_coroutine_object as permanently_detach_coroutine_object, reattach_detached_coroutine_object as reattach_detached_coroutine_object, remove_instrument as remove_instrument, + remove_parking_lot_breaker as remove_parking_lot_breaker, reschedule as reschedule, spawn_system_task as spawn_system_task, start_guest_run as start_guest_run,