diff --git a/docs/source/design.rst b/docs/source/design.rst
index 81d26928ee..256350d601 100644
--- a/docs/source/design.rst
+++ b/docs/source/design.rst
@@ -417,9 +417,9 @@ Specific style guidelines
   and the ``nowait`` version raises :exc:`trio.WouldBlock` if it would
   block.

 * The word ``monitor`` is used for APIs that involve an
-  :class:`UnboundedQueue` receiving some kind of events. (Examples:
-  nursery ``.monitor`` attribute, some of the low-level I/O functions in
-  :mod:`trio.hazmat`.)
+  :class:`trio.hazmat.UnboundedQueue` receiving some kind of events.
+  (Examples: nursery ``.monitor`` attribute, some of the low-level I/O
+  functions in :mod:`trio.hazmat`.)

 * ...we should, but currently don't, have a solid convention to
   distinguish between functions that take an async callable and those
diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst
index eed220ec68..687b2d37c5 100644
--- a/docs/source/reference-core.rst
+++ b/docs/source/reference-core.rst
@@ -1218,13 +1218,15 @@ Broadcasting an event with :class:`Event`
    :members:


-Passing messages with :class:`Queue` and :class:`UnboundedQueue`
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+.. _queue:

-Trio provides two types of queues suitable for different
-purposes. Where they differ is in their strategies for handling flow
-control. Here's a toy example to demonstrate the problem. Suppose we
-have a queue with two producers and one consumer::
+Passing messages with :class:`Queue`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can use :class:`Queue` objects to safely pass objects between
+tasks. Trio :class:`Queue` objects always have a bounded size. Here's
+a toy example to demonstrate why this is important. Suppose we have a
+queue with two producers and one consumer::

    async def producer(queue):
        while True:
@@ -1235,9 +1237,9 @@ have a queue with two producers and one consumer::
        print(await queue.get())

    async def main():
-       # Trio's actual queue classes have countermeasures to prevent
-       # this example from working, so imagine we have some sort of
-       # platonic ideal of a queue here
+       # This example won't work with Trio's actual Queue class, so
+       # imagine we have some sort of platonic ideal of an unbounded
+       # queue here:
        queue = trio.HypotheticalQueue()
        async with trio.open_nursery() as nursery:
            # Two producers
@@ -1255,41 +1257,14 @@ we add two items to the queue but only remove one, then over time the
 queue size grows arbitrarily large, our latency is terrible, we run
 out of memory, it's just generally bad news all around.

-There are two potential strategies for avoiding this problem.
-
-The preferred solution is to apply *backpressure*. If our queue starts
-getting too big, then we can make the producers slow down by having
-``put`` block until ``get`` has had a chance to remove an item. This
-is the strategy used by :class:`trio.Queue`.
-
-The other possibility is for the queue consumer to get greedy: each
-time it runs, it could eagerly consume all of the pending items before
-allowing another task to run. (In some other systems, this would
-happen automatically because their queue's ``get`` method doesn't
-invoke the scheduler unless it has to block. But :ref:`in trio, get is
-always a checkpoint `.) This would work, but it's a
-bit risky: basically instead of applying backpressure to specifically
-the producer tasks, we're applying it to *all* the tasks in our
-system. The danger here is that if enough items have built up in the
-queue, then "stopping the world" to process them all may cause
-unacceptable latency spikes in unrelated tasks. Nonetheless, this is
-still the right choice in situations where it's impossible to apply
-backpressure more precisely. For example, when monitoring exiting
-tasks, blocking tasks from reporting their death doesn't really
-accomplish anything – the tasks are taking up memory either way,
-etc. (In this particular case it `might be possible to do better
-<https://github.com/python-trio/trio/issues/25>`__, but in general the
-principle holds.) So this is the strategy implemented by
-:class:`trio.UnboundedQueue`.
-
-tl;dr: use :class:`Queue` if you can.
+By placing an upper bound on our queue's size, we avoid this problem.
+If the queue gets too big, then it applies *backpressure*: ``put``
+blocks and forces the producers to slow down and wait until the
+consumer calls ``get``.

 .. autoclass:: Queue
    :members:

-.. autoclass:: UnboundedQueue
-   :members:
-

 Lower-level synchronization primitives
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
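To make the backpressure behaviour described in the new section concrete,
here is a minimal runnable sketch (an illustration, not part of the patch)
of the same toy example using a real bounded queue. It assumes the
era-appropriate ``trio.Queue(capacity)`` constructor, and uses
``nursery.start_soon``, which trio releases contemporary with this diff
spelled ``nursery.spawn``::

   import trio

   async def producer(queue):
       while True:
           # Once the queue holds `capacity` items, this put() blocks
           # until the consumer's get() makes room. That pause is the
           # backpressure.
           await queue.put(1)

   async def consumer(queue):
       while True:
           print(await queue.get())
           await trio.sleep(1)  # deliberately slow consumer

   async def main():
       queue = trio.Queue(10)  # bounded: holds at most 10 items
       async with trio.open_nursery() as nursery:
           # Two producers
           nursery.start_soon(producer, queue)
           nursery.start_soon(producer, queue)
           # One consumer
           nursery.start_soon(consumer, queue)

   trio.run(main)  # runs forever, but memory use stays bounded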
diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst
index 6e328f94d2..150ed83559 100644
--- a/docs/source/reference-hazmat.rst
+++ b/docs/source/reference-hazmat.rst
@@ -159,6 +159,36 @@ anything real. See `#26
    :with: queue


+Unbounded queues
+================
+
+In the section :ref:`queue`, we showed an example with two producers
+and one consumer using the same queue, where the queue size would grow
+without bound, producing unbounded latency and memory usage.
+:class:`trio.Queue` avoids this by placing an upper bound on how big
+the queue can get before ``put`` starts blocking. But what if you're
+in a situation where ``put`` can't block?
+
+There is another option: the queue consumer could get greedy. Each
+time it runs, it could eagerly consume all of the pending items before
+allowing another task to run. (In some other systems, this would
+happen automatically because their queue's ``get`` method doesn't
+invoke the scheduler unless it has to block. But :ref:`in trio, get is
+always a checkpoint `.) This works, but it's a bit
+risky: basically instead of applying backpressure specifically to the
+producer tasks, we're applying it to *all* the tasks in our system.
+The danger here is that if enough items have built up in the queue,
+then "stopping the world" to process them all may cause unacceptable
+latency spikes in unrelated tasks. Nonetheless, this is still the
+right choice in situations where it's impossible to apply backpressure
+more precisely. So this is the strategy implemented by
+:class:`UnboundedQueue`. The main time you should use this is when
+working with low-level APIs like :func:`monitor_kevent`.
+
+.. autoclass:: UnboundedQueue
+   :members:
+
+
 Global state: system tasks and run-local storage
 ================================================
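For comparison with the bounded example earlier, here is a sketch (again an
illustration, not part of the patch) of the batch-oriented behaviour the new
section describes, using the class's ``put_nowait`` together with the
``get_batch_nowait`` method documented in the hunk above; the queued payloads
are made up::

   import trio

   async def main():
       queue = trio.hazmat.UnboundedQueue()

       # put_nowait() is synchronous and never blocks, no matter how
       # many items are already queued.
       for i in range(3):
           queue.put_nowait(("event", i))

       # A successful get drains the whole backlog as a single list.
       print(queue.get_batch_nowait())
       # -> [('event', 0), ('event', 1), ('event', 2)]

       # The queue is now empty, so the nowait variant raises
       # trio.WouldBlock rather than returning an empty list.
       try:
           queue.get_batch_nowait()
       except trio.WouldBlock:
           print("queue drained")

   trio.run(main)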
diff --git a/trio/__init__.py b/trio/__init__.py
index 5a98824ee8..96cab098ee 100644
--- a/trio/__init__.py
+++ b/trio/__init__.py
@@ -92,6 +92,10 @@
     _deprecate.DeprecatedAttribute(hazmat.Value, "0.2.0", issue=136),
     "Error":
     _deprecate.DeprecatedAttribute(hazmat.Error, "0.2.0", issue=136),
+    "UnboundedQueue":
+        _deprecate.DeprecatedAttribute(
+            hazmat.UnboundedQueue, "0.2.0", issue=136
+        ),
     "run_in_worker_thread":
     _deprecate.DeprecatedAttribute(
         run_sync_in_worker_thread, "0.2.0", issue=68
diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py
index fb6844d8a3..24c39906de 100644
--- a/trio/_core/_unbounded_queue.py
+++ b/trio/_core/_unbounded_queue.py
@@ -14,6 +14,7 @@ class _UnboundedQueueStats:
     tasks_waiting = attr.ib()


+@_hazmat
 class UnboundedQueue:
     """An unbounded queue suitable for certain unusual forms of inter-task
     communication.
@@ -25,8 +26,8 @@ class UnboundedQueue:
     "batches". If a consumer task processes each batch without yielding,
     then this helps achieve (but does not guarantee) an effective bound on
     the queue's memory use, at the cost of potentially increasing system latencies
-    in general. You should generally prefer to use a :class:`Queue` instead if
-    you can.
+    in general. You should generally prefer to use a :class:`trio.Queue`
+    instead if you can.

     Currently each batch completely empties the queue, but `this may change
     in the future <https://github.com/python-trio/trio/issues/51>`__.
@@ -99,10 +100,10 @@ def get_batch_nowait(self):
         Returns:
           list: A list of dequeued items, in order. On a successful call this
             list is always non-empty; if it would be empty we raise
-            :exc:`WouldBlock` instead.
+            :exc:`~trio.WouldBlock` instead.

         Raises:
-          WouldBlock: if the queue is empty.
+          ~trio.WouldBlock: if the queue is empty.

         """
         if not self._can_get:
diff --git a/trio/_signals.py b/trio/_signals.py
index bd9fca40b5..817d8b6cbf 100644
--- a/trio/_signals.py
+++ b/trio/_signals.py
@@ -113,10 +113,7 @@ def catch_signals(signals):

     The async iterator blocks until at least one signal has arrived, and
     then yields a :class:`set` containing all of the signals that were received
-    since the last iteration. (This is generally similar to how
-    :class:`UnboundedQueue` works, but since Unix semantics are that identical
-    signals can/should be coalesced, here we use a :class:`set` for storage
-    instead of a :class:`list`.)
+    since the last iteration.

     Note that if you leave the ``with`` block while the iterator has
     unextracted signals still pending inside it, then they will be
diff --git a/trio/_sync.py b/trio/_sync.py
index dbd00e1858..82c02477d6 100644
--- a/trio/_sync.py
+++ b/trio/_sync.py
@@ -808,8 +808,7 @@ class Queue:
     """A bounded queue suitable for inter-task communication.

     This class is generally modelled after :class:`queue.Queue`, but with the
-    major difference that it is always bounded. For an unbounded queue, see
-    :class:`trio.UnboundedQueue`.
+    major difference that it is always bounded.

     A :class:`Queue` object can be used as an asynchronous iterator, that
     dequeues objects one at a time. I.e., these two loops are equivalent::
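The ``trio/__init__.py`` hunk above keeps the old name importable while
steering users to the new location. Assuming trio's
``_deprecate.DeprecatedAttribute`` machinery issues a
``TrioDeprecationWarning`` when the deprecated attribute is accessed (the
exact message text is not shown in this diff), code running against a
release from this era would see something like::

   import warnings

   import trio

   with warnings.catch_warnings(record=True) as caught:
       warnings.simplefilter("always")
       queue_cls = trio.UnboundedQueue  # deprecated alias

   # The alias still resolves to the real class at its new home...
   assert queue_cls is trio.hazmat.UnboundedQueue
   # ...but touching it recorded a deprecation warning that points to
   # trio.hazmat.UnboundedQueue and the tracking issue (#136).
   print(caught[0].category.__name__, caught[0].message)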