
bpo-42130: Fix for explicit suppressing of cancellations in wait_for() #28149

Closed
wants to merge 20 commits

Conversation

Dreamsorcerer
Contributor

@Dreamsorcerer Dreamsorcerer commented Sep 3, 2021

Fix wait_for() cancelling when the inner task suppresses a cancellation.

@Dreamsorcerer
Contributor Author

That bot probably shouldn't add the awaiting review tag when it's a draft PR...

@Dreamsorcerer Dreamsorcerer marked this pull request as ready for review September 4, 2021 15:06
Lib/asyncio/tasks.py Outdated
Lib/test/test_asyncio/test_tasks.py Outdated
Lib/test/test_asyncio/test_tasks.py Outdated
Lib/asyncio/tasks.py Outdated
Lib/test/test_asyncio/test_tasks.py Outdated
@github-actions

This PR is stale because it has been open for 30 days with no activity.

@Dreamsorcerer
Contributor Author

@asvetlov Updated this PR, so should be ready to merge now.

@asvetlov asvetlov removed the stale Stale PR or inactive for long period of time. label Dec 1, 2021
Contributor

@asvetlov asvetlov left a comment

Looks good in general, but please address small improvements if you agree with them.

Lib/asyncio/tasks.py Outdated
Lib/asyncio/tasks.py Outdated
task = loop.create_task(asyncio.wait_for(fut, timeout=1))
loop.call_later(0.1, task.cancel)
res = loop.run_until_complete(task)
self.assertEqual(res, "ok")
Contributor Author

Putting this test back in, with a comment. It does manage to trigger the race condition aaliddell mentioned.

fut = self.new_future(loop)
loop.call_later(0.1, fut.set_result, "ok")
res = loop.run_until_complete(asyncio.wait_for(inner(), timeout=0.1))
self.assertEqual(res, "ok")
Contributor Author

Converted my last test to use new_test_loop(), which makes it possible to reproduce the issue without suppressing cancellation.

@@ -439,7 +439,7 @@ async def wait_for(fut, timeout):
                 # after wait_for() returns.
                 # See https://bugs.python.org/issue32751
                 await _cancel_and_wait(fut, loop=loop)
-                raise
+                return fut.result()
Contributor Author

@Dreamsorcerer Dreamsorcerer Dec 15, 2021

This ensures that a cancellation suppressed in the inner future is also suppressed in wait_for(), as one would expect (see test_wait_for_suppress_cancellation).
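
For illustration, here is a minimal sketch of the behaviour this change aims for (not one of the PR's tests; the coroutine names are made up): an inner coroutine that suppresses its cancellation should cause wait_for() to return its result rather than re-raise.

import asyncio

async def stubborn():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # The inner coroutine explicitly suppresses the cancellation
        # and finishes with a result instead.
        return "finished anyway"

async def main():
    outer = asyncio.create_task(asyncio.wait_for(stubborn(), timeout=60))
    await asyncio.sleep(0.1)   # let wait_for() start and the inner task run
    outer.cancel()
    # With `return fut.result()` instead of `raise`, wait_for() follows the
    # inner coroutine's decision and returns its result instead of raising.
    # On unpatched Python this line raises CancelledError instead.
    print(await outer)         # expected under this PR: "finished anyway"

asyncio.run(main())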

@Dreamsorcerer Dreamsorcerer changed the title bpo-42130: Fix for suppressing cancellations in wait_for() bpo-42130: Fix for explicit suppressing of cancellations in wait_for() Dec 15, 2021
@Traktormaster

The changes hardly improve the behaviour of wait_for if at all. I've done substantially more testing and research which I've summarized in the readme here: https://github.com/Traktormaster/wait-for2/tree/main

I've also tested the current version from this PR and it ignores the cancellation in edge cases.

At this point I'm not sure what the goal is here, but just tweaking the existing implementation seems to be a fruitless endeavour.

@Dreamsorcerer
Contributor Author

The changes hardly improve the behaviour of wait_for if at all. I've done substantially more testing and research which I've summarized in the readme here: https://github.com/Traktormaster/wait-for2/tree/main

I've also tested the current version from this PR and it ignores the cancellation in edge cases.

At this point I'm not sure what the goal is here, but just tweaking the existing implementation seems to be a fruitless endeavour.

See my comments above, which explain the change to the PR and a proposal for how the race condition can actually be fixed: #28149 (comment)
#28149 (comment)

I believe you said that wait-for2 still does not fix the problem, so I don't see how that's going to help either. To actually fix these race conditions requires changes outside of the wait_for() function.

@Traktormaster

Traktormaster commented Dec 20, 2021

See my comments above, which explain the change to the PR and a proposal for how the race condition can actually be fixed

The idea to fix the order in which different coroutines are scheduled is interesting.

Obsolete/incorrect points

Although I can easily come up with an example that would not be solved by it:

import asyncio


async def main():
    evt = asyncio.Event()

    async def coroutine_function():
        try:
            resource = object()
            evt.set()  # to synchronize execution to an edge-case
            return resource
        finally:
            # simulate some time-consuming cleanup, during which:
            #   - this coroutine shall expect the result to be returned successfully
            #   - the coroutine is not done, so cancelling it is possible
            await asyncio.sleep(0.5)

    async def with_for_coro():
        await asyncio.wait_for(coroutine_function(), timeout=1)
        assert False, 'End of with_for_coro. Should not be reached!'

    task = asyncio.create_task(with_for_coro())
    await evt.wait()  # to synchronize execution to an edge-case
    assert not task.done()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    else:
        assert False, "cancellation ignored"

asyncio.run(main())

Here the coroutine_function() that is being waited for will initiate a successful return, but a long cleanup will give plenty of time for a simultaneous cancellation to take place. This makes any kind of scheduling after the cancellation irrelevant to the problem, because it happens before/at the cancellation. Ultimately the returned resource gets lost. I could complicate the return path of the coroutine even more by repeatedly initiating a return and aborting it:

class Retry(Exception):
    pass


def x():
    i = 0
    while True:
        try:
            i += 1
            try:
                return i
            finally:
                if i < 10:
                    raise Retry()  # simulate failed cleanup or something
        except Retry:
            pass


print('x', x())  # prints 10

To make a scheduling-based solution work, the return state of a coroutine would need to be protected, so that while a return or raise is in progress, a cancellation will block or fail. And that is still not enough, because, as the second snippet demonstrates, a return can be aborted, which means a task may fail cancellation because it is ending, but later become running again.

I believe you said that wait-for2 still does not fix the problem, so I don't see how that's going to help either.

It provides a way to actually resolve the race condition, so for now we can write well-behaved applications.

To actually fix these race conditions requires changes outside of the wait_for() function.

Yes, when a waiting construct is cancelled explicitly, that cancellation is produced in a (usually) different execution context than the one producing the result being waited for. Unless you can absolutely prevent these two execution contexts (coroutine/task/thread, depending on the implementation) from reaching the point of cancellation/result-production simultaneously, you can't fix wait-for with any other changes outside of it. I'd argue (though I could be wrong) that this is not possible, since by definition the two execution contexts are executed in parallel, independently. So to reach ideal wait-for behaviour I agree we need more changes outside of it, but I doubt they are possible to do.

This problem appears deceptively easy at a glance, but it is very complex to get right. I've made an alternative wait-for implementation that does not adhere to the infeasible design goals of the original as it stands right now. That is why it can work better.

At this point I've already pointed out that ignoring either the cancellation or the result is the problem, and we're not any closer to fixing that. This PR in its current state is more likely to ignore cancellations than before. I can see it without any testing: there are two returns in the explicit cancellation handling, while none should be there.

...
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                return fut.result()
...

If anyone has doubts that the current wait-for is much more broken than it appears to be, see the new behaviour comparison tables I've put into the readme. They are compiled from the tests in my repository: https://github.com/Traktormaster/wait-for2

I'm not sure what else to say, I'd like to use wait-for from the builtin library, but as it stands now, it is unusably broken, and this PR does not change that.

@Dreamsorcerer
Contributor Author

Dreamsorcerer commented Dec 20, 2021

The idea to fix the order in which different coroutines are scheduled is interesting, although I can easily come up with an example that would not be solved by it:

Isn't that a race condition in the user's code, not a problem with wait_for()? The code is pretty much the same as:

    async def coroutine_function():
        resource = object()
        await asyncio.sleep(0.5)
        return resource

At which point it is obvious that the task can be cancelled after creating the resource, but before returning it, i.e. the same issue would still be present without using wait_for(), right? With something like:

task = asyncio.create_task(coroutine_function())
await asyncio.sleep(0)
assert not task.done()
task.cancel()
await task
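
For completeness, a runnable version of that fragment (the try/except and the asyncio.run() wrapper are added here; they are not part of the original snippet):

import asyncio

async def coroutine_function():
    resource = object()
    await asyncio.sleep(0.5)   # cancellation lands here...
    return resource            # ...so the resource is created but never returned

async def main():
    task = asyncio.create_task(coroutine_function())
    await asyncio.sleep(0)     # let the task start and reach the sleep
    assert not task.done()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("cancelled: the resource was created but lost")

asyncio.run(main())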

This PR in its current state is more likely to ignore cancellations than before. I can see it without any testing, there are two returns in the explicit cancellation handling, while none should be there.

Why? I don't see that. The second return, the one added in this PR, will only ignore a cancellation if the inner task ignores the cancellation, which would be the expected behaviour... Otherwise, the behaviour shouldn't be any different.

@aaliddell

See my comments above, which explain the change to the PR and a proposal for how the race condition can actually be fixed

The idea to fix the order in which different coroutines are scheduled is interesting, although I can easily come up with an example that would not be solved by it:

import asyncio


async def main():
    evt = asyncio.Event()

    async def coroutine_function():
        try:
            resource = object()
            evt.set()  # to synchronize execution to an edge-case
            return resource
        finally:
            # simulate some time-consuming cleanup, during which:
            #   - this coroutine shall expect the result to be returned successfully
            #   - the coroutine is not done, so cancelling it is possible
            await asyncio.sleep(0.5)

    async def with_for_coro():
        await asyncio.wait_for(coroutine_function(), timeout=1)
        assert False, 'End of with_for_coro. Should not be reached!'

    task = asyncio.create_task(with_for_coro())
    await evt.wait()  # to synchronize execution to an edge-case
    assert not task.done()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    else:
        assert False, "cancellation ignored"

asyncio.run(main())

Here the coroutine_function() that is being waited for will initiate a successful return, but a long cleanup will give plenty of time for a simultaneous cancellation to take place. This makes any kind of scheduling after the cancellation irrelevant to the problem, because it happens before/at the cancellation. Ultimately the returned resource gets lost. I could complicate the return path of the coroutine even more by repeatedly initiating a return and aborting it:

class Retry(Exception):
    pass


def x():
    i = 0
    while True:
        try:
            i += 1
            try:
                return i
            finally:
                if i < 10:
                    raise Retry()  # simulate failed cleanup or something
        except Retry:
            pass


print('x', x())  # prints 10

I don't think these are representative examples of the race condition; rather, they are badly formed coroutines that don't take into account that cancellation is possible at any point of a coroutine's execution until it completes. A fixed wait_for cannot be expected to know how to fix coroutines that don't handle cancellation correctly, as these coroutines would leak if they were cancelled by any other means besides being wrapped by wait_for (e.g. during app shutdown). If you want to stop these from leaking resources, you would have to protect the finally blocks from cancellation.

The race condition issue is cancellation happening after the coroutine completes but before the wait_for task has been rescheduled. In the above examples, that race point happens after your finally has completed and the function completely exited.

To achieve this, we would need to establish some kind of dependency between the with_for_coro() future and the coroutine_function() future, such that, if with_for_coro() is cancelled, it will always be scheduled to run ahead of coroutine_function(). This obviously requires some changes to the event loop. Maybe a function that says fut1 depends on fut2, so that if fut1 and fut2 are in loop._ready at the same time, fut2 will always get called before fut1?

@Dreamsorcerer: Is this solvable by amending the event loop interface to allow saying "when coroutine A completes (the wrapped task), switch straight to this other coroutine B (the return to wait_for) rather than moving to any ready task"? This avoids having to explicitly scan a dependency graph and would prevent execution of another coroutine that could inject the external cancellation. E.g. a very dumbed-down event loop pseudocode:

ready_tasks = []
while True:
    # The usual asyncio behaviour
    task = ready_tasks.pop(0)
    task.run()

    # Run down dependency chain until we hit a task with next_task == None
    # In this case, next_task would be the return to wait_for
    while task.next_task:
        task = task.next_task
        task.run()
        # Cleanup tasks list needed here

@Traktormaster

I concede that the example was faulty for the scheduling proposal. I had already started to amend it when your comments arrived. I guess the question would be whether that would burn too many resources handling rare edge cases; it sounds promising otherwise.

Why? I don't see that. The 2nd return, that is in this PR, will only ignore a cancellation if the inner task ignores the cancellation, which would be the expected behaviour... Otherwise, the behaviour shouldn't be any different.

The problem there is that the inner task may have finished without ever receiving the cancellation. It is one thing if the inner task receives the cancellation and ignores it, and another if it simply finishes despite the explicit cancellation from the outer code. This was the original issue that led me to make wait_for2 in the first place:

import asyncio


async def main():

    async def do_something():
        for _ in range(10):
            await asyncio.sleep(0.1)
        return object()

    async def work_coro():
        while True:
            await asyncio.wait_for(do_something(), timeout=5)

    task = asyncio.create_task(work_coro())
    await asyncio.sleep(1.0)  # needs adjustment to actually produce the race condition
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())

In the example, if do_something() returns at just the right moment (the race condition) while the task is being cancelled, wait_for() will ignore the cancellation, because the inner future completes and never receives the cancellation.
The outer code never even checks whether the cancellation was ignored, because there is no code in do_something() that could ignore it. This makes the gather() call hang indefinitely because of the state caused by the race condition.

@Dreamsorcerer
Contributor Author

@Dreamsorcerer: Is this solvable by amending the event loop interface to allow saying "when coroutine A completes (the wrapped task), switch straight to this other coroutine B (the return to wait_for) rather than moving to any ready task"? This prevents explicitly having to scan a dependency graph and would prevent execution of another coroutine that could inject the external cancellation. e.g. a very dumbed down event loop pseudocode:

Hmm, I'm not sure. Maybe I'm misunderstanding, but I don't think that works, because you're saying you want to run coro() immediately after wait_for(), but the loop will see coro() first, so will already have run it before then.

So, the situation we are at, is that coro() is already waiting to be run on the next iteration (loop._ready == [coro()]). Then we cancel the wait_for(), which gets added as well (loop._ready == [coro(), wait_for()]).

So, we could declare that coro() has a dependency on wait_for(), then when the loop picks up coro(), it could check for wait_for() in loop._ready and switch to running that task first. If there are a lot of (dependent) tasks ready, this could be quite bad for performance.

Alternatively, we could declare the relationship in reverse, from wait_for() to coro(). Then when we cancel wait_for() and it is getting scheduled into loop._ready, we can check for the position of coro() and insert it just before (roughly loop._ready.insert(loop._ready.index(coro()), wait_for())). If we decide that this should only happen on cancellations, then that would limit the performance impact to cancelling dependent tasks, which might be a very reasonable approach as it only penalizes the edge case.
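
As a rough illustration of that reverse-dependency idea (purely a sketch; ready, wait_for_cb and coro_cb are stand-ins, not real event-loop internals):

from collections import deque

def schedule_before_dependency(ready: deque, wait_for_cb, coro_cb):
    # Place the cancelled wait_for()'s wakeup just before its dependency
    # coro() in the ready queue, so the cancellation is processed before
    # coro() can complete and be mistaken for a normal result.
    try:
        idx = ready.index(coro_cb)
    except ValueError:
        ready.append(wait_for_cb)      # dependency not ready: schedule normally
    else:
        ready.insert(idx, wait_for_cb)

ready = deque(["coro_cb", "other_cb"])
schedule_before_dependency(ready, "wait_for_cb", "coro_cb")
print(list(ready))   # ['wait_for_cb', 'coro_cb', 'other_cb']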

@Traktormaster

Traktormaster commented Dec 21, 2021

This example consistently hangs on the first iteration for me, every time:

import asyncio


async def main():

    async def do_something():
        await asyncio.sleep(0.01)
        return object()

    async def work_coro():
        while True:
            await asyncio.wait_for(do_something(), timeout=5)

    for i in range(1000):
        print(i)
        task = asyncio.create_task(work_coro())
        await asyncio.sleep(0.01)
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())

I'd argue it must never hang by ignoring the cancellation, as there is no code here that would indicate such is possible.

Update: This demonstration seemed to be very stable and simple so I integrated it to wait_for2 tests: https://github.com/Traktormaster/wait-for2/blob/main/tests/test_lost_cancellation.py

@Dreamsorcerer
Contributor Author

This example explicitly hangs on the first iteration for me every time

I'll try to rewrite it as an actual test and add it to this PR. Probably, we'll just have to look at fixing the race condition before merging any other changes.

@Traktormaster

Traktormaster commented Dec 26, 2021

Would the cancellation scheduling fix this case? Would this not still produce a race condition (where an explicit cancellation coincides with a result)?
This example also reliably ignores the cancellation on 3.9 currently and never stops:

import asyncio
from functools import partial


async def main():
    loop = asyncio.get_running_loop()

    async def work_coro(f_):
        await asyncio.wait_for(f_, timeout=5)
        while True:
            await asyncio.sleep(1)

    f = loop.create_future()
    loop.call_later(0.01, partial(f.set_result, object()))
    task = asyncio.create_task(work_coro(f))
    loop.call_later(0.01, partial(task.cancel))
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())

Edit: I was looking through some code when I realized this is a prominent use case. I have several of these (where a wait_for awaits a Future object); they implement request-response multiplexing over socket communication, for example: each request has a session id that is mapped to its future using a dict, so a response can look up and set the future's result if it has not been abandoned/cancelled.
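
A rough sketch of that request-response pattern (the names pending, send_request and handle_response are illustrative, not from the PR or wait-for2):

import asyncio

pending: dict[int, asyncio.Future] = {}

async def send_request(session_id: int, timeout: float):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    pending[session_id] = fut
    try:
        # ... write the request to the socket here ...
        return await asyncio.wait_for(fut, timeout=timeout)
    finally:
        pending.pop(session_id, None)

def handle_response(session_id: int, payload: object) -> None:
    fut = pending.get(session_id)
    if fut is not None and not fut.done():
        # The race: this set_result() may land exactly as the waiting
        # task is being cancelled.
        fut.set_result(payload)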

@Dreamsorcerer
Contributor Author

Dreamsorcerer commented Dec 26, 2021

Hmm, using a Future like that, instead of a Task may be more tricky. I'll have a look and see how it works, but I'm guessing the Future doesn't get added to loop._ready, which won't help much.

@QuantumTim

QuantumTim commented Jan 4, 2022

I'm a bit late to the party, but I think there's an underlying problem that asyncio.Task cancellation is broken by itself, and so it's impossible to build something correctly on top of it (such as wait_for).

For example, the following code doesn't use wait_for at all, but still has a resource leak race condition:

import asyncio

# Global variable to store the outer Task so we can cancel it at the right point.
use_resource_task = None

class Resource:
    "Simulates some resource that needs releasing."
    def __init__(self):
        self.acquired = True
        print('Resource created')
    def __del__(self):
        print(f'Resource {"leaked!!!" if self.acquired else "destroyed"}')
    def __enter__(self):
        pass
    def __exit__(self, *args):
        self.release()
    def release(self):
        print('Resource released')
        self.acquired = False


async def acquire_resource():
    # Simulate a race to cancel the use_resource_task.
    use_resource_task.cancel()
    return Resource()


async def use_resource(use_inner_task):
    resource = None
    try:
        t = acquire_resource()
        if use_inner_task:
            t = asyncio.create_task(t, name=f'Inner')

        # BUG HERE: If use_inner_task is True, then the Task cancellation code will
        # cancel this task/coro after acquire_resource has run and returned, but
        # before the await completes, so the result is never bound to resource and
        # our context manager never gets a chance to clean up correctly.
        with await t as resource:
            print(f'use_resource: using resource')
            return 'Success'
    except asyncio.CancelledError:
        print('use_resource: cancelled')
        raise

async def test(use_inner_task):
    global use_resource_task
    print(f'Using inner task = {use_inner_task}')
    use_resource_task = asyncio.create_task(use_resource(use_inner_task), name = 'Outer')
    try:
        success = await use_resource_task
    except asyncio.CancelledError:
        print('test: use_resource was cancelled')
    except Exception as exc:
        print(f'test: use_resource exception: {exc!r}')
    else:
        print(f'test: use_resource returned {success}')
    print()

asyncio.run(test(use_inner_task = False))
asyncio.run(test(use_inner_task = True))

The output from running this is:

$ python asyncio-bug.py 
Using inner task = False
Resource created
use_resource: using resource
Resource released
Resource destroyed
test: use_resource was cancelled

Using inner task = True
Resource created
Resource leaked!!!
use_resource: cancelled
test: use_resource was cancelled

See in particular that there's a difference between use_resource calling acquire_resource directly as a coroutine vs wrapping it in a Task. In the former case, when we cancel the outer use_resource Task, the asyncio.Task code has no visibility of the await t line, so use_resource returns "Success", but then the await use_resource_task line raises the CancelledError (in essence, the "Success" string is leaked). When we wrap the inner coroutine as a task, the asyncio.Task code now 'sees' the await t line, so it takes that opportunity to do the cancellation; but this is after acquire_resource has handed over responsibility for the resource, and before use_resource gets to see it.

The "leak" happens in tasks.py:236, where exc.value is being thrown away in favour of cancelling the Task. You can change this code to do super().set_result(exc.value) instead, and keep the result, but then you can run into the problem that @Traktormaster raises, which is that you silently lose the cancellation, even though nothing actively ignored it.
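
For reference, the branch being described looks roughly like this in Task.__step (paraphrased from CPython 3.9's Lib/asyncio/tasks.py; exact line numbers vary by version):

        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops:
                # exc.value (the coroutine's return value) is dropped here.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
                # The suggestion above would call super().set_result(exc.value)
                # instead, keeping the result but silently losing the cancellation.
            else:
                super().set_result(exc.value)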

Another problem is that a coroutine doesn't really have any way of knowing whether it's being called within a Task or not, and so if it gets a CancelledError, it doesn't know if that's:

  1. The outer coroutine wrapping us in a Task and then directly cancelling us, in which case it's probably okay to squash the CancelledError and just return the result if we have it.
  2. The outer coroutine is the Task that got cancelled, and we're just a coroutine that's being called directly, but the Task happens to be awaiting inside our code. In this case, if we squash the exception and return the result, then the outer code won't know it needs to cancel.

This is related to the problem in wait_for (below). Once we catch the CancelledError, we don't know if the caller is just cancelling the wait_for call (i.e. f = wait_for(coro); f.cancel()), in which case we can just pretend the cancellation came too late and return the result, or if the caller itself is being cancelled, in which case we ought to propagate the cancellation - but then we leak the result, which is also bad.

# wait until the future completes or the timeout
try:
    await waiter
except exceptions.CancelledError:
    if fut.done():
        # What to do here? Either choice might be wrong!
        sys.exit( "Does not compute!" )

I'm not sure I have a good solution to the problem. Even though there's a bit more control with coroutines, it feels very similar to the problem with terminating threads, in that it's hard to make sure you do it when it's safe. One idea I tried was this, which effectively makes it so only a Future can raise CancellationError when you await on it - the Task code won't spuriously generate new CancellationError exceptions in places where a value might get leaked. It does however also make it so that once you cancel() a Task, it will keep cancelling any Future's that you await on until the Task completes (or lets itself be cancelled). It might be possible to add some mechanism like CancelledError.abort() for code to explicitly abort a cancellation attempt.

@Dreamsorcerer
Contributor Author

Dreamsorcerer commented Jan 4, 2022

Just making a note to myself for another test based on that. We could have loop._ready == [coro(), cancel_wait_for()]; after coro() has run we would have loop._ready == [cancel_wait_for(), wait_for()]. If cancel_wait_for() runs next, then wait_for() will be cancelled despite being ready to get the result from coro().

Again, if we can create some kind of dependency that ensures that wait_for() is run immediately as the next task, then I still think it can work. The example above would still fail as the inner task cancels the outer task just before returning. I'd say this is user error though and not an actual race condition.

@QuantumTim

I made the inner task cancel the outer one just to ensure the race condition happened, but it's not limited to happening in that case. It could easily have been done from another Task that ran after acquire_resource but before use_resource (i.e. loop._ready = [acquire_resource(), cancel_use_resource()] followed by loop._ready = [cancel_use_resource(), use_resource()]). So presumably that would mean you'd need to do this dependency tracking for arbitrary tasks, not just things with wait_for()? This would presumably also mean some tasks might have to run more than once per loop iteration, for example:

async def waiter():
    await t1
    await t2
    await t3
    await t4
    await t5

Now if you had loop._ready = [t1(), t2(), t3(), t4(), t5()] and each of those tasks completed this time around the loop, then you need to run waiter() 5 times, once after each other task completes. That might start to starve out other tasks if waiter() is doing anything more complicated in between those awaits. I don't think it would be possible to completely block other tasks from running, unless anything else was able to insert stuff into the middle of the ready queue - I'm guessing this is the reason why new tasks/callbacks scheduled during loop._run_once don't actually get run until the next iteration of the loop, so I'd be a bit wary about making an exception for this case.

Furthermore, I don't think there's anything in the documentation saying that a task is prevented from cancelling itself or something awaiting on it. In fact, that code in tasks.py only makes sense if it's possible for a task to become cancelled while it's running. If task._must_cancel is set at the start of task.__step then it unsets it and raises a cancellation error into the coro, so you can only hit that code if it became set (again) while the task was running.

@Dreamsorcerer
Contributor Author

Dreamsorcerer commented Jan 7, 2022

I made the inner task cancel the outer one just to ensure the race condition happened,

Yep, I understood, just clarifying that my proposal wouldn't fix it, but that specific case probably doesn't matter, as it doesn't make much sense to cancel the current task right before returning.

That might start to starve out other tasks if waiter() is doing anything more complicated in between those awaits.

Maybe. I was thinking of making it an explicit dependency, maybe something like asyncio.create_task(foo(), linked=True) or whatever. So, only a few tasks would be linked like that where required. I think in terms of how much that affects the running of the code, it should be pretty much the same as if not using a task.

i.e. The code would run in the same order with both of these examples:

t = asyncio.create_task(foo(), linked=True)
await t

await foo()

whereas this could run in a different order (another task might be run after foo() completes, but before returning to this function):

t = asyncio.create_task(foo())
await t

So, the objective is just to be able to run a task as if it were just a nested coro.

This approach does add a burden to the user to recognise such risks and use this parameter, but I've not come up with any better ideas yet.
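
A small runnable illustration of that difference (not from the PR; it just shows that another ready task can run in the gap between a wrapped coroutine finishing and its awaiter resuming, which is exactly where a cancellation can sneak in):

import asyncio

order = []

async def foo():
    order.append("foo finished")

async def bystander():
    order.append("bystander ran")

async def with_task():
    t = asyncio.create_task(foo())
    asyncio.create_task(bystander())
    await t                          # the loop may run other ready tasks
    order.append("awaiter resumed")  # before we get to resume here

async def without_task():
    asyncio.create_task(bystander())
    await foo()                      # runs inline, no trip through the loop
    order.append("awaiter resumed")

asyncio.run(with_task())
print(order)        # ['foo finished', 'bystander ran', 'awaiter resumed']
order.clear()
asyncio.run(without_task())
print(order)        # ['foo finished', 'awaiter resumed', 'bystander ran']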

@twisteroidambassador
Contributor

twisteroidambassador commented Oct 17, 2022

I have just recently found out that one of my libraries had always been affected by this bug.

Here's a script that reliably reproduces the bug on Python 3.9 - 3.11, without relying on timing coincidence:

import asyncio


async def left(tasks: list[asyncio.Task], event: asyncio.Event):
    me = asyncio.current_task()
    assert tasks[0] is me
    await asyncio.sleep(0.1)
    event.set()
    tasks[1].cancel()
    print('cancelled right')


async def right(tasks: list[asyncio.Task], event: asyncio.Event, use_wait_for: bool):
    try:
        me = asyncio.current_task()
        assert tasks[1] is me
        if use_wait_for:
            await asyncio.wait_for(event.wait(), 1)
        else:
            await event.wait()
        print('right done, was not cancelled')
    except asyncio.CancelledError:
        print('right got cancelled')
        raise


async def left_right(use_wait_for: bool):
    tasks = []
    event = asyncio.Event()
    tasks.append(asyncio.create_task(left(tasks, event)))
    tasks.append(asyncio.create_task(right(tasks, event, use_wait_for)))
    await asyncio.wait(tasks)


if __name__ == '__main__':
    print('Not using wait_for():')
    asyncio.run(left_right(False))
    print('\nUsing wait_for():')
    asyncio.run(left_right(True))

Actual output:

Not using wait_for():
cancelled right
right got cancelled

Using wait_for():
cancelled right
right done, was not cancelled

@kumaraditya303
Contributor

Superseded by #96764

@gvanrossum
Member

And fixed in 3.12 by gh-96764. (We're not 100.00% sure that the fix doesn't disturb some workaround, so we're not backporting the fix to 3.11 even though that has timeout(), and we would have to devise a totally different fix for 3.10, which we're not inclined to do, sorry.) But going forward it should be fixed.
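
For context, a minimal sketch of asyncio.timeout() (available since 3.11), which the gh-96764 rework builds on; this example is illustrative only and is not taken from that PR:

import asyncio

async def main():
    try:
        # timeout() cancels the enclosing task and uses the task's
        # cancel()/uncancel() bookkeeping to tell its own timeout apart
        # from an external cancellation, which is what lets the reworked
        # wait_for() avoid this class of races.
        async with asyncio.timeout(1):
            await asyncio.sleep(10)
    except TimeoutError:
        print("timed out")

asyncio.run(main())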

Labels: awaiting changes, tests (Tests in the Lib/test dir)