diff --git a/docs/source/_static/hackrtd.css b/docs/source/_static/hackrtd.css
index 5f6c4b68c3..e75a889f69 100644
--- a/docs/source/_static/hackrtd.css
+++ b/docs/source/_static/hackrtd.css
@@ -5,6 +5,13 @@ pre {
     line-height: normal !important;
 }

+/* Make .. deprecated:: blocks visible
+ * (by default they're entirely unstyled)
+ */
+.deprecated {
+    background-color: #ffe13b;
+}
+
 /* Add a snakey triskelion ornament to <hr>s, per
 * https://stackoverflow.com/questions/8862344/css-hr-with-ornament/18541258#18541258
 * but only do it to <hr>s in the content box, b/c the RTD popup control panel
diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst
index 36f5b14a72..317889a54d 100644
--- a/docs/source/reference-core.rst
+++ b/docs/source/reference-core.rst
@@ -1387,8 +1387,8 @@ In acknowledgment of this reality, Trio provides two useful utilities for
 working with real, operating-system level,
 :mod:`threading`\-module-style threads. First, if you're in Trio but
 need to push some blocking I/O into a thread, there's
-:func:`run_in_worker_thread`. And if you're in a thread and need to
-communicate back with trio, there's the closely related
+:func:`run_sync_in_worker_thread`. And if you're in a thread and need
+to communicate back with trio, there's the closely related
 :func:`current_run_in_trio_thread` and
 :func:`current_await_in_trio_thread`.

@@ -1409,7 +1409,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
 threads start executing the first N jobs, while the other
 (100,000 - N) jobs sit in a queue and wait their turn. Which is
 generally what you want, and this is how
-:func:`trio.run_in_worker_thread` works by default.
+:func:`trio.run_sync_in_worker_thread` works by default.

 The downside of this kind of thread pool is that sometimes, you need
 more sophisticated logic for controlling how many threads are run at
@@ -1456,7 +1456,7 @@ re-using threads, but has no admission control policy: if you give it
 responsible for providing the policy to make sure that this doesn't
 happen – but since it *only* has to worry about policy, it can be
 much simpler. In fact, all there is to it is the ``limiter=`` argument
-passed to :func:`run_in_worker_thread`. This defaults to a global
+passed to :func:`run_sync_in_worker_thread`. This defaults to a global
 :class:`CapacityLimiter` object, which gives us the classic
 fixed-size thread pool behavior. (See
 :func:`current_default_worker_thread_limiter`.) But if you want to use
@@ -1510,15 +1510,15 @@ time::

     async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
-        # *args belong to async_fn; **kwargs belong to run_in_worker_thread
+        # *args belong to async_fn; **kwargs belong to run_sync_in_worker_thread
         kwargs["limiter"] = get_user_limiter(user_id)
-        return await trio.run_in_worker_thread(asycn_fn, *args, **kwargs)
+        return await trio.run_sync_in_worker_thread(async_fn, *args, **kwargs)


 Putting blocking I/O into worker threads
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-.. autofunction:: run_in_worker_thread
+.. autofunction:: run_sync_in_worker_thread

 .. autofunction:: current_default_worker_thread_limiter

@@ -1665,8 +1665,8 @@ The tutorial has a :ref:`fully-worked example
 trio's internal scheduling decisions.


-Exceptions
-----------
+Exceptions and warnings
+-----------------------

 .. autoexception:: Cancelled

@@ -1679,3 +1679,6 @@ Exceptions
 .. autoexception:: RunFinishedError

 .. autoexception:: TrioInternalError
+
+.. autoexception:: TrioDeprecationWarning
+   :show-inheritance:
diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst
index 9ca67e9fa0..9742e62f01 100644
--- a/docs/source/reference-hazmat.rst
+++ b/docs/source/reference-hazmat.rst
@@ -304,7 +304,7 @@ This logic is a bit convoluted, but accomplishes all of the following:

 These functions can also be useful in other situations, e.g.
 if you're going to call an uncancellable operation like
-:func:`trio.run_in_worker_thread` or (potentially) overlapped I/O
+:func:`trio.run_sync_in_worker_thread` or (potentially) overlapped I/O
 operations on Windows, then you can call :func:`yield_if_cancelled`
 first to make sure that the whole thing is a checkpoint.
diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst
index cbe16f611d..f23997be9b 100644
--- a/docs/source/reference-io.rst
+++ b/docs/source/reference-io.rst
@@ -390,6 +390,9 @@ Socket objects
    .. method:: sendall(data, flags=0)
       :async:

+      .. deprecated:: 0.2.0
+         Use :class:`trio.SocketStream` and its ``send_all`` method instead.
+
       Send the data to the socket, blocking until all of it has
       been accepted by the operating system.

@@ -492,7 +495,7 @@ To understand why, you need to know two things.

 First, right now no mainstream operating system offers a generic,
 reliable, native API for async file for filesystem operations, so we
 have to fake it by using threads (specifically,
-:func:`run_in_worker_thread`). This is cheap but isn't free: on a
+:func:`run_sync_in_worker_thread`). This is cheap but isn't free: on a
 typical PC, dispatching to a worker thread adds something like ~100 µs
 of overhead to each operation. ("µs" is pronounced "microseconds", and
 there are 1,000,000 µs in a second. Note that all the numbers here are
diff --git a/trio/__init__.py b/trio/__init__.py
index f51a0df24b..03e88bbaa5 100644
--- a/trio/__init__.py
+++ b/trio/__init__.py
@@ -64,6 +64,9 @@ from ._highlevel_ssl_helpers import *
 __all__ += _highlevel_ssl_helpers.__all__

+from ._deprecate import *
+__all__ += _deprecate.__all__
+
 # Imported by default
 from . import socket
 from . import abc
diff --git a/trio/_core/_traps.py b/trio/_core/_traps.py
index 90dee343f9..d9b1802d3c 100644
--- a/trio/_core/_traps.py
+++ b/trio/_core/_traps.py
@@ -109,7 +109,7 @@ def abort_func(raise_cancel):
     At that point there are again two possibilities. You can simply ignore
     the cancellation altogether: wait for the operation to complete and then
     reschedule and continue as normal. (For example, this is what
-    :func:`trio.run_in_worker_thread` does if cancellation is disabled.)
+    :func:`trio.run_sync_in_worker_thread` does if cancellation is disabled.)
     The other possibility is that the ``abort_func`` does succeed in
     cancelling the operation, but for some reason isn't able to report that
     right away. (Example: on Windows, it's possible to request that an
diff --git a/trio/_deprecate.py b/trio/_deprecate.py
new file mode 100644
index 0000000000..987240c834
--- /dev/null
+++ b/trio/_deprecate.py
@@ -0,0 +1,73 @@
+from functools import wraps, partial
+import warnings
+
+__all__ = ["TrioDeprecationWarning"]
+
+
+# We want our warnings to be visible by default (at least for now), but we
+# also want it to be possible to override that using the -W switch. AFAICT
+# this means we cannot inherit from DeprecationWarning, because the only way
+# to make it visible by default then would be to add our own filter at import
+# time, but that would override -W switches...
+class TrioDeprecationWarning(FutureWarning):
+    """Warning emitted if you use deprecated Trio functionality.
+
+    As a young project, Trio is currently quite aggressive about deprecating
+    and/or removing functionality that we realize was a bad idea. If you use
+    Trio, you should subscribe to `issue #1
+    <https://github.com/python-trio/trio/issues/1>`__ to get information about
+    upcoming deprecations and other backwards compatibility breaking changes.
+
+    Despite the name, this class currently inherits from
+    :class:`FutureWarning`, not :class:`DeprecationWarning`, because while
+    we're in young-and-aggressive mode we want these warnings to be visible by
+    default. You can hide them by installing a filter or with the ``-W``
+    switch: see the :mod:`warnings` documentation for details.
+
+    """
+
+
+def _stringify(thing):
+    if hasattr(thing, "__module__") and hasattr(thing, "__qualname__"):
+        return "{}.{}".format(thing.__module__, thing.__qualname__)
+    return str(thing)
+
+
+def warn_deprecated(thing, *, version, alternative, stacklevel=2):
+    stacklevel += 1
+    msg = "{} is deprecated since Trio {}".format(_stringify(thing), version)
+    if alternative is not None:
+        msg = "{}; use {} instead".format(msg, _stringify(alternative))
+    warnings.warn(TrioDeprecationWarning(msg), stacklevel=stacklevel)
+
+
+# deprecated(version=..., alternative=...)
+def deprecated(*, thing=None, version, alternative):
+    def do_wrap(fn):
+        nonlocal thing
+
+        @wraps(fn)
+        def wrapper(*args, **kwargs):
+            warn_deprecated(thing, version=version, alternative=alternative)
+            return fn(*args, **kwargs)
+
+        # If our __module__ or __qualname__ get modified, we want to pick up
+        # on that, so we read them off the wrapper object instead of the (now
+        # hidden) fn object
+        if thing is None:
+            thing = wrapper
+
+        return wrapper
+
+    return do_wrap
+
+
+def deprecated_alias(old_qualname, new_fn, *, version):
+    @deprecated(version=version, alternative=new_fn)
+    @wraps(new_fn)
+    def wrapper(*args, **kwargs):
+        return new_fn(*args, **kwargs)
+
+    wrapper.__qualname__ = old_qualname
+    wrapper.__name__ = old_qualname.rpartition(".")[-1]
+    return wrapper
diff --git a/trio/_file_io.py b/trio/_file_io.py
index 150cff19ac..a086c864b0 100644
--- a/trio/_file_io.py
+++ b/trio/_file_io.py
@@ -53,7 +53,7 @@ class AsyncIOWrapper(AsyncResource):
     """A generic :class:`~io.IOBase` wrapper that implements the
     :term:`asynchronous file object` interface. Wrapped methods that could
     block are executed in
-    :meth:`trio.run_in_worker_thread`.
+    :meth:`trio.run_sync_in_worker_thread`.

     All properties and methods defined in in :mod:`~io` are exposed by this
     wrapper, if they exist in the wrapped file object.
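
Example (illustrative, not part of this patch): the _file_io.py hunks below all
reduce to one pattern - bind the blocking call's arguments with
functools.partial, then hand the zero-argument callable to
run_sync_in_worker_thread. A minimal sketch of the same pattern in application
code, assuming only the public names touched by this patch (the read_config
helper and its path argument are made up for the example):

    import functools

    import trio

    async def read_config(path):
        # partial() carries the keyword arguments, because
        # run_sync_in_worker_thread only forwards positional *args to sync_fn.
        opener = functools.partial(open, path, mode="r", encoding="utf-8")
        f = await trio.run_sync_in_worker_thread(opener)
        with f:
            # Each blocking method call gets its own trip to a worker thread.
            return await trio.run_sync_in_worker_thread(f.read)

    # usage: trio.run(read_config, "settings.ini")
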
@@ -80,7 +80,7 @@ def __getattr__(self, name):
             @async_wraps(self.__class__, self._wrapped.__class__, name)
             async def wrapper(*args, **kwargs):
                 func = partial(meth, *args, **kwargs)
-                return await trio.run_in_worker_thread(func)
+                return await trio.run_sync_in_worker_thread(func)

             # cache the generated method
             setattr(self, name, wrapper)
@@ -115,7 +115,7 @@ async def detach(self):

         """

-        raw = await trio.run_in_worker_thread(self._wrapped.detach)
+        raw = await trio.run_sync_in_worker_thread(self._wrapped.detach)
         return wrap_file(raw)

     async def aclose(self):
@@ -128,7 +128,7 @@ async def aclose(self):

         # ensure the underling file is closed during cancellation
         with _core.open_cancel_scope(shield=True):
-            await trio.run_in_worker_thread(self._wrapped.close)
+            await trio.run_sync_in_worker_thread(self._wrapped.close)

         await _core.yield_if_cancelled()

@@ -165,7 +165,7 @@ async def open_file(
         file = file.__fspath__()

     _file = wrap_file(
-        await trio.run_in_worker_thread(
+        await trio.run_sync_in_worker_thread(
             io.open, file, mode, buffering, encoding, errors, newline,
             closefd, opener
         )
diff --git a/trio/_highlevel_socket.py b/trio/_highlevel_socket.py
index 28c8a7c08a..bce6f080b6 100644
--- a/trio/_highlevel_socket.py
+++ b/trio/_highlevel_socket.py
@@ -106,7 +106,7 @@ async def send_all(self, data):
                 raise ClosedStreamError("can't send data after sending EOF")
             with self._send_conflict_detector.sync:
                 with _translate_socket_errors_to_stream_errors():
-                    await self.socket.sendall(data)
+                    await self.socket._sendall(data)

     async def wait_send_all_might_not_block(self):
         async with self._send_conflict_detector:
diff --git a/trio/_path.py b/trio/_path.py
index 0e9cfd7ed3..8a0a9ecc97 100644
--- a/trio/_path.py
+++ b/trio/_path.py
@@ -59,7 +59,7 @@ async def wrapper(self, *args, **kwargs):
         args = unwrap_paths(args)
         meth = getattr(self._wrapped, meth_name)
         func = partial(meth, *args, **kwargs)
-        value = await trio.run_in_worker_thread(func)
+        value = await trio.run_sync_in_worker_thread(func)
         return rewrap_path(value)

     return wrapper
@@ -112,7 +112,7 @@ def generate_magic(cls, attrs):

 class Path(metaclass=AsyncAutoWrapperType):
     """A :class:`pathlib.Path` wrapper that executes blocking methods in
-    :meth:`trio.run_in_worker_thread`.
+    :meth:`trio.run_sync_in_worker_thread`.

     """

@@ -155,7 +155,7 @@ async def open(self, *args, **kwargs):

         """
         func = partial(self._wrapped.open, *args, **kwargs)
-        value = await trio.run_in_worker_thread(func)
+        value = await trio.run_sync_in_worker_thread(func)
         return trio.wrap_file(value)
diff --git a/trio/_socket.py b/trio/_socket.py
index d7e3b7acf0..31703f1ce1 100644
--- a/trio/_socket.py
+++ b/trio/_socket.py
@@ -8,7 +8,8 @@
 import idna as _idna

 from . import _core
-from ._threads import run_in_worker_thread as _run_in_worker_thread
+from ._deprecate import deprecated
+from ._threads import run_sync_in_worker_thread

 __all__ = []
@@ -241,7 +242,7 @@ def numeric_only_failure(exc):
     if hr is not None:
         return await hr.getaddrinfo(host, port, family, type, proto, flags)
     else:
-        return await _run_in_worker_thread(
+        return await run_sync_in_worker_thread(
             _stdlib_socket.getaddrinfo,
             host,
             port,
@@ -268,7 +269,7 @@ async def getnameinfo(sockaddr, flags):
     if hr is not None:
         return await hr.getnameinfo(sockaddr, flags)
     else:
-        return await _run_in_worker_thread(
+        return await run_sync_in_worker_thread(
             _stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
         )
@@ -280,7 +281,7 @@ async def getprotobyname(name):

     Like :func:`socket.getprotobyname`, but async.

     """
-    return await _run_in_worker_thread(
+    return await run_sync_in_worker_thread(
         _stdlib_socket.getprotobyname, name, cancellable=True
     )
@@ -830,7 +831,9 @@ async def sendmsg(self, *args):
     # sendall
     ################################################################

-    async def sendall(self, data, flags=0):
+    # XX: When we remove sendall(), we should move this code (and its test)
+    # into SocketStream.send_all().
+    async def _sendall(self, data, flags=0):
         with memoryview(data) as data:
             if not data:
                 await _core.yield_briefly()
@@ -841,6 +844,12 @@ async def sendall(self, data, flags=0):
                 sent = await self.send(remaining, flags)
                 total_sent += sent

+    @deprecated(
+        version="0.2.0", alternative="the high-level SocketStream interface"
+    )
+    async def sendall(self, data, flags=0):
+        return await self._sendall(data, flags)
+
     ################################################################
     # sendfile
     ################################################################
diff --git a/trio/_sync.py b/trio/_sync.py
index f28124b53e..7b1b467f7c 100644
--- a/trio/_sync.py
+++ b/trio/_sync.py
@@ -133,9 +133,9 @@ class CapacityLimiter:
     fixed number of seats, and if they're all taken then you have to
     wait for someone to get up before you can sit down.

-    By default, :func:`run_in_worker_thread` uses a :class:`CapacityLimiter` to
-    limit the number of threads running at once; see
-    :func:`current_default_worker_thread_limiter` for details.
+    By default, :func:`run_sync_in_worker_thread` uses a
+    :class:`CapacityLimiter` to limit the number of threads running at once;
+    see :func:`current_default_worker_thread_limiter` for details.

     If you're familiar with semaphores, then you can think of this as a
     restricted semaphore that's specialized for one common use case, with
@@ -234,9 +234,9 @@ def acquire_on_behalf_of_nowait(self, borrower):
         Args:
           borrower: A :class:`Task` or arbitrary opaque object used to record
              who is borrowing this token. This is used by
-             :func:`run_in_worker_thread` to allow threads to "hold tokens",
-             with the intention in the future of using it to `allow deadlock
-             detection and other useful things
+             :func:`run_sync_in_worker_thread` to allow threads to "hold
+             tokens", with the intention in the future of using it to `allow
+             deadlock detection and other useful things
             `__

         Raises:
diff --git a/trio/_threads.py b/trio/_threads.py
index 4aea263d41..3b5dab60a5 100644
--- a/trio/_threads.py
+++ b/trio/_threads.py
@@ -6,12 +6,14 @@

 from . import _core
 from ._sync import CapacityLimiter
+from ._deprecate import deprecated_alias

 __all__ = [
     "current_await_in_trio_thread",
     "current_run_in_trio_thread",
-    "run_in_worker_thread",
+    "run_sync_in_worker_thread",
     "current_default_worker_thread_limiter",
+    "run_in_worker_thread",
 ]

@@ -171,7 +173,7 @@ def current_await_in_trio_thread():

 def current_default_worker_thread_limiter():
     """Get the default :class:`CapacityLimiter` used by
-    :func:`run_in_worker_thread`.
+    :func:`run_sync_in_worker_thread`.

     The most common reason to call this would be if you want to modify its
     :attr:`~CapacityLimiter.total_tokens` attribute.
@@ -194,7 +196,7 @@ class ThreadPlaceholder:

 @_core.enable_ki_protection
-async def run_in_worker_thread(
+async def run_sync_in_worker_thread(
     sync_fn, *args, cancellable=False, limiter=None
 ):
     """Convert a blocking operation into an async operation using a thread.
@@ -202,7 +204,7 @@
     These two lines are equivalent::

         sync_fn(*args)
-        await run_in_worker_thread(sync_fn, *args)
+        await run_sync_in_worker_thread(sync_fn, *args)

     except that if ``sync_fn`` takes a long time, then the first line will
     block the Trio loop while it runs, while the second line allows other Trio
@@ -221,7 +223,7 @@
       anything providing compatible
       :meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and
       :meth:`~trio.CapacityLimiter.release_on_behalf_of`
-      methods. :func:`run_in_worker_thread` will call
+      methods. :func:`run_sync_in_worker_thread` will call
       ``acquire_on_behalf_of`` before starting the thread, and
       ``release_on_behalf_of`` after the thread has finished.

@@ -231,39 +233,41 @@
     **Cancellation handling**: Cancellation is a tricky issue here, because
     neither Python nor the operating systems it runs on provide any general
     mechanism for cancelling an arbitrary synchronous function running in a
-    thread. :func:`run_in_worker_thread` will always check for cancellation on
-    entry, before starting the thread. But once the thread is running, there
-    are two ways it can handle being cancelled:
+    thread. :func:`run_sync_in_worker_thread` will always check for
+    cancellation on entry, before starting the thread. But once the thread is
+    running, there are two ways it can handle being cancelled:

     * If ``cancellable=False``, the function ignores the cancellation and
       keeps going, just like if we had called ``sync_fn`` synchronously. This
       is the default behavior.

-    * If ``cancellable=True``, then ``run_in_worker_thread`` immediately
+    * If ``cancellable=True``, then ``run_sync_in_worker_thread`` immediately
       raises :exc:`Cancelled`. In this case **the thread keeps running in
       background** – we just abandon it to do whatever it's going to do, and
       silently discard any return value or errors that it raises. Only use
       this if you know that the operation is safe and side-effect free. (For
       example: :func:`trio.socket.getaddrinfo` is implemented using
-      :func:`run_in_worker_thread`, and it sets ``cancellable=True`` because
-      it doesn't really affect anything if a stray hostname lookup keeps
-      running in the background.)
+      :func:`run_sync_in_worker_thread`, and it sets ``cancellable=True``
+      because it doesn't really affect anything if a stray hostname lookup
+      keeps running in the background.)

       The ``limiter`` is only released after the thread has *actually*
       finished – which in the case of cancellation may be some time after
-      :func:`run_in_worker_thread` has returned. (This is why it's crucial
-      that :func:`run_in_worker_thread` takes care of acquiring and releasing
-      the limiter.) If :func:`trio.run` finishes before the thread does, then
-      the limiter release method will never be called at all.
+      :func:`run_sync_in_worker_thread` has returned. (This is why it's
+      crucial that :func:`run_sync_in_worker_thread` takes care of acquiring
+      and releasing the limiter.) If :func:`trio.run` finishes before the
+      thread does, then the limiter release method will never be called at
+      all.

     .. warning::

-       You should not use :func:`run_in_worker_thread` to call long-running
-       CPU-bound functions! In addition to the usual GIL-related reasons why
-       using threads for CPU-bound work is not very effective in Python, there
-       is an additional problem: on CPython, `CPU-bound threads tend to
-       "starve out" IO-bound threads `__,
-       so using :func:`run_in_worker_thread` for CPU-bound work is likely to
+       You should not use :func:`run_sync_in_worker_thread` to call
+       long-running CPU-bound functions! In addition to the usual GIL-related
+       reasons why using threads for CPU-bound work is not very effective in
+       Python, there is an additional problem: on CPython, `CPU-bound threads
+       tend to "starve out" IO-bound threads
+       `__, so using
+       :func:`run_sync_in_worker_thread` for CPU-bound work is likely to
        adversely affect the main thread running trio. If you need to do this,
        you're better off using a worker process, or perhaps PyPy (which still
        has a GIL, but may do a better job of fairly allocating CPU time
@@ -336,3 +340,8 @@ def abort(_):
             return _core.Abort.FAILED

     return await _core.yield_indefinitely(abort)
+
+
+run_in_worker_thread = deprecated_alias(
+    "run_in_worker_thread", run_sync_in_worker_thread, version="0.2.0"
+)
diff --git a/trio/tests/test_deprecate.py b/trio/tests/test_deprecate.py
new file mode 100644
index 0000000000..b09160c488
--- /dev/null
+++ b/trio/tests/test_deprecate.py
@@ -0,0 +1,140 @@
+import pytest
+
+import inspect
+import warnings
+
+from .._deprecate import TrioDeprecationWarning, warn_deprecated, deprecated, deprecated_alias
+
+
+@pytest.fixture
+def recwarn_always(recwarn):
+    warnings.simplefilter("always")
+    return recwarn
+
+
+def _here():
+    info = inspect.getframeinfo(inspect.currentframe().f_back)
+    return (info.filename, info.lineno)
+
+
+def test_warn_deprecated(recwarn_always):
+    def deprecated_thing():
+        warn_deprecated("ice", version="1.2", alternative="water")
+
+    filename, lineno = _here()  # https://github.com/google/yapf/issues/447
+    deprecated_thing()
+    assert len(recwarn_always) == 1
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "ice is deprecated" in got.message.args[0]
+    assert "Trio 1.2" in got.message.args[0]
+    assert "water instead" in got.message.args[0]
+    assert got.filename == filename
+    assert got.lineno == lineno + 1
+
+
+def test_warn_deprecated_no_alternative(recwarn_always):
+    # Explicitly no alternative
+    warn_deprecated("water", version="1.3", alternative=None)
+    assert len(recwarn_always) == 1
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "water is deprecated" in got.message.args[0]
+    assert "Trio 1.3" in got.message.args[0]
+
+
+def test_warn_deprecated_stacklevel(recwarn_always):
+    def nested1():
+        nested2()
+
+    def nested2():
+        warn_deprecated("x", version="1.3", alternative="y", stacklevel=3)
+
+    filename, lineno = _here()  # https://github.com/google/yapf/issues/447
+    nested1()
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert got.filename == filename
+    assert got.lineno == lineno + 1
+
+
+def old():  # pragma: no cover
+    pass
+
+
+def new():  # pragma: no cover
+    pass
+
+
+def test_warn_deprecated_formatting(recwarn_always):
+    warn_deprecated(old, version="1.0", alternative=new)
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "test_deprecate.old is deprecated" in got.message.args[0]
+    assert "test_deprecate.new instead" in got.message.args[0]
+
+
+@deprecated(version="1.5", alternative=new)
+def deprecated_old():
+    return 3
+
+
+def test_deprecated_decorator(recwarn_always):
+    assert deprecated_old() == 3
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "test_deprecate.deprecated_old is deprecated" in got.message.args[0]
+    assert "1.5" in got.message.args[0]
+    assert "test_deprecate.new" in got.message.args[0]
+
+
+class Foo:
+    @deprecated(version="1.0", alternative="crying")
+    def method(self):
+        return 7
+
+
+def test_deprecated_decorator_method(recwarn_always):
+    f = Foo()
+    assert f.method() == 7
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "test_deprecate.Foo.method is deprecated" in got.message.args[0]
+
+
+@deprecated(thing="you know, the thing", version=1.2, alternative=None)
+def deprecated_with_thing():
+    return 72
+
+
+def test_deprecated_decorator_with_explicit_thing(recwarn_always):
+    assert deprecated_with_thing() == 72
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "you know, the thing is deprecated" in got.message.args[0]
+
+
+def new_hotness():
+    return "new hotness"
+
+
+old_hotness = deprecated_alias("old_hotness", new_hotness, version="1.23")
+
+
+def test_deprecated_alias(recwarn_always):
+    assert old_hotness() == "new hotness"
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    assert "test_deprecate.old_hotness is deprecated" in got.message.args[0]
+    assert "1.23" in got.message.args[0]
+    assert "test_deprecate.new_hotness instead" in got.message.args[0]
+
+
+class Alias:
+    def new_hotness_method(self):
+        return "new hotness method"
+
+    old_hotness_method = deprecated_alias(
+        "Alias.old_hotness_method", new_hotness_method, version="3.21"
+    )
+
+
+def test_deprecated_alias_method(recwarn_always):
+    obj = Alias()
+    assert obj.old_hotness_method() == "new hotness method"
+    got = recwarn_always.pop(TrioDeprecationWarning)
+    msg = got.message.args[0]
+    assert "test_deprecate.Alias.old_hotness_method is deprecated" in msg
+    assert "test_deprecate.Alias.new_hotness_method instead" in msg
diff --git a/trio/tests/test_socket.py b/trio/tests/test_socket.py
index 03d4e6d330..2656251b26 100644
--- a/trio/tests/test_socket.py
+++ b/trio/tests/test_socket.py
@@ -196,8 +196,8 @@ async def test_from_stdlib_socket():
     ta = tsocket.from_stdlib_socket(sa)
     assert tsocket.is_trio_socket(ta)
     assert sa.fileno() == ta.fileno()
-    await ta.sendall(b"xxx")
-    assert sb.recv(3) == b"xxx"
+    await ta.send(b"x")
+    assert sb.recv(1) == b"x"

     # rejects other types
     with pytest.raises(TypeError):
@@ -216,27 +216,21 @@ async def test_from_fd():
     ta = tsocket.fromfd(sa.fileno(), sa.family, sa.type, sa.proto)
     with sa, sb, ta:
         assert ta.fileno() != sa.fileno()
-        await ta.sendall(b"xxx")
-        assert sb.recv(3) == b"xxx"
+        await ta.send(b"x")
+        assert sb.recv(3) == b"x"


 async def test_socketpair_simple():
     async def child(sock):
         print("sending hello")
-        await sock.sendall(b"hello!")
-        buf = bytearray()
-        while buf != b"hello!":
-            print("reading", buf)
-            buf += await sock.recv(10)
-        return "ok"
+        await sock.send(b"h")
+        assert await sock.recv(1) == b"h"

     a, b = tsocket.socketpair()
     with a, b:
         async with _core.open_nursery() as nursery:
-            task1 = nursery.spawn(child, a)
-            task2 = nursery.spawn(child, b)
-    assert task1.result.unwrap() == "ok"
-    assert task2.result.unwrap() == "ok"
+            nursery.spawn(child, a)
+            nursery.spawn(child, b)


 @pytest.mark.skipif(not hasattr(tsocket, "fromshare"), reason="windows only")
@@ -248,8 +242,8 @@ async def test_fromshare():
         a2 = tsocket.fromshare(shared)
         with a2:
             assert a.fileno() != a2.fileno()
-            await a2.sendall(b"xxx")
-            assert await b.recv(3) == b"xxx"
+            await a2.send(b"x")
+            assert await b.recv(1) == b"x"


 async def test_socket():
@@ -328,23 +322,23 @@ async def test_SocketType_dup():
         assert isinstance(a2, tsocket._SocketType)
         assert a2.fileno() != a.fileno()
         a.close()
-        await a2.sendall(b"xxx")
-        assert await b.recv(3) == b"xxx"
+        await a2.send(b"x")
+        assert await b.recv(1) == b"x"


 async def test_SocketType_shutdown():
     a, b = tsocket.socketpair()
     with a, b:
-        await a.sendall(b"xxx")
-        assert await b.recv(3) == b"xxx"
+        await a.send(b"x")
+        assert await b.recv(1) == b"x"
         assert not a.did_shutdown_SHUT_WR
         assert not b.did_shutdown_SHUT_WR
         a.shutdown(tsocket.SHUT_WR)
         assert a.did_shutdown_SHUT_WR
         assert not b.did_shutdown_SHUT_WR
-        assert await b.recv(3) == b""
-        await b.sendall(b"yyy")
-        assert await a.recv(3) == b"yyy"
+        assert await b.recv(1) == b""
+        await b.send(b"y")
+        assert await a.recv(1) == b"y"

     a, b = tsocket.socketpair()
     with a, b:
@@ -378,8 +372,8 @@ async def test_SocketType_simple_server(address, socket_type):
         server, client_addr = accept_task.result.unwrap()
         with server:
             assert client_addr == server.getpeername() == client.getsockname()
-            await server.sendall(b"xxx")
-            assert await client.recv(3) == b"xxx"
+            await server.send(b"x")
+            assert await client.recv(1) == b"x"


 async def test_SocketType_resolve():
@@ -610,15 +604,15 @@ async def test_send_recv_variants():
     a, b = tsocket.socketpair()
     with a, b:
         # recv, including with flags
-        await a.sendall(b"xxx")
-        assert await b.recv(10, tsocket.MSG_PEEK) == b"xxx"
-        assert await b.recv(10) == b"xxx"
+        assert await a.send(b"x") == 1
+        assert await b.recv(10, tsocket.MSG_PEEK) == b"x"
+        assert await b.recv(10) == b"x"

         # recv_into
-        await a.sendall(b"xxx")
+        await a.send(b"x")
         buf = bytearray(10)
         await b.recv_into(buf)
-        assert buf == b"xxx" + b"\x00" * 7
+        assert buf == b"x" + b"\x00" * 9

         if hasattr(a, "sendmsg"):
             assert await a.sendmsg([b"xxx"], []) == 3
@@ -689,15 +683,19 @@ async def test_send_recv_variants():
     with a, b:
         b.bind(("127.0.0.1", 0))
         await a.connect(b.getsockname())
-        # sendall on a connected udp socket; each call creates a separate
+        # send on a connected udp socket; each call creates a separate
         # datagram
-        await a.sendall(b"xxx")
-        await a.sendall(b"yyy")
+        await a.send(b"xxx")
+        await a.send(b"yyy")
        assert await b.recv(10) == b"xxx"
        assert await b.recv(10) == b"yyy"


-async def test_SocketType_sendall():
+# XX: when we remove sendall(), then this test should be:
+# - moved to the SocketStream tests
+# - have the recwarn fixture removed (currently used to suppress the
+#   deprecation warnings that it's issuing)
+async def test_SocketType_sendall(recwarn):
     BIG = 10000000

     a, b = tsocket.socketpair()
diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py
index 84509d21f1..cce863e0fc 100644
--- a/trio/tests/test_ssl.py
+++ b/trio/tests/test_ssl.py
@@ -123,7 +123,7 @@ async def ssl_echo_server_raw(**kwargs):
         # nursery context manager to exit too.
         with a, b:
             nursery.spawn(
-                trio.run_in_worker_thread,
+                trio.run_sync_in_worker_thread,
                 partial(ssl_echo_serve_sync, b, **kwargs)
             )

diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py
index 2070f41fdd..aeb8e88c22 100644
--- a/trio/tests/test_threads.py
+++ b/trio/tests/test_threads.py
@@ -172,7 +172,7 @@ async def test_run_in_worker_thread():
     def f(x):
         return (x, threading.current_thread())

-    x, child_thread = await run_in_worker_thread(f, 1)
+    x, child_thread = await run_sync_in_worker_thread(f, 1)
     assert x == 1
     assert child_thread != trio_thread

@@ -180,7 +180,7 @@ def g():
         raise ValueError(threading.current_thread())

     with pytest.raises(ValueError) as excinfo:
-        await run_in_worker_thread(g)
+        await run_sync_in_worker_thread(g)
     print(excinfo.value.args)
     assert excinfo.value.args[0] != trio_thread

@@ -195,14 +195,14 @@ def f(q):
         register[0] = "finished"

     async def child(q, cancellable):
-        return await run_in_worker_thread(f, q, cancellable=cancellable)
+        return await run_sync_in_worker_thread(f, q, cancellable=cancellable)

     q = stdlib_queue.Queue()

     async with _core.open_nursery() as nursery:
         task1 = nursery.spawn(child, q, True)
         # Give it a chance to get started. (This is important because
-        # run_in_worker_thread does a yield_if_cancelled before blocking on
-        # the thread, and we don't want to trigger this.)
+        # run_sync_in_worker_thread does a yield_if_cancelled before blocking
+        # on the thread, and we don't want to trigger this.)
         await wait_all_tasks_blocked()
         # Then cancel it.
         nursery.cancel_scope.cancel()
@@ -248,7 +248,7 @@ def thread_fn():

     async def main():
         async def child():
-            await run_in_worker_thread(thread_fn, cancellable=True)
+            await run_sync_in_worker_thread(thread_fn, cancellable=True)

         async with _core.open_nursery() as nursery:
             t = nursery.spawn(child)
@@ -277,8 +277,8 @@ async def test_run_in_worker_thread_limiter(MAX, cancel, use_default_limiter):
     # This test is a bit tricky. The goal is to make sure that if we set
     # limiter=CapacityLimiter(MAX), then in fact only MAX threads are ever
     # running at a time, even if there are more concurrent calls to
-    # run_in_worker_thread, and even if some of those are cancelled. And also
-    # to make sure that the default limiter actually limits.
+    # run_sync_in_worker_thread, and even if some of those are cancelled. And
+    # also to make sure that the default limiter actually limits.
     COUNT = 2 * MAX
     gate = threading.Event()
     lock = threading.Lock()
@@ -330,7 +330,7 @@ def thread_fn(cancel_scope):

     async def run_thread():
         with _core.open_cancel_scope() as cancel_scope:
-            await run_in_worker_thread(
+            await run_sync_in_worker_thread(
                 thread_fn,
                 cancel_scope,
                 limiter=limiter_arg,
@@ -395,7 +395,7 @@ def release_on_behalf_of(self, borrower):
             record.append("release")
             assert borrower == self._borrower

-    await run_in_worker_thread(lambda: None, limiter=CustomLimiter())
+    await run_sync_in_worker_thread(lambda: None, limiter=CustomLimiter())
     assert record == ["acquire", "release"]


@@ -413,7 +413,7 @@ def release_on_behalf_of(self, borrower):

     bs = BadCapacityLimiter()

     with pytest.raises(ValueError) as excinfo:
-        await run_in_worker_thread(lambda: None, limiter=bs)
+        await run_sync_in_worker_thread(lambda: None, limiter=bs)
     assert excinfo.value.__context__ is None
     assert record == ["acquire", "release"]
     record = []
@@ -422,7 +422,7 @@ def release_on_behalf_of(self, borrower):
     # chains with it
     d = {}
     with pytest.raises(ValueError) as excinfo:
-        await run_in_worker_thread(lambda: d["x"], limiter=bs)
+        await run_sync_in_worker_thread(lambda: d["x"], limiter=bs)
     assert isinstance(excinfo.value.__context__, KeyError)
     assert record == ["acquire", "release"]

@@ -439,7 +439,7 @@ def bad_start(self):

     # We get an appropriate error, and the limiter is cleanly released
     with pytest.raises(RuntimeError) as excinfo:
-        await run_in_worker_thread(lambda: None)  # pragma: no cover
+        await run_sync_in_worker_thread(lambda: None)  # pragma: no cover
     assert "engines" in str(excinfo.value)

     assert limiter.borrowed_tokens == 0
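
Example (illustrative, not part of this patch): the TrioDeprecationWarning
docstring above points to the warnings machinery and the -W switch for
controlling visibility. A downstream test suite that wants to fail hard on the
deprecated aliases introduced here could escalate the warning to an error, for
instance from a conftest.py (the placement is a suggestion, not something this
patch does):

    import warnings

    import trio

    # Turn every TrioDeprecationWarning into an exception so that uses of
    # deprecated names cannot slip through the test suite unnoticed.
    warnings.filterwarnings("error", category=trio.TrioDeprecationWarning)

    async def main():
        # With the filter installed, calling the deprecated alias raises
        # TrioDeprecationWarning instead of merely printing it.
        await trio.run_in_worker_thread(sum, [1, 2, 3])

    # trio.run(main)  # raises TrioDeprecationWarning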