asyncio.create_subprocess_exec does not respond properly to asyncio.CancelledError #103847

Open
DarkArc opened this issue Apr 25, 2023 · 10 comments
Labels: topic-asyncio, type-bug (An unexpected behavior, bug, or error)

Comments

@DarkArc

DarkArc commented Apr 25, 2023

Bug report

asyncio programs that call proc = await asyncio.create_subprocess_exec(...) and are cancelled before they reach await proc.communicate() are not cancelled properly: the affected task never completes.

This can be observed in the following script (it may take a few runs to observe):

import asyncio
import functools
import signal

counter = 0

async def run_bash_sleep():
  global counter
  counter += 1
  local_counter = counter
  try:
    print(f"Started - {local_counter}")
    proc = await asyncio.create_subprocess_exec(
      'bash', '-c', 'sleep .001',
      stdout = asyncio.subprocess.PIPE,
      stderr = asyncio.subprocess.PIPE,
      start_new_session = True
    )

    print(f"Waiting - {local_counter}")
    stdout, stderr = await proc.communicate()
    print(f"Done - {local_counter}!")
  except asyncio.CancelledError:
    print(f"Canceled - {local_counter}!")

async def run_loop(loop):
  max_jobs = 8
  active_tasks = []
  while True:
    try:
      # Add jobs to the list of active jobs
      while len(active_tasks) < max_jobs:
        active_tasks.append(loop.create_task(run_bash_sleep()))

      # All tasks have finished, end the loop
      if len(active_tasks) == 0:
        break

      # Wait for a test to finish (or a 1 second timeout)
      done, pending = await asyncio.wait(
        active_tasks,
        timeout = 1,
        return_when = asyncio.FIRST_COMPLETED
      )

      print(f"Running jobs: {len(active_tasks)}")

      # Update the active jobs
      active_tasks = list(pending)
    except asyncio.CancelledError:
      max_jobs = 0

def stop_asyncio_loop(signame, loop):
  for task in asyncio.all_tasks(loop):
    task.cancel()

def main():
  loop = asyncio.new_event_loop()

  asyncio.set_event_loop(loop)

  for signame in {'SIGINT', 'SIGTERM'}:
    loop.add_signal_handler(
      getattr(signal, signame),
      functools.partial(stop_asyncio_loop, signame, loop)
    )

  loop.run_until_complete(loop.create_task(run_loop(loop)))

main()

When the signal handler cancels the tasks, any task that hasn't made it to await proc.communicate() will never complete.

A subsequent SIGTERM to the script can then actually terminate the task; however, I'd expect the first call to cancel() to disrupt the coroutine.

Your environment

  • CPython versions tested on: 3.11.2, 3.11.3
  • Operating system and architecture: Fedora 38, x86_64
DarkArc added the type-bug label Apr 25, 2023
@DarkArc
Author

DarkArc commented Apr 25, 2023

Curiously this doesn't seem to affect Python 3.6.8 (running on 3.6 requires a change to the stop_asyncio_loop function, switching to asyncio.Task.all_tasks instead of asyncio.all_tasks).

Instead of an indefinite hang, a message is printed akin to Unknown child process pid 37535, will report returncode 255

@gvanrossum
Member

That looks like a nasty problem. Do you want to help by checking the logic of asyncio signals and subprocess creation? There might even be a simple fix.

@DarkArc
Author

DarkArc commented May 1, 2023

Unfortunately, I don't have the bandwidth for this at the moment, personally or professionally. :(

I took a quick look through the create_subprocess_exec logic before reporting the issue, and didn't see anything that jumped out to me, but that doesn't mean there's nothing there.

@gvanrossum
Member

I’m in a similar situation, so we’ll have to leave this open for a while.

@graingert
Contributor

graingert commented Oct 19, 2024

Copying my analysis from: #125502 (comment)

The problem occurs when asyncio.runners._cancel_all_tasks runs at an inopportune instant, while the pipes are still being connected:

This task gets cancelled:

self._loop.create_task(self._connect_pipes(waiter))

which means self._pending_calls is never run:

loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
    loop.call_soon(callback, *data)
self._pending_calls = None

so when _try_finish appends self._call_connection_lost to self._pending_calls:

self._call(self._call_connection_lost, None)

_call_connection_lost is never called, which means self._exit_waiters are never woken:

def _call_connection_lost(self, exc):
    try:
        self._protocol.connection_lost(exc)
    finally:
        # wake up futures waiting for wait()
        for waiter in self._exit_waiters:
            if not waiter.cancelled():
                waiter.set_result(self._returncode)
        self._exit_waiters = None
        self._loop = None
        self._proc = None
        self._protocol = None
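
The chain of events above can be reproduced without any subprocess at all. The following is only a toy model under stated assumptions: ToyTransport, setup_task, _wake_exit_waiters and demo are hypothetical names invented for illustration, and the class is a heavily simplified stand-in, not asyncio's actual transport. It shows that when the task responsible for flushing a deferred-call queue is cancelled, a callback queued afterwards is never scheduled and the exit waiter stays pending.

```python
import asyncio


class ToyTransport:
    """Hypothetical stand-in for a subprocess transport; not asyncio's real class."""

    def __init__(self, loop):
        self._loop = loop
        self._pending_calls = []    # callbacks deferred until setup finishes
        self._exit_waiters = []
        # Setup runs in its own task, so it can be cancelled independently.
        self.setup_task = loop.create_task(self._connect())

    async def _connect(self):
        await asyncio.sleep(0)      # stands in for connecting the pipes
        for callback, args in self._pending_calls:
            self._loop.call_soon(callback, *args)
        self._pending_calls = None  # from now on, calls go straight to the loop

    def _call(self, callback, *args):
        if self._pending_calls is not None:
            # Queued; only ever runs if _connect() gets to flush the queue.
            self._pending_calls.append((callback, args))
        else:
            self._loop.call_soon(callback, *args)

    def process_exited(self, returncode):
        self._call(self._wake_exit_waiters, returncode)

    def _wake_exit_waiters(self, returncode):
        for waiter in self._exit_waiters:
            if not waiter.cancelled():
                waiter.set_result(returncode)

    def wait(self):
        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return waiter


async def demo(cancel_setup):
    loop = asyncio.get_running_loop()
    transport = ToyTransport(loop)
    if cancel_setup:
        transport.setup_task.cancel()
    waiter = transport.wait()
    await asyncio.sleep(0.01)       # give the setup task time to run or be cancelled
    transport.process_exited(0)
    try:
        return await asyncio.wait_for(waiter, timeout=0.1)
    except asyncio.TimeoutError:
        return "hung"


if __name__ == "__main__":
    print(asyncio.run(demo(cancel_setup=False)))
    print(asyncio.run(demo(cancel_setup=True)))
```

In this model the timeout only exists so the demonstration terminates; the real code path has no such timeout, which is why the original report sees an indefinite hang.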

Here's a demo that hangs every time for me:

import sys
import inspect
import asyncio
from subprocess import PIPE


async def run_sleep():
    proc = await asyncio.create_subprocess_exec(
        "sleep",
        "0.002",
        stdout=PIPE,
    )
    await proc.communicate()


async def amain():
    loop = asyncio.get_running_loop()
    task = asyncio.current_task(loop)
    coro = task.get_coro()

    called_cancel = False

    def cancel_eventually():
        my_coro = coro
        while inspect.iscoroutine(my_coro.cr_await):
            my_coro = my_coro.cr_await
        if my_coro.cr_code is loop._make_subprocess_transport.__code__:
            print("_cancel_all_tasks")
            tasks = asyncio.all_tasks()
            for task in tasks:
                task.cancel()
        else:
            loop.call_soon(cancel_eventually)

    loop.call_soon(cancel_eventually)
    await run_sleep()


def main():
    asyncio.run(amain())


if __name__ == "__main__":
    sys.exit(main())
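
The demo cancels everything returned by asyncio.all_tasks(), which is how the internal pipe-connecting task gets swept up. A minimal sketch of that effect, with a hypothetical library_call coroutine and made-up task names standing in for asyncio internals: the set returned by all_tasks() contains tasks the caller never created directly. Whether cancelling only one's own tasks avoids the hang in the real subprocess code is an assumption that has not been tested in this thread.

```python
import asyncio


async def library_call():
    """Hypothetical library coroutine that spawns an internal helper task."""
    helper = asyncio.create_task(asyncio.sleep(0.01), name="internal-helper")
    await helper


async def demo():
    # The caller only knows about this one task...
    asyncio.create_task(library_call(), name="user-task")
    await asyncio.sleep(0)  # let user-task start and spawn its helper
    # ...but all_tasks() also reports the helper created behind the scenes.
    return {task.get_name() for task in asyncio.all_tasks()}


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))
```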

@graingert
Contributor

graingert commented Oct 19, 2024

@kumaraditya303 I think the fix is to use:

self._waiter = waiter
self._task = task = self._loop.create_task(self._connect_pipes())
task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

Then in def _wake_waiter_and_call_pending_calls_or_close(self, task): ... check the exception/done/cancelled state of task and do the right thing

I've spent a little time fiddling around but can't get the tests to pass and not spray a bunch of errors!
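
For reference, the "check the exception/done/cancelled state" step might look roughly like the sketch below. This is not a patch for base_subprocess.py: classify, record and the three small coroutines are hypothetical names, and the sketch only illustrates the order of the checks in a done callback. task.cancelled() has to be tested first, because task.exception() raises CancelledError on a cancelled task.

```python
import asyncio


def classify(task):
    """Hypothetical helper: describe how a finished task ended."""
    if task.cancelled():            # must come first: exception() raises if cancelled
        return "cancelled"
    if task.exception() is not None:
        return "failed"
    return "ok"


async def demo():
    outcomes = {}

    def record(task):
        # Done callback: runs for success, failure and cancellation alike.
        outcomes[task.get_name()] = classify(task)

    async def succeed():
        await asyncio.sleep(0)

    async def fail():
        await asyncio.sleep(0)
        raise RuntimeError("pipe setup failed")

    async def hang():
        await asyncio.sleep(3600)

    tasks = [
        asyncio.create_task(coro(), name=coro.__name__)
        for coro in (succeed, fail, hang)
    ]
    for task in tasks:
        task.add_done_callback(record)

    await asyncio.sleep(0.01)
    tasks[2].cancel()               # simulate the ill-timed cancellation
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)          # let any remaining done callbacks run
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```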

@graingert
Contributor

Are these the same issue?

#115787
#105288

@savannahostrowski
Member

@graingert Happy to take a look at this if no one else has started!

@kumaraditya303
Contributor

> @kumaraditya303 I think the fix is to use:

I think the same, although I don't think it's that simple. We possibly need to rework and audit the whole cancellation path for subprocesses; I am pretty sure it is broken in more places than this.

@1st1
Member

1st1 commented Oct 29, 2024

@graingert @kumaraditya303

> I think the fix is to use:
> self._waiter = waiter
> self._task = task = self._loop.create_task(self._connect_pipes())
> task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

I'm not so sure. This might fix the problem now, but IMO it would lead to code that's hard to maintain. Suppose the connect_pipes task later gains some other code that mustn't be cancelled: that code would then have to be moved into the callback as well.

In general, I think that separating logic between async/await code and done callbacks is a big antipattern.

Instead I'd think about shielding the connect_pipes/init code from cancellation, or handling it explicitly.
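
As a rough illustration of the shielding idea (a sketch only, not a proposed patch): critical_setup, start and demo below are hypothetical names, and critical_setup merely stands in for the connect_pipes/init step. The caller's cancellation is held back until the shielded setup has finished, then re-raised. Note that asyncio.shield() only protects the inner task from cancellation of the awaiting task; it does not protect it from a direct cancel() on the inner task, which is what a sweep over asyncio.all_tasks() does, so that case would still need explicit handling.

```python
import asyncio


async def critical_setup(log):
    """Hypothetical stand-in for initialization that must run to completion."""
    await asyncio.sleep(0.01)
    log.append("setup finished")


async def start(log):
    inner = asyncio.ensure_future(critical_setup(log))
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        await inner     # let setup finish before propagating the cancellation
        raise


async def demo():
    log = []
    task = asyncio.create_task(start(log))
    await asyncio.sleep(0)          # let start() reach the shielded await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("start cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```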

Projects
Status: Todo
Development

No branches or pull requests

7 participants