Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented voluntary cancellation in worker threads #629

Merged
merged 12 commits into from
Nov 22, 2023

Conversation

agronholm
Copy link
Owner

No description provided.

Copy link
Contributor

@richardsheridan richardsheridan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs impl and tests are good. I don't know what the exact implications of using _parent_cancelled() for this are, but any edge cases would need documentation rather than fixing.

Without implementing parent task reuse, there might be some divergence of from_thread behaviors between backends. Most users won't run into that though, so I won't suggest you hold a release over it.

Comment on lines 27 to 47
:param cancellable: ``True`` to allow cancellation of the operation
:param abandon_on_cancel: ``True`` to abandon the thread (leaving it to run
unchecked on own) if the host task is cancelled, ``False`` to ignore
cancellations in the host task until the operation has completed in the worker
thread
:param cancellable: deprecated alias of ``abandon_on_cancel``
:param limiter: capacity limiter to use to limit the total amount of threads running
(if omitted, the default limiter is used)
:return: an awaitable that yields the return value of the function.

"""
if cancellable is not None:
abandon_on_cancel = cancellable
warn(
"The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
"deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
DeprecationWarning,
stacklevel=2,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allowing both aliases to be passed is a valid choice but departs from trio. Maybe also document/test which one overrides the other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree if both are passed it'd be nice to throw an error, but then you need to change the types a little 🤷‍♀️ happy with either implementation in practice since a warning will still be raised.

@agronholm
Copy link
Owner Author

Docs impl and tests are good. I don't know what the exact implications of using _parent_cancelled() for this are, but any edge cases would need documentation rather than fixing.

Without implementing parent task reuse, there might be some divergence of from_thread behaviors between backbends. Most users won't run into that though, so I won't suggest you hold a release over it.

Can you elaborate on the concept of parent task reuse?

@agronholm
Copy link
Owner Author

agronholm commented Nov 6, 2023

Docs impl and tests are good. I don't know what the exact implications of using _parent_cancelled() for this are, but any edge cases would need documentation rather than fixing.
Without implementing parent task reuse, there might be some divergence of from_thread behaviors between backbends. Most users won't run into that though, so I won't suggest you hold a release over it.

Can you elaborate on the concept of parent task reuse?

I think a snippet which demonstrates the differences between this PR and trio would be great.

@richardsheridan
Copy link
Contributor

richardsheridan commented Nov 6, 2023

Sure, pulled from the test suite:

async def test_cancelscope_propagation():
    async def async_time_bomb():
        cancel_scope.cancel()
        with fail_after(1):
            await sleep(3)

    with CancelScope() as cancel_scope:
        await run_sync(from_thread_run, async_time_bomb)

    assert cancel_scope.cancelled_caught


async def test_from_thread_reuses_task():
    task = get_current_task()

    async def async_current_task():
        return get_current_task()

    assert task is await run_sync(from_thread_run_sync, get_current_task)
    assert task is await run_sync(from_thread_run, async_current_task)
    assert task is not await run_sync(from_thread_run_sync, get_current_task, abandon_on_cancel=True)
    assert task is not await run_sync(from_thread_run, async_current_task, abandon_on_cancel=True)

@richardsheridan
Copy link
Contributor

@graingert pushed for it in the first place so maybe he can give some concrete use case?

python-trio/trio#2392 (comment)

@agronholm
Copy link
Owner Author

Right, so what happens here is Task 1 -> worker thread -> Task 2 which then cancels Task 1's scope, but Task 1 can't exit because it still waits for the worker thread. I think this was a good example and I may need to rework the implementation.

@agronholm
Copy link
Owner Author

I have a creeping feeling that implementing this for the trio backend could be very problematic.

@richardsheridan
Copy link
Contributor

Right, so what happens here is Task 1 -> worker thread -> Task 2 which then cancels Task 1's scope, but Task 1 can't exit because it still waits for the worker thread. I think this was a good example and I may need to rework the implementation.

Well on Trio it's implemented as Task 1 -> worker thread -> Task 1 if abandon_on_cancel==False. This means Task 1 indeed cancels itself, but also unwinds the thread and waits for that unwinding to complete. You could implement it as Task 1 -> worker thread -> Task 2 as long as Task 2 is subject to the CancelScope.

Again, depending on how long you think it might take to do this on both backends, it might be a higher priority to release as-is and save parent task reuse for the next version.

@agronholm
Copy link
Owner Author

I've been scratching my head trying to figure out why test_cancelscope_propagation() doesn't work as expected with the trio backend. If I use trio.from_thread.run() to run the nested task, it works as expected, but not if I use anyio.from_thread.run(). This is unexpected because behind the scenes, anyio.from_thread.run() also calls trio.from_thread.run()!

@agronholm
Copy link
Owner Author

I've been scratching my head trying to figure out why test_cancelscope_propagation() doesn't work as expected with the trio backend. If I use trio.from_thread.run() to run the nested task, it works as expected, but not if I use anyio.from_thread.run(). This is unexpected because behind the scenes, anyio.from_thread.run() also calls trio.from_thread.run()!

Oh yeah, I noticed that if I pass the trio token explicitly via trio_token= to trio.from_thread_run(), it fails to work as expected. Why? I checked that in the absence of an explicit task, it automatically retrieves the same token from the PARENT_TASK_DATA threadlocal! As a matter of fact, if I remove the trio_token kwarg from the call in TrioBackend, the test suite passes anyway, so I might just do that, but I want to understand why this is happening.

@agronholm
Copy link
Owner Author

Ohh...now I see. If an explicit token is provided, it always runs the new task in the system nursery, even if there is a parent task.

@richardsheridan
Copy link
Contributor

Ohh...now I see. If an explicit token is provided, it always runs the new task in the system nursery, even if there is a parent task.

Yeah I view that both as consistency (passing trio_token always does the same thing) and as an optional switch (to get the old behavior where you don't abandon but still use a system task)

@agronholm
Copy link
Owner Author

I've made some progress on implementing the cancellation semantics for async tasks spawned from worker threads, but it broke some previous tests and I'm still sorting that out.

@agronholm
Copy link
Owner Author

Ok, this should have feature parity with Trio now. I have to admit that I don't fully understand why my latest fix worked 😅 but it does.

Copy link
Contributor

@richardsheridan richardsheridan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tests make a lot of sense, so that makes it easy to trust the implementation, even though that part is relatively confusing.

Comment on lines +2134 to +2135
except CancelledError as exc:
raise concurrent.futures.CancelledError(str(exc)) from None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this break the chain of any exceptions attached to the CancelledError? I guess that's probably rare.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the chain is preserved anyway, but I can check if you like.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and it doesn't seem to matter. There's a C level black box between that reraise and the calling synchronous code.

@agronholm agronholm merged commit 3186fb9 into master Nov 22, 2023
16 checks passed
@agronholm agronholm deleted the voluntary-thread-cancellation branch November 22, 2023 21:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants