Skip to content

Commit

Permalink
WIP: debugging issue #695
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Aug 7, 2024
1 parent 278d4b6 commit 437c507
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
47 changes: 35 additions & 12 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ def __enter__(self) -> CancelScope:
if self._cancel_called:
self._deliver_cancellation(self)

print(f"entered cancel scope {id(self):x}")
return self

def __exit__(
Expand All @@ -401,6 +402,7 @@ def __exit__(
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
print(f"exiting cancel scope {id(self):x} with {exc_type}")
if not self._active:
raise RuntimeError("This cancel scope is not active")
if current_task() is not self._host_task:
Expand Down Expand Up @@ -467,7 +469,7 @@ def _timeout(self) -> None:
else:
self._timeout_handle = loop.call_at(self._deadline, self._timeout)

def _deliver_cancellation(self, origin: CancelScope) -> bool:
def _deliver_cancellation(self, origin: CancelScope, indent: str = "") -> bool:
"""
Deliver cancellation to directly contained tasks and nested cancel scopes.
Expand All @@ -478,10 +480,14 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool:
:return: ``True`` if the delivery needs to be retried on the next cycle
"""
print(f"{indent}scope {id(self):x}:")
indent += " "
should_retry = False
current = current_task()
for task in self._tasks:
print(f"{indent}task {task.get_name()}: ", end="")
if task._must_cancel: # type: ignore[attr-defined]
print("must_cancel flag already set")
continue

# The task is eligible for cancellation if it has started
Expand All @@ -490,24 +496,37 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool:
waiter = task._fut_waiter # type: ignore[attr-defined]
if not isinstance(waiter, asyncio.Future) or not waiter.done():
origin._cancel_calls += 1
print("cancelling")
if sys.version_info >= (3, 9):
task.cancel(f"Cancelled by cancel scope {id(origin):x}")
else:
task.cancel()
else:
print("waiter is not a future or waiter is done")
else:
if task is current:
print("is the current task")
else:
assert task is not self._host_task and not _task_started(task)
print("is not the host task, and has not started")

# Deliver cancellation to child scopes that aren't shielded or running their own
# cancellation callbacks
for scope in self._child_scopes:
if not scope._shield and not scope.cancel_called:
should_retry = scope._deliver_cancellation(origin) or should_retry
should_retry = (
scope._deliver_cancellation(origin, indent) or should_retry
)

# Schedule another callback if there are still tasks left
if origin is self:
if should_retry:
print("scheduling a retry")
self._cancel_handle = get_running_loop().call_soon(
self._deliver_cancellation, origin
)
else:
print("stopping cancellation")
self._cancel_handle = None

return should_retry
Expand Down Expand Up @@ -658,24 +677,28 @@ async def __aexit__(
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
if exc_val is not None:
self.cancel_scope.cancel()
if not isinstance(exc_val, CancelledError):
self._exceptions.append(exc_val)

cancelled_exc_while_waiting_tasks: CancelledError | None = None
while self._tasks:
try:
await asyncio.wait(self._tasks)
except CancelledError as exc:
# This task was cancelled natively; reraise the CancelledError later
# unless this task was already interrupted by another exception
self.cancel_scope.cancel()
if cancelled_exc_while_waiting_tasks is None:
cancelled_exc_while_waiting_tasks = exc
with CancelScope(shield=True) as scope:
print(
f"task group {id(self):x} waiting for tasks to finish in cancel scope {id(scope):x}"
)
while self._tasks:
try:
await asyncio.wait(self._tasks)
except CancelledError as exc:
# This task was cancelled natively; reraise the CancelledError later
# unless this task was already interrupted by another exception
self.cancel_scope.cancel()
if cancelled_exc_while_waiting_tasks is None:
cancelled_exc_while_waiting_tasks = exc

self._active = False
ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
if self._exceptions:
raise BaseExceptionGroup(
"unhandled errors in a TaskGroup", self._exceptions
Expand Down
7 changes: 5 additions & 2 deletions tests/test_taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,12 +797,15 @@ async def child(fail: bool) -> None:
async def test_cancel_cascade() -> None:
async def do_something() -> NoReturn:
async with create_task_group() as tg2:
tg2.start_soon(sleep, 1)
print(f"tg2 ({id(tg2):x}) cancel scope: {id(tg2.cancel_scope):x}\n")
tg2.start_soon(sleep, 1, name="sleep")

print("exited task group tg2")
raise Exception("foo")

async with create_task_group() as tg:
tg.start_soon(do_something)
print(f"tg ({id(tg):x}) cancel scope: {id(tg.cancel_scope):x}")
tg.start_soon(do_something, name="do_something")
await wait_all_tasks_blocked()
tg.cancel_scope.cancel()

Expand Down

0 comments on commit 437c507

Please sign in to comment.