Move UnboundedQueue into hazmat

njsmith committed Aug 31, 2017
1 parent 4ff43c0 commit b290a96
Showing 7 changed files with 59 additions and 53 deletions.
6 changes: 3 additions & 3 deletions docs/source/design.rst
@@ -417,9 +417,9 @@ Specific style guidelines
and the ``nowait`` version raises :exc:`trio.WouldBlock` if it would block.

* The word ``monitor`` is used for APIs that involve an
- :class:`UnboundedQueue` receiving some kind of events. (Examples:
- nursery ``.monitor`` attribute, some of the low-level I/O functions in
- :mod:`trio.hazmat`.)
+ :class:`trio.hazmat.UnboundedQueue` receiving some kind of events.
+ (Examples: nursery ``.monitor`` attribute, some of the low-level I/O
+ functions in :mod:`trio.hazmat`.)

* ...we should, but currently don't, have a solid convention to
distinguish between functions that take an async callable and those
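To illustrate the blocking/``nowait`` convention described above, here is a
minimal sketch (assuming ``trio.Queue(1)`` creates a bounded queue with room
for a single item)::

    import trio

    async def demo():
        queue = trio.Queue(1)      # bounded: room for exactly one item
        queue.put_nowait("a")      # succeeds immediately; there is room
        try:
            queue.put_nowait("b")  # queue is full, so the nowait version...
        except trio.WouldBlock:
            pass                   # ...raises trio.WouldBlock instead of blocking
        # `await queue.put("b")` would instead block here until a consumer
        # called `get`.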
55 changes: 15 additions & 40 deletions docs/source/reference-core.rst
@@ -1218,13 +1218,15 @@ Broadcasting an event with :class:`Event`
   :members:


- Passing messages with :class:`Queue` and :class:`UnboundedQueue`
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ .. _queue:
+
+ Passing messages with :class:`Queue`
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

- Trio provides two types of queues suitable for different
- purposes. Where they differ is in their strategies for handling flow
- control. Here's a toy example to demonstrate the problem. Suppose we
- have a queue with two producers and one consumer::
+ You can use :class:`Queue` objects to safely pass objects between
+ tasks. Trio :class:`Queue` objects always have a bounded size. Here's
+ a toy example to demonstrate why this is important. Suppose we have a
+ queue with two producers and one consumer::

async def producer(queue):
    while True:
@@ -1235,9 +1237,9 @@ have a queue with two producers and one consumer::
        print(await queue.get())

async def main():
-     # Trio's actual queue classes have countermeasures to prevent
-     # this example from working, so imagine we have some sort of
-     # platonic ideal of a queue here
+     # This example won't work with Trio's actual Queue class, so
+     # imagine we have some sort of platonic ideal of an unbounded
+     # queue here:
    queue = trio.HypotheticalQueue()
    async with trio.open_nursery() as nursery:
        # Two producers
@@ -1255,41 +1257,14 @@ we add two items to the queue but only remove one, then over time the
queue size grows arbitrarily large, our latency is terrible, we run
out of memory, it's just generally bad news all around.

- There are two potential strategies for avoiding this problem.
-
- The preferred solution is to apply *backpressure*. If our queue starts
- getting too big, then we can make the producers slow down by having
- ``put`` block until ``get`` has had a chance to remove an item. This
- is the strategy used by :class:`trio.Queue`.
-
- The other possibility is for the queue consumer to get greedy: each
- time it runs, it could eagerly consume all of the pending items before
- allowing another task to run. (In some other systems, this would
- happen automatically because their queue's ``get`` method doesn't
- invoke the scheduler unless it has to block. But :ref:`in trio, get is
- always a checkpoint <checkpoint-rule>`.) This would work, but it's a
- bit risky: basically instead of applying backpressure to specifically
- the producer tasks, we're applying it to *all* the tasks in our
- system. The danger here is that if enough items have built up in the
- queue, then "stopping the world" to process them all may cause
- unacceptable latency spikes in unrelated tasks. Nonetheless, this is
- still the right choice in situations where it's impossible to apply
- backpressure more precisely. For example, when monitoring exiting
- tasks, blocking tasks from reporting their death doesn't really
- accomplish anything – the tasks are taking up memory either way,
- etc. (In this particular case it `might be possible to do better
- <https://github.com/python-trio/trio/issues/64>`__, but in general the
- principle holds.) So this is the strategy implemented by
- :class:`trio.UnboundedQueue`.
-
- tl;dr: use :class:`Queue` if you can.
+ By placing an upper bound on our queue's size, we avoid this problem.
+ If the queue gets too big, then it applies *backpressure*: ``put``
+ blocks and forces the producers to slow down and wait until the
+ consumer calls ``get``.
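As a minimal sketch of the backpressure behavior described above (assuming
``trio.Queue(1)`` and the ``nursery.spawn`` API used elsewhere in this era's
docs)::

    import trio

    async def producer(queue):
        for i in range(5):
            await queue.put(i)   # blocks whenever the queue is already full
            print("produced", i)

    async def consumer(queue):
        for _ in range(5):
            print("consumed", await queue.get())

    async def main():
        queue = trio.Queue(1)    # capacity 1: backpressure kicks in at once
        async with trio.open_nursery() as nursery:
            nursery.spawn(producer, queue)
            nursery.spawn(consumer, queue)

    trio.run(main)

Because ``put`` blocks while the queue is full, the producer can never get
more than one item ahead of the consumer, no matter how the scheduler
interleaves the two tasks.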

.. autoclass:: Queue
   :members:

- .. autoclass:: UnboundedQueue
-    :members:


Lower-level synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
30 changes: 30 additions & 0 deletions docs/source/reference-hazmat.rst
@@ -159,6 +159,36 @@ anything real. See `#26
   :with: queue


+ Unbounded queues
+ ================
+
+ In the section :ref:`queue`, we showed an example with two producers
+ and one consumer using the same queue, where the queue size would grow
+ without bound to produce unbounded latency and memory usage.
+ :class:`trio.Queue` avoids this by placing an upper bound on how big
+ the queue can get before ``put`` starts blocking. But what if you're
+ in a situation where ``put`` can't block?
+
+ There is another option: the queue consumer could get greedy. Each
+ time it runs, it could eagerly consume all of the pending items before
+ allowing another task to run. (In some other systems, this would
+ happen automatically because their queue's ``get`` method doesn't
+ invoke the scheduler unless it has to block. But :ref:`in trio, get is
+ always a checkpoint <checkpoint-rule>`.) This works, but it's a bit
+ risky: basically instead of applying backpressure to specifically the
+ producer tasks, we're applying it to *all* the tasks in our system.
+ The danger here is that if enough items have built up in the queue,
+ then "stopping the world" to process them all may cause unacceptable
+ latency spikes in unrelated tasks. Nonetheless, this is still the
+ right choice in situations where it's impossible to apply backpressure
+ more precisely. So this is the strategy implemented by
+ :class:`UnboundedQueue`. The main time you should use this is when
+ working with low-level APIs like :func:`monitor_kevent`.
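A sketch of the greedy-batch pattern this section describes (``handle`` is a
hypothetical application callback, not trio API; ``get_batch`` and
``put_nowait`` are assumed to behave as documented below)::

    import trio

    def handle(event):               # hypothetical callback
        print("handled", event)

    async def consume_batches(queue):
        while True:
            # get_batch() waits until at least one item is available, then
            # drains everything currently queued into a list...
            batch = await queue.get_batch()
            # ...which we process greedily, without yielding between items:
            for event in batch:
                handle(event)

    queue = trio.hazmat.UnboundedQueue()
    queue.put_nowait("some event")   # put_nowait never blocks: no bound to hit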

+ .. autoclass:: UnboundedQueue
+    :members:


Global state: system tasks and run-local storage
================================================

4 changes: 4 additions & 0 deletions trio/__init__.py
@@ -92,6 +92,10 @@
        _deprecate.DeprecatedAttribute(hazmat.Value, "0.2.0", issue=136),
    "Error":
        _deprecate.DeprecatedAttribute(hazmat.Error, "0.2.0", issue=136),
+     "UnboundedQueue":
+         _deprecate.DeprecatedAttribute(
+             hazmat.UnboundedQueue, "0.2.0", issue=136
+         ),
    "run_in_worker_thread":
        _deprecate.DeprecatedAttribute(
            run_sync_in_worker_thread, "0.2.0", issue=68
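A sketch of what this shim buys users, assuming the deprecated-attribute
machinery warns on access to the old name and forwards to the hazmat class
(the exact warning class is an assumption)::

    import warnings
    import trio

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        queue = trio.UnboundedQueue()   # old name: warns, still works

    assert isinstance(queue, trio.hazmat.UnboundedQueue)
    print(caught[0].category)           # the deprecation warning class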
9 changes: 5 additions & 4 deletions trio/_core/_unbounded_queue.py
@@ -14,6 +14,7 @@ class _UnboundedQueueStats:
    tasks_waiting = attr.ib()


+ @_hazmat
class UnboundedQueue:
    """An unbounded queue suitable for certain unusual forms of inter-task
    communication.
@@ -25,8 +26,8 @@ class UnboundedQueue:
    "batches". If a consumer task processes each batch without yielding, then
    this helps achieve (but does not guarantee) an effective bound on the
    queue's memory use, at the cost of potentially increasing system latencies
-     in general. You should generally prefer to use a :class:`Queue` instead if
-     you can.
+     in general. You should generally prefer to use a :class:`trio.Queue`
+     instead if you can.

    Currently each batch completely empties the queue, but `this may change in
    the future <https://github.com/python-trio/trio/issues/51>`__.
@@ -99,10 +100,10 @@ def get_batch_nowait(self):
        Returns:
          list: A list of dequeued items, in order. On a successful call this
            list is always non-empty; if it would be empty we raise
-             :exc:`WouldBlock` instead.
+             :exc:`~trio.WouldBlock` instead.

        Raises:
-           WouldBlock: if the queue is empty.
+           ~trio.WouldBlock: if the queue is empty.

        """
        if not self._can_get:
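A short sketch of the ``nowait`` pattern this docstring documents (``drain``
is a hypothetical helper, not trio API)::

    import trio

    def drain(queue):
        # Return whatever is pending without blocking, or [] if nothing is.
        try:
            return queue.get_batch_nowait()
        except trio.WouldBlock:
            return []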
5 changes: 1 addition & 4 deletions trio/_signals.py
@@ -113,10 +113,7 @@ def catch_signals(signals):
    The async iterator blocks until at least one signal has arrived, and then
    yields a :class:`set` containing all of the signals that were received
-     since the last iteration. (This is generally similar to how
-     :class:`UnboundedQueue` works, but since Unix semantics are that identical
-     signals can/should be coalesced, here we use a :class:`set` for storage
-     instead of a :class:`list`.)
+     since the last iteration.

    Note that if you leave the ``with`` block while the iterator has
    unextracted signals still pending inside it, then they will be
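A usage sketch for the iterator this docstring describes (assuming
``catch_signals`` is entered with a plain ``with`` block, as the wording
above implies)::

    import signal
    import trio

    async def watch_signals():
        with trio.catch_signals({signal.SIGHUP, signal.SIGUSR1}) as batches:
            async for batch in batches:
                # Each yielded batch is a set: identical signals coalesce.
                print("received:", batch)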
3 changes: 1 addition & 2 deletions trio/_sync.py
@@ -808,8 +808,7 @@ class Queue:
    """A bounded queue suitable for inter-task communication.

    This class is generally modelled after :class:`queue.Queue`, but with the
-     major difference that it is always bounded. For an unbounded queue, see
-     :class:`trio.UnboundedQueue`.
+     major difference that it is always bounded.

    A :class:`Queue` object can be used as an asynchronous iterator, that
    dequeues objects one at a time. I.e., these two loops are equivalent::
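As a sketch of the equivalence the docstring states (``queue`` being any
:class:`Queue` instance, inside an async function)::

    while True:
        obj = await queue.get()
        print(obj)

    # ...is equivalent to:

    async for obj in queue:
        print(obj)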
