Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In to_thread_run_sync(), add abandon_on_cancel= as an alias for the cancellable= flag #2841

Merged
merged 9 commits into from
Nov 2, 2023
6 changes: 3 additions & 3 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1827,16 +1827,16 @@ to spawn a child thread, and then use a :ref:`memory channel

The ``from_thread.run*`` functions reuse the host task that called
:func:`trio.to_thread.run_sync` to run your provided function, as long as you're
using the default ``cancellable=False`` so Trio can be sure that the task will remain
around to perform the work. If you pass ``cancellable=True`` at the outset, or if
using the default ``abandon_on_cancel=False`` so Trio can be sure that the task will remain
around to perform the work. If you pass ``abandon_on_cancel=True`` at the outset, or if
you provide a :class:`~trio.lowlevel.TrioToken` when calling back in to Trio, your
functions will be executed in a new system task. Therefore, the
:func:`~trio.lowlevel.current_task`, :func:`current_effective_deadline`, or other
task-tree specific values may differ depending on keyword argument values.

You can also use :func:`trio.from_thread.check_cancelled` to check for cancellation from
a thread that was spawned by :func:`trio.to_thread.run_sync`. If the call to
:func:`~trio.to_thread.run_sync` was cancelled (even if ``cancellable=False``!), then
:func:`~trio.to_thread.run_sync` was cancelled, then
:func:`~trio.from_thread.check_cancelled` will raise :func:`trio.Cancelled`.
It's like ``trio.from_thread.run(trio.sleep, 0)``, but much faster.

Expand Down
8 changes: 8 additions & 0 deletions newsfragments/2841.deprecated.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
To better reflect the underlying thread handling semantics,
the keyword argument for `trio.to_thread.run_sync` that was
previously called ``cancellable`` is now named ``abandon_on_cancel``.
It still does the same thing -- allow the thread to be abandoned
if the call to `trio.to_thread.run_sync` is cancelled -- but since we now
have other ways to propagate a cancellation without abandoning
the thread, "cancellable" has become somewhat of a misnomer.
The old ``cancellable`` name is now deprecated.
6 changes: 3 additions & 3 deletions trio/_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def numeric_only_failure(exc: BaseException) -> bool:
type,
proto,
flags,
cancellable=True,
abandon_on_cancel=True,
)


Expand All @@ -261,7 +261,7 @@ async def getnameinfo(
return await hr.getnameinfo(sockaddr, flags)
else:
return await trio.to_thread.run_sync(
_stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
_stdlib_socket.getnameinfo, sockaddr, flags, abandon_on_cancel=True
)


Expand All @@ -272,7 +272,7 @@ async def getprotobyname(name: str) -> int:

"""
return await trio.to_thread.run_sync(
_stdlib_socket.getprotobyname, name, cancellable=True
_stdlib_socket.getprotobyname, name, abandon_on_cancel=True
)


Expand Down
4 changes: 2 additions & 2 deletions trio/_subprocess_platform/waitid.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ async def _waitid_system_task(pid: int, event: Event) -> None:
"""Spawn a thread that waits for ``pid`` to exit, then wake any tasks
that were waiting on it.
"""
# cancellable=True: if this task is cancelled, then we abandon the
# abandon_on_cancel=True: if this task is cancelled, then we abandon the
# thread to keep running waitpid in the background. Since this is
# always run as a system task, this will only happen if the whole
# call to trio.run is shutting down.

try:
await to_thread_run_sync(
sync_wait_reapable, pid, cancellable=True, limiter=waitid_limiter
sync_wait_reapable, pid, abandon_on_cancel=True, limiter=waitid_limiter
)
except OSError:
# If waitid fails, waitpid will fail too, so it still makes
Expand Down
52 changes: 36 additions & 16 deletions trio/_tests/test_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CancelScope,
CapacityLimiter,
Event,
TrioDeprecationWarning,
_core,
fail_after,
move_on_after,
Expand Down Expand Up @@ -337,10 +338,10 @@ def f(q: stdlib_queue.Queue[str]) -> None:
q.get()
register[0] = "finished"

async def child(q: stdlib_queue.Queue[None], cancellable: bool) -> None:
async def child(q: stdlib_queue.Queue[None], abandon_on_cancel: bool) -> None:
record.append("start")
try:
return await to_thread_run_sync(f, q, cancellable=cancellable)
return await to_thread_run_sync(f, q, abandon_on_cancel=abandon_on_cancel)
finally:
record.append("exit")

Expand Down Expand Up @@ -402,7 +403,7 @@ def thread_fn() -> None:

async def main() -> None:
async def child() -> None:
await to_thread_run_sync(thread_fn, cancellable=True)
await to_thread_run_sync(thread_fn, abandon_on_cancel=True)

async with _core.open_nursery() as nursery:
nursery.start_soon(child)
Expand Down Expand Up @@ -491,7 +492,10 @@ def thread_fn(cancel_scope: CancelScope) -> None:
async def run_thread(event: Event) -> None:
with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(
thread_fn, cancel_scope, limiter=limiter_arg, cancellable=cancel
thread_fn,
cancel_scope,
abandon_on_cancel=cancel,
limiter=limiter_arg,
)
print("run_thread finished, cancelled:", cancel_scope.cancelled_caught)
event.set()
Expand Down Expand Up @@ -553,7 +557,7 @@ def release_on_behalf_of(self, borrower: Task) -> None:

# TODO: should CapacityLimiter have an abc or protocol so users can modify it?
# because currently it's `final` so writing code like this is not allowed.
await to_thread_run_sync(lambda: None, limiter=CustomLimiter()) # type: ignore[arg-type]
await to_thread_run_sync(lambda: None, limiter=CustomLimiter()) # type: ignore[call-overload]
assert record == ["acquire", "release"]


Expand All @@ -571,7 +575,7 @@ def release_on_behalf_of(self, borrower: Task) -> NoReturn:
bs = BadCapacityLimiter()

with pytest.raises(ValueError) as excinfo:
await to_thread_run_sync(lambda: None, limiter=bs) # type: ignore[arg-type]
await to_thread_run_sync(lambda: None, limiter=bs) # type: ignore[call-overload]
assert excinfo.value.__context__ is None
assert record == ["acquire", "release"]
record = []
Expand All @@ -580,7 +584,7 @@ def release_on_behalf_of(self, borrower: Task) -> NoReturn:
# chains with it
d: dict[str, object] = {}
with pytest.raises(ValueError) as excinfo:
await to_thread_run_sync(lambda: d["x"], limiter=bs) # type: ignore[arg-type]
await to_thread_run_sync(lambda: d["x"], limiter=bs) # type: ignore[call-overload]
assert isinstance(excinfo.value.__context__, KeyError)
assert record == ["acquire", "release"]

Expand Down Expand Up @@ -881,15 +885,15 @@ async def test_trio_token_weak_referenceable() -> None:
assert token is weak_reference()


async def test_unsafe_cancellable_kwarg() -> None:
async def test_unsafe_abandon_on_cancel_kwarg() -> None:
# This is a stand in for a numpy ndarray or other objects
# that (maybe surprisingly) lack a notion of truthiness
class BadBool:
def __bool__(self) -> bool:
raise NotImplementedError

with pytest.raises(NotImplementedError):
await to_thread_run_sync(int, cancellable=BadBool()) # type: ignore[arg-type]
await to_thread_run_sync(int, abandon_on_cancel=BadBool()) # type: ignore[call-overload]


async def test_from_thread_reuses_task() -> None:
Expand Down Expand Up @@ -933,7 +937,7 @@ def sync_check() -> None:
assert not queue.get_nowait()

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(sync_check, cancellable=True)
await to_thread_run_sync(sync_check, abandon_on_cancel=True)

assert cancel_scope.cancelled_caught
assert not await to_thread_run_sync(partial(queue.get, timeout=1))
Expand All @@ -957,7 +961,7 @@ def async_check() -> None:
assert not queue.get_nowait()

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(async_check, cancellable=True)
await to_thread_run_sync(async_check, abandon_on_cancel=True)

assert cancel_scope.cancelled_caught
assert not await to_thread_run_sync(partial(queue.get, timeout=1))
Expand All @@ -976,11 +980,11 @@ async def async_time_bomb() -> None:
async def test_from_thread_check_cancelled() -> None:
q: stdlib_queue.Queue[str] = stdlib_queue.Queue()

async def child(cancellable: bool, scope: CancelScope) -> None:
async def child(abandon_on_cancel: bool, scope: CancelScope) -> None:
with scope:
record.append("start")
try:
return await to_thread_run_sync(f, cancellable=cancellable)
return await to_thread_run_sync(f, abandon_on_cancel=abandon_on_cancel)
except _core.Cancelled:
record.append("cancel")
raise
Expand Down Expand Up @@ -1009,7 +1013,7 @@ def f() -> None:
# implicit assertion, Cancelled not raised via nursery
assert record[1] == "exit"

# cancellable=False case: a cancel will pop out but be handled by
# abandon_on_cancel=False case: a cancel will pop out but be handled by
# the appropriate cancel scope
record = []
ev = threading.Event()
Expand All @@ -1025,7 +1029,7 @@ def f() -> None:
assert "cancel" in record
assert record[-1] == "exit"

# cancellable=True case: slightly different thread behavior needed
# abandon_on_cancel=True case: slightly different thread behavior needed
# check thread is cancelled "soon" after abandonment
def f() -> None: # type: ignore[no-redef] # noqa: F811
ev.wait()
Expand Down Expand Up @@ -1068,9 +1072,25 @@ async def test_reentry_doesnt_deadlock() -> None:

async def child() -> None:
while True:
await to_thread_run_sync(from_thread_run, sleep, 0, cancellable=False)
await to_thread_run_sync(from_thread_run, sleep, 0, abandon_on_cancel=False)

with move_on_after(2):
async with _core.open_nursery() as nursery:
for _ in range(4):
nursery.start_soon(child)


async def test_cancellable_and_abandon_raises() -> None:
with pytest.raises(ValueError):
await to_thread_run_sync(bool, cancellable=True, abandon_on_cancel=False) # type: ignore[call-overload]

with pytest.raises(ValueError):
await to_thread_run_sync(bool, cancellable=True, abandon_on_cancel=True) # type: ignore[call-overload]


async def test_cancellable_warns() -> None:
with pytest.warns(TrioDeprecationWarning):
await to_thread_run_sync(bool, cancellable=False)

with pytest.warns(TrioDeprecationWarning):
await to_thread_run_sync(bool, cancellable=True)
61 changes: 49 additions & 12 deletions trio/_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import threading
from collections.abc import Awaitable, Callable
from itertools import count
from typing import Generic, TypeVar
from typing import Generic, TypeVar, overload

import attr
import outcome
from sniffio import current_async_library_cvar

import trio
from trio._core._traps import RaiseCancelT

from ._core import (
RunVar,
Expand All @@ -24,6 +23,8 @@
enable_ki_protection,
start_thread_soon,
)
from ._core._traps import RaiseCancelT
from ._deprecate import warn_deprecated
from ._sync import CapacityLimiter
from ._util import coroutine_or_error

Expand Down Expand Up @@ -171,13 +172,36 @@ def run_in_system_nursery(self, token: TrioToken) -> None:
token.run_sync_soon(self.run_sync)


@enable_ki_protection # Decorator used on function with Coroutine[Any, Any, RetT]
@overload # Decorator used on function with Coroutine[Any, Any, RetT]
async def to_thread_run_sync( # type: ignore[misc]
sync_fn: Callable[..., RetT],
*args: object,
thread_name: str | None = None,
abandon_on_cancel: bool = False,
limiter: CapacityLimiter | None = None,
) -> RetT:
...


@overload # Decorator used on function with Coroutine[Any, Any, RetT]
async def to_thread_run_sync( # type: ignore[misc]
sync_fn: Callable[..., RetT],
*args: object,
thread_name: str | None = None,
cancellable: bool = False,
limiter: CapacityLimiter | None = None,
) -> RetT:
...


@enable_ki_protection # Decorator used on function with Coroutine[Any, Any, RetT]
async def to_thread_run_sync( # type: ignore[misc]
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved
sync_fn: Callable[..., RetT],
*args: object,
thread_name: str | None = None,
abandon_on_cancel: bool | None = None,
cancellable: bool | None = None,
limiter: CapacityLimiter | None = None,
) -> RetT:
"""Convert a blocking operation into an async operation using a thread.

Expand All @@ -198,8 +222,8 @@ async def to_thread_run_sync( # type: ignore[misc]
sync_fn: An arbitrary synchronous callable.
*args: Positional arguments to pass to sync_fn. If you need keyword
arguments, use :func:`functools.partial`.
cancellable (bool): Whether to allow cancellation of this operation. See
discussion below.
abandon_on_cancel (bool): Whether to abandon this thread upon
cancellation of this operation. See discussion below.
thread_name (str): Optional string to set the name of the thread.
Will always set `threading.Thread.name`, but only set the os name
if pthread.h is available (i.e. most POSIX installations).
Expand All @@ -225,17 +249,17 @@ async def to_thread_run_sync( # type: ignore[misc]
starting the thread. But once the thread is running, there are two ways it
can handle being cancelled:

* If ``cancellable=False``, the function ignores the cancellation and
* If ``abandon_on_cancel=False``, the function ignores the cancellation and
keeps going, just like if we had called ``sync_fn`` synchronously. This
is the default behavior.

* If ``cancellable=True``, then this function immediately raises
* If ``abandon_on_cancel=True``, then this function immediately raises
`~trio.Cancelled`. In this case **the thread keeps running in
background** – we just abandon it to do whatever it's going to do, and
silently discard any return value or errors that it raises. Only use
this if you know that the operation is safe and side-effect free. (For
example: :func:`trio.socket.getaddrinfo` uses a thread with
``cancellable=True``, because it doesn't really affect anything if a
``abandon_on_cancel=True``, because it doesn't really affect anything if a
stray hostname lookup keeps running in the background.)

The ``limiter`` is only released after the thread has *actually*
Expand Down Expand Up @@ -263,7 +287,20 @@ async def to_thread_run_sync( # type: ignore[misc]

"""
await trio.lowlevel.checkpoint_if_cancelled()
abandon_on_cancel = bool(cancellable) # raise early if cancellable.__bool__ raises
if cancellable is not None:
if abandon_on_cancel is not None:
raise ValueError(
"Cannot set `cancellable` and `abandon_on_cancel` simultaneously."
)
warn_deprecated(
"The `cancellable=` keyword argument to `trio.to_thread.run_sync`",
"0.23.0",
issue=2841,
instead="`abandon_on_cancel=`",
)
abandon_on_cancel = cancellable
# raise early if abandon_on_cancel.__bool__ raises
abandon_on_cancel = bool(abandon_on_cancel)
if limiter is None:
limiter = current_default_thread_limiter()

Expand Down Expand Up @@ -381,14 +418,14 @@ def from_thread_check_cancelled() -> None:
"""Raise `trio.Cancelled` if the associated Trio task entered a cancelled status.

Only applicable to threads spawned by `trio.to_thread.run_sync`. Poll to allow
``cancellable=False`` threads to raise :exc:`~trio.Cancelled` at a suitable
place, or to end abandoned ``cancellable=True`` threads sooner than they may
``abandon_on_cancel=False`` threads to raise :exc:`~trio.Cancelled` at a suitable
place, or to end abandoned ``abandon_on_cancel=True`` threads sooner than they may
otherwise.

Raises:
Cancelled: If the corresponding call to `trio.to_thread.run_sync` has had a
delivery of cancellation attempted against it, regardless of the value of
``cancellable`` supplied as an argument to it.
``abandon_on_cancel`` supplied as an argument to it.
RuntimeError: If this thread is not spawned from `trio.to_thread.run_sync`.

.. note::
Expand Down
2 changes: 1 addition & 1 deletion trio/_wait_for_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def WaitForSingleObject(obj: int | CData) -> None:
WaitForMultipleObjects_sync,
handle,
cancel_handle,
cancellable=True,
abandon_on_cancel=True,
limiter=trio.CapacityLimiter(math.inf),
)
finally:
Expand Down
Loading