Rename run_in_worker_thread -> run_sync_in_worker_thread
Part of the changes for python-triogh-68

Also keeps run_in_worker_thread around as a deprecated alias.
njsmith committed Aug 19, 2017
1 parent ebb7665 commit f71aba7
Showing 11 changed files with 73 additions and 64 deletions.
14 changes: 7 additions & 7 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
@@ -1387,8 +1387,8 @@ In acknowledgment of this reality, Trio provides two useful utilities
for working with real, operating-system level,
:mod:`threading`\-module-style threads. First, if you're in Trio but
need to push some blocking I/O into a thread, there's
:func:`run_in_worker_thread`. And if you're in a thread and need to
communicate back with trio, there's the closely related
:func:`run_sync_in_worker_thread`. And if you're in a thread and need
to communicate back with trio, there's the closely related
:func:`current_run_in_trio_thread` and
:func:`current_await_in_trio_thread`.
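The offloading pattern described above can be sketched with the standard library's analogous API. This is a hedged illustration using asyncio's `run_in_executor` rather than trio (which may not be installed); the blocking function and values are made up:

```python
# Stdlib sketch (asyncio, not trio) of the same offloading pattern:
# push a blocking call into a worker thread so the event loop can
# keep servicing other tasks while it runs.
import asyncio
import time

def blocking_io(x):
    time.sleep(0.01)   # stands in for blocking I/O (file read, DNS, ...)
    return x + 1

async def main():
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor, roughly analogous
    # to trio's default worker-thread pool.
    return await loop.run_in_executor(None, blocking_io, 41)

print(asyncio.run(main()))  # prints 42
```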

@@ -1409,7 +1409,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
threads start executing the first N jobs, while the other
(100,000 - N) jobs sit in a queue and wait their turn. Which is
generally what you want, and this is how
:func:`trio.run_in_worker_thread` works by default.
:func:`trio.run_sync_in_worker_thread` works by default.

The downside of this kind of thread pool is that sometimes, you need
more sophisticated logic for controlling how many threads are run at
@@ -1456,7 +1456,7 @@ re-using threads, but has no admission control policy: if you give it
responsible for providing the policy to make sure that this doesn't
happen – but since it *only* has to worry about policy, it can be much
simpler. In fact, all there is to it is the ``limiter=`` argument
passed to :func:`run_in_worker_thread`. This defaults to a global
passed to :func:`run_sync_in_worker_thread`. This defaults to a global
:class:`CapacityLimiter` object, which gives us the classic fixed-size
thread pool behavior. (See
:func:`current_default_worker_thread_limiter`.) But if you want to use
@@ -1510,15 +1510,15 @@ time::


async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
# *args belong to async_fn; **kwargs belong to run_in_worker_thread
# *args belong to async_fn; **kwargs belong to run_sync_in_worker_thread
kwargs["limiter"] = get_user_limiter(user_id)
return await trio.run_in_worker_thread(async_fn, *args, **kwargs)
return await trio.run_sync_in_worker_thread(async_fn, *args, **kwargs)
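The per-user limiter registry that `get_user_limiter` implies can be sketched with stdlib semaphores. This is a hypothetical stand-in, not trio's `CapacityLimiter`; the names `get_user_limiter` and `MAX_THREADS_PER_USER` are illustrative:

```python
# Hypothetical stdlib sketch of the per-user limiter idea: each user
# id maps to its own bounded semaphore, created on first use, so no
# single user can hog all the worker threads.
import threading

MAX_THREADS_PER_USER = 3
_limiters = {}
_limiters_lock = threading.Lock()

def get_user_limiter(user_id):
    # Create-on-first-use, under a lock so two threads asking for the
    # same user always get the same limiter object back.
    with _limiters_lock:
        if user_id not in _limiters:
            _limiters[user_id] = threading.BoundedSemaphore(MAX_THREADS_PER_USER)
        return _limiters[user_id]

assert get_user_limiter("alice") is get_user_limiter("alice")
assert get_user_limiter("alice") is not get_user_limiter("bob")
```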


Putting blocking I/O into worker threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: run_in_worker_thread
.. autofunction:: run_sync_in_worker_thread

.. autofunction:: current_default_worker_thread_limiter

2 changes: 1 addition & 1 deletion docs/source/reference-hazmat.rst
@@ -304,7 +304,7 @@ This logic is a bit convoluted, but accomplishes all of the following:

These functions can also be useful in other situations, e.g. if you're
going to call an uncancellable operation like
:func:`trio.run_in_worker_thread` or (potentially) overlapped I/O
:func:`trio.run_sync_in_worker_thread` or (potentially) overlapped I/O
operations on Windows, then you can call :func:`yield_if_cancelled`
first to make sure that the whole thing is a checkpoint.

2 changes: 1 addition & 1 deletion docs/source/reference-io.rst
@@ -495,7 +495,7 @@ To understand why, you need to know two things.
First, right now no mainstream operating system offers a generic,
reliable, native API for async file or filesystem operations, so we
have to fake it by using threads (specifically,
:func:`run_in_worker_thread`). This is cheap but isn't free: on a
:func:`run_sync_in_worker_thread`). This is cheap but isn't free: on a
typical PC, dispatching to a worker thread adds something like ~100 µs
of overhead to each operation. ("µs" is pronounced "microseconds", and
there are 1,000,000 µs in a second. Note that all the numbers here are
2 changes: 1 addition & 1 deletion trio/_core/_traps.py
@@ -109,7 +109,7 @@ def abort_func(raise_cancel):
At that point there are again two possibilities. You can simply ignore
the cancellation altogether: wait for the operation to complete and
then reschedule and continue as normal. (For example, this is what
:func:`trio.run_in_worker_thread` does if cancellation is disabled.)
:func:`trio.run_sync_in_worker_thread` does if cancellation is disabled.)
The other possibility is that the ``abort_func`` does succeed in
cancelling the operation, but for some reason isn't able to report that
right away. (Example: on Windows, it's possible to request that an
10 changes: 5 additions & 5 deletions trio/_file_io.py
@@ -53,7 +53,7 @@
class AsyncIOWrapper(AsyncResource):
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous
file object` interface. Wrapped methods that could block are executed in
:meth:`trio.run_in_worker_thread`.
:func:`trio.run_sync_in_worker_thread`.
All properties and methods defined in :mod:`~io` are exposed by this
wrapper, if they exist in the wrapped file object.
@@ -80,7 +80,7 @@ def __getattr__(self, name):
@async_wraps(self.__class__, self._wrapped.__class__, name)
async def wrapper(*args, **kwargs):
func = partial(meth, *args, **kwargs)
return await trio.run_in_worker_thread(func)
return await trio.run_sync_in_worker_thread(func)

# cache the generated method
setattr(self, name, wrapper)
@@ -115,7 +115,7 @@ async def detach(self):
"""

raw = await trio.run_in_worker_thread(self._wrapped.detach)
raw = await trio.run_sync_in_worker_thread(self._wrapped.detach)
return wrap_file(raw)

async def aclose(self):
@@ -128,7 +128,7 @@ async def aclose(self):

# ensure the underlying file is closed during cancellation
with _core.open_cancel_scope(shield=True):
await trio.run_in_worker_thread(self._wrapped.close)
await trio.run_sync_in_worker_thread(self._wrapped.close)

await _core.yield_if_cancelled()

@@ -165,7 +165,7 @@ async def open_file(
file = file.__fspath__()

_file = wrap_file(
await trio.run_in_worker_thread(
await trio.run_sync_in_worker_thread(
io.open, file, mode, buffering, encoding, errors, newline, closefd,
opener
)
6 changes: 3 additions & 3 deletions trio/_path.py
@@ -59,7 +59,7 @@ async def wrapper(self, *args, **kwargs):
args = unwrap_paths(args)
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
value = await trio.run_in_worker_thread(func)
value = await trio.run_sync_in_worker_thread(func)
return rewrap_path(value)

return wrapper
@@ -112,7 +112,7 @@ def generate_magic(cls, attrs):

class Path(metaclass=AsyncAutoWrapperType):
"""A :class:`pathlib.Path` wrapper that executes blocking methods in
:meth:`trio.run_in_worker_thread`.
:func:`trio.run_sync_in_worker_thread`.
"""

@@ -155,7 +155,7 @@ async def open(self, *args, **kwargs):
"""

func = partial(self._wrapped.open, *args, **kwargs)
value = await trio.run_in_worker_thread(func)
value = await trio.run_sync_in_worker_thread(func)
return trio.wrap_file(value)


8 changes: 4 additions & 4 deletions trio/_socket.py
@@ -9,7 +9,7 @@

from . import _core
from ._deprecate import deprecated
from ._threads import run_in_worker_thread as _run_in_worker_thread
from ._threads import run_sync_in_worker_thread

__all__ = []

@@ -242,7 +242,7 @@ def numeric_only_failure(exc):
if hr is not None:
return await hr.getaddrinfo(host, port, family, type, proto, flags)
else:
return await _run_in_worker_thread(
return await run_sync_in_worker_thread(
_stdlib_socket.getaddrinfo,
host,
port,
@@ -269,7 +269,7 @@ async def getnameinfo(sockaddr, flags):
if hr is not None:
return await hr.getnameinfo(sockaddr, flags)
else:
return await _run_in_worker_thread(
return await run_sync_in_worker_thread(
_stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
)

@@ -281,7 +281,7 @@ async def getprotobyname(name):
Like :func:`socket.getprotobyname`, but async.
"""
return await _run_in_worker_thread(
return await run_sync_in_worker_thread(
_stdlib_socket.getprotobyname, name, cancellable=True
)
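The stdlib calls being wrapped in this file are plain blocking functions; calling one directly shows what the worker thread actually executes. A small illustration (numeric-only lookup so no real DNS traffic is needed):

```python
# The blocking stdlib call that trio pushes into a worker thread.
# AI_NUMERICHOST keeps this deterministic: no resolver is consulted.
import socket

infos = socket.getaddrinfo("127.0.0.1", 80, type=socket.SOCK_STREAM,
                           flags=socket.AI_NUMERICHOST)
family, socktype, proto, canonname, sockaddr = infos[0]
assert sockaddr[0] == "127.0.0.1"
assert sockaddr[1] == 80
```

A real hostname lookup made this way can block for seconds, which is exactly why trio runs it in a thread with `cancellable=True`.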

12 changes: 6 additions & 6 deletions trio/_sync.py
@@ -133,9 +133,9 @@ class CapacityLimiter:
fixed number of seats, and if they're all taken then you have to wait for
someone to get up before you can sit down.
By default, :func:`run_in_worker_thread` uses a :class:`CapacityLimiter` to
limit the number of threads running at once; see
:func:`current_default_worker_thread_limiter` for details.
By default, :func:`run_sync_in_worker_thread` uses a
:class:`CapacityLimiter` to limit the number of threads running at once;
see :func:`current_default_worker_thread_limiter` for details.
If you're familiar with semaphores, then you can think of this as a
restricted semaphore that's specialized for one common use case, with
@@ -234,9 +234,9 @@ def acquire_on_behalf_of_nowait(self, borrower):
Args:
borrower: A :class:`Task` or arbitrary opaque object used to record
who is borrowing this token. This is used by
:func:`run_in_worker_thread` to allow threads to "hold tokens",
with the intention in the future of using it to `allow deadlock
detection and other useful things
:func:`run_sync_in_worker_thread` to allow threads to "hold
tokens", with the intention in the future of using it to `allow
deadlock detection and other useful things
<https://github.com/python-trio/trio/issues/182>`__
Raises:
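The "dinner party with a fixed number of seats" behavior described in the `CapacityLimiter` docstring can be approximated with a stdlib bounded semaphore. A rough sketch, assuming we only need token counting and not the per-borrower bookkeeping trio adds:

```python
# Rough stdlib analogue of CapacityLimiter's seat-counting: a bounded
# semaphore with 2 tokens admits two holders; a third non-blocking
# acquire fails until someone releases.
import threading

limiter = threading.BoundedSemaphore(2)

assert limiter.acquire(blocking=False)       # seat 1 taken
assert limiter.acquire(blocking=False)       # seat 2 taken
assert not limiter.acquire(blocking=False)   # party is full
limiter.release()                            # someone gets up
assert limiter.acquire(blocking=False)       # a seat is free again
```

Unlike a plain semaphore, trio's class also records *who* holds each token, which is what `acquire_on_behalf_of` is for.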
53 changes: 31 additions & 22 deletions trio/_threads.py
@@ -6,12 +6,14 @@

from . import _core
from ._sync import CapacityLimiter
from ._deprecate import deprecated_alias

__all__ = [
"current_await_in_trio_thread",
"current_run_in_trio_thread",
"run_in_worker_thread",
"run_sync_in_worker_thread",
"current_default_worker_thread_limiter",
"run_in_worker_thread",
]


@@ -171,7 +173,7 @@ def current_await_in_trio_thread():

def current_default_worker_thread_limiter():
"""Get the default :class:`CapacityLimiter` used by
:func:`run_in_worker_thread`.
:func:`run_sync_in_worker_thread`.
The most common reason to call this would be if you want to modify its
:attr:`~CapacityLimiter.total_tokens` attribute.
@@ -194,15 +196,15 @@ class ThreadPlaceholder:


@_core.enable_ki_protection
async def run_in_worker_thread(
async def run_sync_in_worker_thread(
sync_fn, *args, cancellable=False, limiter=None
):
"""Convert a blocking operation into an async operation using a thread.
These two lines are equivalent::
sync_fn(*args)
await run_in_worker_thread(sync_fn, *args)
await run_sync_in_worker_thread(sync_fn, *args)
except that if ``sync_fn`` takes a long time, then the first line will
block the Trio loop while it runs, while the second line allows other Trio
@@ -221,7 +223,7 @@ async def run_in_worker_thread(
anything providing compatible
:meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and
:meth:`~trio.CapacityLimiter.release_on_behalf_of`
methods. :func:`run_in_worker_thread` will call
methods. :func:`run_sync_in_worker_thread` will call
``acquire_on_behalf_of`` before starting the thread, and
``release_on_behalf_of`` after the thread has finished.
@@ -231,39 +233,41 @@
**Cancellation handling**: Cancellation is a tricky issue here, because
neither Python nor the operating systems it runs on provide any general
mechanism for cancelling an arbitrary synchronous function running in a
thread. :func:`run_in_worker_thread` will always check for cancellation on
entry, before starting the thread. But once the thread is running, there
are two ways it can handle being cancelled:
thread. :func:`run_sync_in_worker_thread` will always check for
cancellation on entry, before starting the thread. But once the thread is
running, there are two ways it can handle being cancelled:
* If ``cancellable=False``, the function ignores the cancellation and
keeps going, just like if we had called ``sync_fn`` synchronously. This
is the default behavior.
* If ``cancellable=True``, then ``run_in_worker_thread`` immediately
* If ``cancellable=True``, then ``run_sync_in_worker_thread`` immediately
raises :exc:`Cancelled`. In this case **the thread keeps running in
background** – we just abandon it to do whatever it's going to do, and
silently discard any return value or errors that it raises. Only use
this if you know that the operation is safe and side-effect free. (For
example: :func:`trio.socket.getaddrinfo` is implemented using
:func:`run_in_worker_thread`, and it sets ``cancellable=True`` because
it doesn't really affect anything if a stray hostname lookup keeps
running in the background.)
:func:`run_sync_in_worker_thread`, and it sets ``cancellable=True``
because it doesn't really affect anything if a stray hostname lookup
keeps running in the background.)
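What "the thread keeps running in the background" means can be illustrated with stdlib futures. This is not trio's actual machinery, just a sketch of the trade-off: the waiter gives up, the worker finishes anyway, and its result goes unobserved:

```python
# Illustration (not trio's implementation) of the cancellable=True
# trade-off: the waiter stops waiting, but the worker thread runs to
# completion in the background and its result is silently discarded.
import concurrent.futures
import time

results = []

def slow_lookup():
    time.sleep(0.05)            # stands in for a stray hostname lookup
    results.append("resolved")  # the side effect we claimed was harmless
    return "resolved"

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = pool.submit(slow_lookup)
try:
    # Give up almost immediately -- analogous to being cancelled.
    future.result(timeout=0.001)
except concurrent.futures.TimeoutError:
    pass  # abandon the call; the thread itself keeps running

pool.shutdown(wait=True)        # the thread finishes in the background
assert results == ["resolved"]  # the work completed, unobserved
```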
The ``limiter`` is only released after the thread has *actually*
finished – which in the case of cancellation may be some time after
:func:`run_in_worker_thread` has returned. (This is why it's crucial
that :func:`run_in_worker_thread` takes care of acquiring and releasing
the limiter.) If :func:`trio.run` finishes before the thread does, then
the limiter release method will never be called at all.
:func:`run_sync_in_worker_thread` has returned. (This is why it's
crucial that :func:`run_sync_in_worker_thread` takes care of acquiring
and releasing the limiter.) If :func:`trio.run` finishes before the
thread does, then the limiter release method will never be called at
all.
.. warning::
You should not use :func:`run_in_worker_thread` to call long-running
CPU-bound functions! In addition to the usual GIL-related reasons why
using threads for CPU-bound work is not very effective in Python, there
is an additional problem: on CPython, `CPU-bound threads tend to
"starve out" IO-bound threads <https://bugs.python.org/issue7946>`__,
so using :func:`run_in_worker_thread` for CPU-bound work is likely to
You should not use :func:`run_sync_in_worker_thread` to call
long-running CPU-bound functions! In addition to the usual GIL-related
reasons why using threads for CPU-bound work is not very effective in
Python, there is an additional problem: on CPython, `CPU-bound threads
tend to "starve out" IO-bound threads
<https://bugs.python.org/issue7946>`__, so using
:func:`run_sync_in_worker_thread` for CPU-bound work is likely to
adversely affect the main thread running trio. If you need to do this,
you're better off using a worker process, or perhaps PyPy (which still
has a GIL, but may do a better job of fairly allocating CPU time
@@ -336,3 +340,8 @@ def abort(_):
return _core.Abort.FAILED

return await _core.yield_indefinitely(abort)


run_in_worker_thread = deprecated_alias(
"run_in_worker_thread", run_sync_in_worker_thread, version="0.2.0"
)
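The `deprecated_alias` helper used above comes from `trio/_deprecate.py`, which this diff doesn't show. A minimal sketch of what such a helper could look like, built on the stdlib `warnings` module; the real trio implementation differs, and the sync `run_sync_in_worker_thread` stand-in here is purely illustrative:

```python
# Minimal sketch of a deprecated-alias helper: the alias forwards to
# the new function but emits a DeprecationWarning on every call.
# (Illustrative only; trio's real helper lives in trio/_deprecate.py.)
import functools
import warnings

def deprecated_alias(old_name, new_fn, version):
    @functools.wraps(new_fn)
    def wrapper(*args, **kwargs):
        warnings.warn(
            "{} is deprecated since trio {}; use {} instead".format(
                old_name, version, new_fn.__name__),
            DeprecationWarning,
            stacklevel=2,
        )
        return new_fn(*args, **kwargs)
    wrapper.__name__ = old_name  # the alias keeps its old name
    return wrapper

def run_sync_in_worker_thread(fn, *args):  # sync stand-in for the real API
    return fn(*args)

run_in_worker_thread = deprecated_alias(
    "run_in_worker_thread", run_sync_in_worker_thread, version="0.2.0"
)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert run_in_worker_thread(len, [1, 2, 3]) == 3
assert caught[0].category is DeprecationWarning
```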
2 changes: 1 addition & 1 deletion trio/tests/test_ssl.py
@@ -123,7 +123,7 @@ async def ssl_echo_server_raw(**kwargs):
# nursery context manager to exit too.
with a, b:
nursery.spawn(
trio.run_in_worker_thread,
trio.run_sync_in_worker_thread,
partial(ssl_echo_serve_sync, b, **kwargs)
)
