diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index ed417458b7..c720cded2a 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -660,28 +660,29 @@ This means that tasks form a tree: when you call :func:`run`, then this creates an initial task, and all your other tasks will be children, grandchildren, etc. of the initial task. -The crucial thing about this setup is that when execution reaches the -end of the ``async with`` block, then the nursery cleanup code -runs. The nursery cleanup code does the following things: +Essentially, the body of the ``async with`` block acts like an initial +task that's running inside the nursery, and then each call to +``nursery.start_soon`` adds another task that runs in parallel. Two +crucial things to keep in mind: -* If the body of the ``async with`` block raised an exception, then it - cancels all remaining child tasks and saves the exception. +* If any task inside the nursery finishes with an unhandled exception, + then the nursery immediately cancels all the tasks inside the + nursery. -* It watches for child tasks to exit. If a child task exits with an - exception, then it cancels all remaining child tasks and saves the - exception. +* Since all of the tasks are running concurrently inside the ``async + with`` block, the block does not exit until *all* tasks have + completed. If you've used other concurrency frameworks, then you can + think of it as, the de-indentation at the end of the ``async with`` + automatically "joins" (waits for) all of the tasks in the nursery. -* Once all child tasks have exited: +* Once all the tasks have finished, then: - * It marks the nursery as "closed", so no new tasks can be spawned - in it. + * The nursery is marked as "closed", meaning that no new tasks can + be started inside it. - * If there's just one saved exception, it re-raises it, or - - * If there are multiple saved exceptions, it re-raises them as a - :exc:`MultiError`, or - - * if there are no saved exceptions, it exits normally. + * Any unhandled exceptions are re-raised inside the parent task. If + there are multiple exceptions, then they're collected up into a + single :exc:`MultiError` exception. Since all tasks are descendents of the initial task, one consequence of this is that :func:`run` can't finish until all tasks have @@ -745,39 +746,10 @@ and it also provides some helpful utilities like :exc:`MultiError`. -How to be a good parent task -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Supervising child tasks is a full time job. If you want your program -to do two things at once, then don't expect the parent task to do one -while a child task does another – instead, start two children and let -the parent focus on managing them. - -So, don't do this:: - - # bad idea! - async with trio.open_nursery() as nursery: - nursery.start_soon(walk) - await chew_gum() - -Instead, do this:: - - # good idea! - async with trio.open_nursery() as nursery: - nursery.start_soon(walk) - nursery.start_soon(chew_gum) - # now parent task blocks in the nursery cleanup code - -The difference between these is that in the first example, if ``walk`` -crashes, the parent is off distracted chewing gum, and won't -notice. In the second example, the parent is watching both children, -and will notice and respond appropriately if anything happens. - - Spawning tasks without becoming a parent ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sometimes it doesn't make sense for the task that spawns a child to +Sometimes it doesn't make sense for the task that starts a child to take on responsibility for watching it. For example, a server task may want to start a new task for each connection, but it can't listen for connections and supervise children at the same time. @@ -795,13 +767,24 @@ code like this:: async with trio.open_nursery() as nursery: nursery.start_soon(new_connection_listener, handler, nursery) -Now ``new_connection_listener`` can focus on handling new connections, -while its parent focuses on supervising both it and all the individual -connection handlers. +Notice that ``server`` opens a nursery and passes it to +``new_connection_listener``, and then ``new_connection_listener`` is +able to start new tasks as "siblings" of itself. Of course, in this +case, we could just as well have written:: + + async def server(handler): + async with trio.open_nursery() as nursery: + while True: + conn = await get_new_connection() + nursery.start_soon(handler, conn) + +\...but sometimes things aren't so simple, and this trick comes in +handy. -And remember that cancel scopes are inherited from the nursery, -**not** from the task that calls ``start_soon``. So in this example, -the timeout does *not* apply to ``child`` (or to anything else):: +One thing to remember, though: cancel scopes are inherited from the +nursery, **not** from the task that calls ``start_soon``. So in this +example, the timeout does *not* apply to ``child`` (or to anything +else):: async def do_spawn(nursery): with move_on_after(TIMEOUT): # don't do this, it has no effect @@ -831,34 +814,33 @@ For example, here's a function that takes a list of functions, runs them all concurrently, and returns the result from the one that finishes first:: - # XX this example can be simplified a little after #136 is fixed in 0.3.0 - async def race(*async_fns): if not async_fns: raise ValueError("must pass at least one argument") - async def racecar(results, async_fn, cancel_scope): - result = await async_fn() - results.append(result) - cancel_scope.cancel() + q = trio.Queue(1) + + async def jockey(async_fn): + await q.put(await async_fn()) async with trio.open_nursery() as nursery: - results = [] - cancel_scope = nursery.cancel_scope for async_fn in async_fns: - nursery.start_soon(racecar, results, async_fn, cancel_scope) - - return results[0] - -This works by starting a set of racecar tasks which each try to run -their function, report back, and then cancel all the rest. Eventually -one suceeds, all the tasks are cancelled and exit, and then our -nursery exits and we return the winning value. And if one or more of -them raises an unhandled exception then Trio's normal handling kicks -in: it cancels the others and then propagates the exception. If you -wanted different behavior, you could do that by adding a ``try`` block -to the ``racecar`` function to catch exceptions and handle them -however you like. + nursery.start_soon(jockey, async_fn) + winner = await q.get() + nursery.cancel_scope.cancel() + return winner + +This works by starting a set of tasks which each try to run their +function, and then report back the value it returns. The main task +uses ``q.get()`` to wait for one to finish; as soon as the first task +crosses the finish line, it cancels the rest, and then returns the +winning value. + +Here if one or more of the racing functions raises an unhandled +exception then Trio's normal handling kicks in: it cancels the others +and then propagates the exception. If you want different behavior, you +can get that by adding a ``try`` block to the ``jockey`` function to +catch exceptions and handle them however you like. Task-related API details diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst index 031dd1cda0..fef75c407c 100644 --- a/docs/source/tutorial.rst +++ b/docs/source/tutorial.rst @@ -461,16 +461,6 @@ children, and it raises an exception, then it lets us propagate that exception into the parent; in many other frameworks, exceptions like this are just discarded. Trio never discards exceptions. -However – this is important! – the parent won't see the exception -unless and until it reaches the end of the nursery's ``async wait`` -block and runs the ``__aexit__`` function. So remember: in trio, -parenting is a full-time job! Any given piece of code manage a nursery -– which means opening it, spawning some children, and then sitting in -``__aexit__`` to supervise them – or it can do actual work, but you -shouldn't try to do both at the same time in the same function. If you -find yourself tempted to do some work in the parent, then ``start_soon`` -another child and have it do the work. In trio, children are cheap. - Ok! Let's try running it and see what we get: .. code-block:: none diff --git a/newsfragments/136.feature.rst b/newsfragments/136.feature.rst new file mode 100644 index 0000000000..5fd5d3e32d --- /dev/null +++ b/newsfragments/136.feature.rst @@ -0,0 +1,18 @@ +**Simplified nurseries**: In Trio, the rule used to be that "parenting +is a full time job", meaning that after a task opened a nursery and +spawned some children into it, it had to immediately block in +``__aexit__`` to supervise the new children, or else exception +propagation wouldn't work. Also there was some elaborate machinery to +let you replace this supervision logic with your own custom +supervision logic. Thanks to new advances in task-rearing technology, +**parenting is no longer a full time job!** Now the supervision happens +automatically in the background, and essentially the body of a ``async +with trio.open_nursery()`` block acts just like a task running inside +the nursery. This is important: it makes it possible for libraries to +abstract over nursery creation. For example, if you have a Websocket +library that needs to run a background task to handle Websocket pings, +you can now do that with ``async with open_websocket(...) as ws: +...``, and that can run a task in the background without your users +having to worry about parenting it. And don't worry, you can still +make custom supervisors; it turned out all that spiffy machinery was +actually redundant and didn't provide much value. diff --git a/notes-to-self/loopy.py b/notes-to-self/loopy.py index fca033a6c0..9f893590bd 100644 --- a/notes-to-self/loopy.py +++ b/notes-to-self/loopy.py @@ -10,8 +10,9 @@ async def loopy(): print("KI!") async def main(): - await trio.start_soon(loopy) - await trio.start_soon(loopy) - await trio.start_soon(loopy) + async with trio.open_nursery() as nursery: + nursery.start_soon(loopy) + nursery.start_soon(loopy) + nursery.start_soon(loopy) trio.run(main) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 23a486a1c6..88e0d4bab1 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -45,8 +45,7 @@ # namespaces. __all__ = [ "Task", "run", "open_nursery", "open_cancel_scope", "checkpoint", - "current_call_soon_thread_and_signal_safe", "current_task", - "current_effective_deadline", "checkpoint_if_cancelled", + "current_task", "current_effective_deadline", "checkpoint_if_cancelled", "TASK_STATUS_IGNORED" ] @@ -292,10 +291,9 @@ def started(self, value=None): for task in munged_tasks: task._attempt_delivery_of_any_pending_cancel() - # And finally, we cancel the old nursery's scope, so that its - # __aexit__ notices that all the children are gone and it can exit. - # (This is a bit of a hack.) - self._old_nursery.cancel_scope.cancel() + # And finally, poke the old nursery so it notices that all its + # children have disappeared and can exit. + self._old_nursery._check_nursery_closed() @acontextmanager @@ -311,13 +309,13 @@ async def open_nursery(): assert currently_ki_protected() with open_cancel_scope() as scope: nursery = Nursery(current_task(), scope) - pending_exc = None + nested_child_exc = None try: await yield_(nursery) except BaseException as exc: - pending_exc = exc + nested_child_exc = exc assert currently_ki_protected() - await nursery._clean_up(pending_exc) + await nursery._nested_child_finished(nested_child_exc) # I *think* this is equivalent to the above, and it gives *much* nicer @@ -353,8 +351,6 @@ async def open_nursery(): class Nursery: def __init__(self, parent_task, cancel_scope): - # the parent task -- only used for introspection, to implement - # task.parent_task self._parent_task = parent_task parent_task._child_nurseries.append(self) # the cancel stack that children inherit - we take a snapshot, so it @@ -365,16 +361,15 @@ def __init__(self, parent_task, cancel_scope): self.cancel_scope = cancel_scope assert self.cancel_scope is self._cancel_stack[-1] self._children = set() + self._pending_excs = [] + # The "nested child" is how this code refers to the contents of the + # nursery's 'async with' block, which acts like a child Task in all + # the ways we can make it. + self._nested_child_running = True + self._parent_waiting_in_aexit = False self._pending_starts = 0 - self._zombies = set() - self._monitor = _core.UnboundedQueue() self._closed = False - @property - @deprecated("0.2.0", instead="child_tasks", issue=136) - def children(self): - return frozenset(self._children) - @property def child_tasks(self): return frozenset(self._children) @@ -383,34 +378,57 @@ def child_tasks(self): def parent_task(self): return self._parent_task - @property - @deprecated("0.2.0", instead=None, issue=136) - def zombies(self): - return frozenset(self._zombies) + def _add_exc(self, exc): + self._pending_excs.append(exc) + self.cancel_scope.cancel() - @property - @deprecated("0.2.0", instead=None, issue=136) - def monitor(self): - return self._monitor + def _check_nursery_closed(self): + if (not self._nested_child_running and not self._children + and not self._pending_starts): + self._closed = True + if self._parent_waiting_in_aexit: + self._parent_waiting_in_aexit = False + GLOBAL_RUN_CONTEXT.runner.reschedule(self._parent_task) - def _child_finished(self, task): + def _child_finished(self, task, result): self._children.remove(task) - self._zombies.add(task) - self._monitor.put_nowait(task) + if type(result) is Error: + self._add_exc(result.error) + self._check_nursery_closed() + + async def _nested_child_finished(self, nested_child_exc): + if nested_child_exc is not None: + self._add_exc(nested_child_exc) + self._nested_child_running = False + self._check_nursery_closed() + + if not self._closed: + # If we get cancelled (or have an exception injected, like + # KeyboardInterrupt), then save that, but still wait until our + # children finish. + def aborted(raise_cancel): + self._add_exc(Result.capture(raise_cancel).error) + return Abort.FAILED + + self._parent_waiting_in_aexit = True + await wait_task_rescheduled(aborted) + else: + # Nothing to wait for, so just execute a checkpoint -- but we + # still need to mix any exception (e.g. from an external + # cancellation) in with the rest of our exceptions. + try: + await checkpoint() + except BaseException as exc: + self._add_exc(exc) + + popped = self._parent_task._child_nurseries.pop() + assert popped is self + if self._pending_excs: + raise MultiError(self._pending_excs) def start_soon(self, async_fn, *args, name=None): GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) - # Returns the task, unlike start_soon - @deprecated( - "0.2.0", - thing="nursery.spawn", - instead="nursery.start_soon", - issue=284 - ) - def spawn(self, async_fn, *args, name=None): - return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name) - async def start(self, async_fn, *args, name=None): if self._closed: raise RuntimeError("Nursery is closed to new arrivals") @@ -420,8 +438,10 @@ async def start(self, async_fn, *args, name=None): task_status = _TaskStatus(old_nursery, self) thunk = functools.partial(async_fn, task_status=task_status) old_nursery.start_soon(thunk, *args, name=name) + # Wait for either _TaskStatus.started or an exception to + # cancel this nursery: # If we get here, then the child either got reparented or exited - # normally. The complicated logic is all in __TaskStatus.started(). + # normally. The complicated logic is all in _TaskStatus.started(). # (Any exceptions propagate directly out of the above.) if not task_status._called_started: raise RuntimeError( @@ -430,96 +450,10 @@ async def start(self, async_fn, *args, name=None): return task_status._value finally: self._pending_starts -= 1 - self._monitor.put_nowait(None) - - def _reap(self, task): - try: - self._zombies.remove(task) - except KeyError: - raise ValueError("{} is not a zombie in this nursery".format(task)) - - @deprecated("0.2.0", instead=None, issue=136) - def reap(self, task): - return self._reap(task) - - def _reap_and_unwrap(self, task): - self._reap(task) - return task._result.unwrap() - - @deprecated("0.2.0", instead=None, issue=136) - def reap_and_unwrap(self, task): - return self._reap_and_unwrap(task) - - async def _clean_up(self, pending_exc): - cancelled_children = False - exceptions = [] - if pending_exc is not None: - exceptions.append(pending_exc) - # Careful - the logic in this loop is deceptively subtle, because of - # all the different possible states that we have to handle. (Entering - # with/out an error, with/out unreaped zombies, with/out children - # living, with/out an error that occurs after we enter, ...) - with open_cancel_scope() as clean_up_scope: - if not self._children and not self._zombies: - try: - await _core.checkpoint() - except BaseException as exc: - exceptions.append(exc) - while self._children or self._zombies or self._pending_starts: - # First, reap any zombies. They may or may not still be in the - # monitor queue, and they may or may not trigger cancellation - # of remaining tasks, so we have to check first before - # blocking on the monitor queue. - for task in list(self._zombies): - if type(task._result) is Error: - exceptions.append(task._result.error) - self._reap(task) - - if exceptions and not cancelled_children: - self.cancel_scope.cancel() - clean_up_scope.shield = True - cancelled_children = True - - if self._children or self._pending_starts: - try: - # We ignore the return value here, and will pick up - # the actual tasks from the zombies set after looping - # around. (E.g. it's possible there are tasks in the - # queue that were already reaped.) - await self._monitor.get_batch() - except (Cancelled, KeyboardInterrupt) as exc: - exceptions.append(exc) - - self._closed = True - popped = self._parent_task._child_nurseries.pop() - assert popped is self - if exceptions: - mexc = MultiError(exceptions) - if (pending_exc and mexc.__cause__ is None - and mexc.__context__ is None): - # pending_exc is *part* of this MultiError, so it doesn't - # make sense to also have it as - # __context__. Unfortunately, we can't stop Python from - # setting it as __context__, but we can at least suppress - # it from being printed. - raise mexc from None - else: - # There could potentially be a genuine __context__ that - # should be attached, e.g.: - # - # try: - # ... - # except: - # with open_nursery(): - # ... - # - # Or, if len(exceptions) == 1, this could be a regular - # exception that already has __cause__ or __context__ - # set. - raise mexc + self._check_nursery_closed() def __del__(self): - assert not self._children and not self._zombies + assert not self._children ################################################################ @@ -547,10 +481,6 @@ class Task: _runner = attr.ib() name = attr.ib() # Invariant: - # - for unfinished tasks, result is None - # - for finished tasks, result is a Result object - _result = attr.ib(default=None) - # Invariant: # - for unscheduled tasks, _next_send is None # - for scheduled tasks, _next_send is a Result object # Tasks start out unscheduled. @@ -573,24 +503,6 @@ class Task: def __repr__(self): return ("".format(self.name, id(self))) - @property - @deprecated("0.2.0", instead=None, issue=136) - def result(self): - return self._result - - # For debugging and visualization: - @property - @deprecated("0.2.0", instead="parent_nursery.parent_task", issue=136) - def parent_task(self): - """This task's parent task (or None if this is the "init" task). - - Example use case: drawing a visualization of the task tree. - """ - if self._parent_nursery is None: - return None - else: - return self._parent_nursery._parent_task - @property def parent_nursery(self): """The nursery this task is inside (or None if this is the "init" @@ -611,70 +523,6 @@ def child_nurseries(self): """ return list(self._child_nurseries) - ################ - # Monitoring task exit - ################ - - _monitors = attr.ib(default=attr.Factory(set)) - - @deprecated("0.2.0", instead=None, issue=136) - def add_monitor(self, queue): - """Register to be notified when this task exits. - - Args: - queue (UnboundedQueue): An :class:`UnboundedQueue` object that this - task object will be put into when it exits. - - Raises: - TypeError: if ``queue`` is not a :class:`UnboundedQueue` - ValueError: if ``queue`` is already registered with this task - - """ - return self._add_monitor(queue) - - def _add_monitor(self, queue): - # Rationale: (a) don't particularly want to create a - # callback-in-disguise API by allowing people to stick in some - # arbitrary object with a put_nowait method, (b) don't want to have to - # figure out how to deal with errors from a user-provided object; if - # UnboundedQueue.put_nowait raises then that's legitimately a bug in - # trio so raising InternalError is justified. - if type(queue) is not _core.UnboundedQueue: - raise TypeError("monitor must be an UnboundedQueue object") - if queue in self._monitors: - raise ValueError("can't add same monitor twice") - if self._result is not None: - queue.put_nowait(self) - else: - self._monitors.add(queue) - - @deprecated("0.2.0", instead=None, issue=136) - def discard_monitor(self, queue): - """Unregister the given queue from being notified about this task - exiting. - - This operation always succeeds, regardless of whether ``queue`` was - previously registered. - - Args: - queue (UnboundedQueue): The queue that should no longer recieve - notification. - """ - - self._monitors.discard(queue) - - @deprecated("0.2.0", instead=None, issue=136) - async def wait(self): - """Wait for this task to exit. - - """ - q = _core.UnboundedQueue() - self._add_monitor(q) - try: - await q.get_batch() - finally: - self._monitors.discard(q) - ################ # Cancellation ################ @@ -690,7 +538,7 @@ def _attempt_abort(self, raise_cancel): # rescheduling itself (hopefully eventually calling reraise to raise # the given exception, but not necessarily). success = self._abort_func(raise_cancel) - if type(success) is not _core.Abort: + if type(success) is not Abort: raise TrioInternalError("abort function must return Abort enum") # We only attempt to abort once per blocking call, regardless of # whether we succeeded or failed. @@ -736,11 +584,6 @@ class _RunStatistics: io_statistics = attr.ib() run_sync_soon_queue_size = attr.ib() - @property - @deprecated("0.2.0", issue=68, instead="run_sync_soon_queue_size") - def call_soon_queue_size(self): - return self.run_sync_soon_queue_size - @attr.s(cmp=False, hash=False) class Runner: @@ -761,8 +604,10 @@ class Runner: deadlines = attr.ib(default=attr.Factory(SortedDict)) init_task = attr.ib(default=None) - main_task = attr.ib(default=None) + init_task_result = attr.ib(default=None) system_nursery = attr.ib(default=None) + main_task = attr.ib(default=None) + main_task_result = attr.ib(default=None) entry_queue = attr.ib(default=attr.Factory(EntryQueue)) trio_token = attr.ib(default=None) @@ -989,7 +834,6 @@ def _return_value_looks_like_wrong_library(value): return task def task_exited(self, task, result): - task._result = result while task._cancel_stack: task._cancel_stack[-1]._remove_task(task) self.tasks.remove(task) @@ -997,10 +841,12 @@ def task_exited(self, task, result): # the init task should be the last task to exit assert not self.tasks else: - task._parent_nursery._child_finished(task) - for monitor in task._monitors: - monitor.put_nowait(task) - task._monitors.clear() + task._parent_nursery._child_finished(task, result) + if task is self.main_task: + self.main_task_result = result + self.system_nursery.cancel_scope.cancel() + if task is self.init_task: + self.init_task_result = result self.instrument("task_exited", task) ################ @@ -1071,20 +917,11 @@ def excfilter(exc): async def init(self, async_fn, args): async with open_nursery() as system_nursery: self.system_nursery = system_nursery - - self.entry_queue.spawn() - self.main_task = self.spawn_impl( - async_fn, args, self.system_nursery, name=None + async_fn, args, system_nursery, None ) - - async for task_batch in system_nursery._monitor: - for task in task_batch: - if task is self.main_task: - system_nursery.cancel_scope.cancel() - return system_nursery._reap_and_unwrap(task) - else: - system_nursery._reap_and_unwrap(task) + self.entry_queue.spawn() + return self.main_task_result.unwrap() ################ # Outside context problems @@ -1127,7 +964,7 @@ def _deliver_ki_cb(self): # the same time -- so even if KI arrives before main_task is created, # we won't get here until afterwards. assert self.main_task is not None - if self.main_task._result is not None: + if self.main_task_result is not None: # We're already in the process of exiting -- leave ki_pending set # and we'll check it again on our way out of run(). return @@ -1263,18 +1100,6 @@ def remove_instrument(self, instrument): except ValueError as exc: raise KeyError(*exc.args) - @_public - @deprecated("0.2.0", issue=257, instead="{add,remove}_instrument") - def current_instruments(self): - """Returns the list of currently active instruments. - - This list is *live*: if you mutate it, then :func:`trio.run` will - stop calling the instruments you remove and start calling the ones you - add. - - """ - return self.instruments - ################################################################ # run @@ -1356,6 +1181,8 @@ def run( """ + __tracebackhide__ = True + # Do error-checking up front, before we enter the TrioInternalError # try/catch # @@ -1409,6 +1236,8 @@ def run( def run_impl(runner, async_fn, args): + __tracebackhide__ = True + runner.instrument("before_run") runner.clock.start_clock() runner.init_task = runner.spawn_impl( @@ -1418,6 +1247,8 @@ def run_impl(runner, async_fn, args): ki_protection_enabled=True ) + # You know how people talk about "event loops"? This 'while' loop right + # here is our event loop: while runner.tasks: if runner.runq: timeout = 0 @@ -1529,14 +1360,21 @@ def run_impl(runner, async_fn, args): "other framework like asyncio? That won't work " "without some kind of compatibility shim.".format(msg) ) - # There's really no way to resume this task, so abandon it + # How can we resume this task? It's blocked in code we + # don't control, waiting for some message that we know + # nothing about. We *could* try using coro.throw(...) to + # blast an exception in and hope that it propagates out, + # but (a) that's complicated because we aren't set up to + # resume a task via .throw(), and (b) even if we did, + # there's no guarantee that the foreign code will respond + # the way we're hoping. So instead we abandon this task # and propagate the exception into the task's spawner. runner.task_exited(task, Error(exc)) runner.instrument("after_task_step", task) del GLOBAL_RUN_CONTEXT.task - return runner.init_task._result + return runner.init_task_result ################################################################ @@ -1686,8 +1524,3 @@ def _generate_method_wrappers(cls, path_to_instance): _generate_method_wrappers(Runner, "runner") _generate_method_wrappers(TheIOManager, "runner.io_manager") - - -@deprecated("0.2.0", issue=68, instead=TrioToken) -def current_call_soon_thread_and_signal_safe(): - return current_trio_token().run_sync_soon diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 03173da00b..d61fa37174 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -91,16 +91,6 @@ async def main(): # pragma: no cover assert "from inside" in str(excinfo.value) -async def test_basic_spawn_wait(recwarn): - async def child(x): - return 2 * x - - async with _core.open_nursery() as nursery: - task = nursery.spawn(child, 10) - await task.wait() - assert task.result.unwrap() == 20 - - async def test_nursery_warn_use_async_with(): with pytest.raises(RuntimeError) as excinfo: on = _core.open_nursery() @@ -115,21 +105,16 @@ async def test_nursery_warn_use_async_with(): pass -# can remove after 0.2.0 -async def test_child_crash_basic_deprecated(recwarn): - exc = ValueError("uh oh") +async def test_nursery_main_block_error_basic(): + exc = ValueError("whoops") - async def erroring(): - raise exc - - async with _core.open_nursery() as nursery: - task = nursery.spawn(erroring) - await task.wait() - assert task.result.error is exc - nursery.reap(task) + with pytest.raises(ValueError) as excinfo: + async with _core.open_nursery() as nursery: + raise exc + assert excinfo.value is exc -async def test_child_crash_basic(recwarn): +async def test_child_crash_basic(): exc = ValueError("uh oh") async def erroring(): @@ -143,22 +128,6 @@ async def erroring(): assert e is exc -async def test_reap_bad_task(recwarn): - async def child(): - pass - - async with _core.open_nursery() as nursery: - t = nursery.spawn(child) - with pytest.raises(ValueError): - nursery.reap(None) - with pytest.raises(ValueError): - nursery.reap(t) - await t.wait() - nursery.reap(t) - with pytest.raises(ValueError): - nursery.reap(t) - - async def test_basic_interleave(): async def looper(whoami, record): for i in range(3): @@ -182,9 +151,7 @@ def test_task_crash_propagation(): async def looper(): try: while True: - print("looper sleeping") await _core.checkpoint() - print("looper woke up") except _core.Cancelled: print("looper cancelled") looper_record.append("cancelled") @@ -204,26 +171,22 @@ async def main(): assert excinfo.value.args == ("argh",) -def test_main_and_task_both_crash(recwarn): +def test_main_and_task_both_crash(): # If main crashes and there's also a task crash, then we get both in a # MultiError async def crasher(): raise ValueError - async def main(wait): + async def main(): async with _core.open_nursery() as nursery: - crasher_task = nursery.spawn(crasher) - if wait: - await crasher_task.wait() + crasher_task = nursery.start_soon(crasher) raise KeyError - for wait in [True, False]: - with pytest.raises(_core.MultiError) as excinfo: - _core.run(main, wait) - print(excinfo.value) - assert set(type(exc) for exc in excinfo.value.exceptions) == { - ValueError, KeyError - } + with pytest.raises(_core.MultiError) as excinfo: + _core.run(main) + print(excinfo.value) + assert set(type(exc) + for exc in excinfo.value.exceptions) == {ValueError, KeyError} def test_two_child_crashes(): @@ -241,6 +204,16 @@ async def main(): for exc in excinfo.value.exceptions) == {ValueError, KeyError} +async def test_child_crash_wakes_parent(): + async def crasher(): + raise ValueError + + with pytest.raises(ValueError): + async with _core.open_nursery() as nursery: + nursery.start_soon(crasher) + await sleep_forever() + + async def test_reschedule(): t1 = None t2 = None @@ -273,64 +246,6 @@ async def child2(): nursery.start_soon(child2) -async def test_task_monitor(recwarn): - async def child(): - return 1 - - q1 = _core.UnboundedQueue() - q2 = _core.UnboundedQueue() - q3 = _core.UnboundedQueue() - async with _core.open_nursery() as nursery: - task = nursery.spawn(child) - task.add_monitor(q1) - task.add_monitor(q2) - - # okay to discard one that was never there - task.discard_monitor(q3) - - # discard one that *was* there, to make sure it works - task.discard_monitor(q2) - - # add one that's already there: - with pytest.raises(ValueError): - task.add_monitor(q1) - - task.add_monitor(q3) - - # q1 and q3 should be there now, check that they indeed get notified - await _core.wait_all_tasks_blocked() - - assert task.result.unwrap() == 1 - assert q1.get_batch_nowait() == [task] - with pytest.raises(_core.WouldBlock): - q2.get_batch_nowait() - assert q3.get_batch_nowait() == [task] - - # can re-add the queue now - for _ in range(2): - assert q1.empty() - task.add_monitor(q1) - # and it immediately receives the result: - assert q1.get_batch_nowait() == [task] - # and since it was used, it's already gone from the set, so we can - # loop around and do it again - - -async def test_bad_monitor_object(recwarn): - task = _core.current_task() - - with pytest.raises(TypeError): - task.add_monitor("hello") - - class BadQueue: - def put_nowait(self, obj): # pragma: no cover - raise KeyError - - bad_queue = BadQueue() - with pytest.raises(TypeError): - task.add_monitor(bad_queue) - - async def test_current_time(): t1 = _core.current_time() # Windows clock is pretty low-resolution -- appveyor tests fail unless we @@ -450,43 +365,20 @@ def filter_tasks(self, tasks): yield item -def test_instruments_deprecated(recwarn): +def test_instruments(recwarn): r1 = TaskRecorder() r2 = TaskRecorder() r3 = TaskRecorder() - async def main(): - for _ in range(4): - await _core.checkpoint() - cp = _core.current_instruments() - assert cp == [r1, r2] - # replace r2 with r3, to test that we can manipulate them as we go - cp[1] = r3 - for _ in range(1): - await _core.checkpoint() - return _core.current_task() - - task = _core.run(main, instruments=[r1, r2]) - # It sleeps 5 times, so it runs 6 times - expected = ( - [("before_run",)] + - 6 * [("schedule", task), - ("before", task), - ("after", task)] + [("after_run",)] - ) - assert len(r1.record) > len(r2.record) > len(r3.record) - assert r1.record == r2.record + r3.record - # Need to filter b/c there's also the system task bumping around in the - # record: - assert list(r1.filter_tasks([task])) == expected - + task = None -def test_instruments(recwarn): - r1 = TaskRecorder() - r2 = TaskRecorder() - r3 = TaskRecorder() + # We use a child task for this, because the main task does some extra + # bookkeeping stuff that can leak into the instrument results, and we + # don't want to deal with it. + async def task_fn(): + nonlocal task + task = _core.current_task() - async def main(): for _ in range(4): await _core.checkpoint() # replace r2 with r3, to test that we can manipulate them as we go @@ -498,9 +390,13 @@ async def main(): _core.add_instrument(r3) for _ in range(1): await _core.checkpoint() - return _core.current_task() - task = _core.run(main, instruments=[r1, r2]) + async def main(): + async with _core.open_nursery() as nursery: + nursery.start_soon(task_fn) + + _core.run(main, instruments=[r1, r2]) + # It sleeps 5 times, so it runs 6 times expected = ( [("before_run",)] + @@ -510,8 +406,6 @@ async def main(): ) assert len(r1.record) > len(r2.record) > len(r3.record) assert r1.record == r2.record + r3.record - # Need to filter b/c there's also the system task bumping around in the - # record: assert list(r1.filter_tasks([task])) == expected @@ -700,9 +594,6 @@ async def test_cancel_edge_cases(): async def test_cancel_scope_multierror_filtering(): - async def child(): - await sleep_forever() - async def crasher(): raise KeyError @@ -711,34 +602,38 @@ async def crasher(): try: async with _core.open_nursery() as nursery: # Two children that get cancelled by the nursery scope - nursery.start_soon(child) # t1 - nursery.start_soon(child) # t2 + nursery.start_soon(sleep_forever) # t1 + nursery.start_soon(sleep_forever) # t2 nursery.cancel_scope.cancel() with _core.open_cancel_scope(shield=True): await wait_all_tasks_blocked() # One child that gets cancelled by the outer scope - nursery.start_soon(child) # t3 + nursery.start_soon(sleep_forever) # t3 outer.cancel() # And one that raises a different error nursery.start_soon(crasher) # t4 + # and then our __aexit__ also receives an outer Cancelled except _core.MultiError as multi_exc: # This is outside the nursery scope but inside the outer # scope, so the nursery should have absorbed t1 and t2's - # exceptions but t3 and t4 should remain - assert len(multi_exc.exceptions) == 2 + # exceptions but t3 and t4 should remain, plus the Cancelled + # from 'outer' + assert len(multi_exc.exceptions) == 3 summary = {} for exc in multi_exc.exceptions: summary.setdefault(type(exc), 0) summary[type(exc)] += 1 - assert summary == {_core.Cancelled: 1, KeyError: 1} + assert summary == {_core.Cancelled: 2, KeyError: 1} raise + except AssertionError: # pragma: no cover + raise except BaseException as exc: - # This is ouside the outer scope, so t3's Cancelled exception should - # also have been absorbed, leaving just a regular KeyError from - # crasher() + # This is ouside the outer scope, so the two outer Cancelled + # exceptions should have been absorbed, leaving just a regular + # KeyError from crasher() assert type(exc) is KeyError - else: - assert False # pragma: no cover + else: # pragma: no cover + assert False async def test_precancelled_task(): @@ -1232,6 +1127,17 @@ async def child(): assert isinstance(excinfo.value.__context__, KeyError) +async def test_nursery_exception_chaining_doesnt_make_context_loops(): + async def crasher(): + raise KeyError + + with pytest.raises(_core.MultiError) as excinfo: + async with _core.open_nursery() as nursery: + nursery.start_soon(crasher) + raise ValueError + assert excinfo.value.__context__ is None + + def test_TrioToken_identity(): async def get_and_check_token(): token = _core.current_trio_token() @@ -1513,44 +1419,37 @@ def slow_abort(raise_cancel): assert record == ["sleeping", "abort-called", "cancelled", "done"] -async def test_Task_parent_task_deprecated(recwarn): - tasks = {} - - async def child2(): - tasks["child2"] = _core.current_task() - - async def child1(): - tasks["child1"] = _core.current_task() - async with _core.open_nursery() as nursery: - return nursery.start_soon(child2) - - async with _core.open_nursery() as nursery: - nursery.start_soon(child1) - - assert tasks["child1"].parent_task is _core.current_task() - assert tasks["child2"].parent_task is tasks["child1"] - - t = _core.current_task() - # Make sure that chaining parent_task eventually gives None (and not, for - # example, an error) - while t is not None: - t = t.parent_task - - async def test_task_tree_introspection(): tasks = {} + nurseries = {} - tasks["parent"] = _core.current_task() + async def parent(): + tasks["parent"] = _core.current_task() - assert tasks["parent"].child_nurseries == [] + assert tasks["parent"].child_nurseries == [] - async with _core.open_nursery() as nursery1: - async with _core.open_nursery() as nursery2: - assert tasks["parent"].child_nurseries == [nursery1, nursery2] + async with _core.open_nursery() as nursery1: + async with _core.open_nursery() as nursery2: + assert tasks["parent"].child_nurseries == [nursery1, nursery2] - assert tasks["parent"].child_nurseries == [] + assert tasks["parent"].child_nurseries == [] - nurseries = {} + async with _core.open_nursery() as nursery: + nurseries["parent"] = nursery + nursery.start_soon(child1) + + # Upward links survive after tasks/nurseries exit + assert nurseries["parent"].parent_task is tasks["parent"] + assert tasks["child1"].parent_nursery is nurseries["parent"] + assert nurseries["child1"].parent_task is tasks["child1"] + assert tasks["child2"].parent_nursery is nurseries["child1"] + + nursery = _core.current_task().parent_nursery + # Make sure that chaining eventually gives a nursery of None (and not, + # for example, an error) + while nursery is not None: + t = nursery.parent_task + nursery = t.parent_nursery async def child2(): tasks["child2"] = _core.current_task() @@ -1567,21 +1466,7 @@ async def child1(): nursery.start_soon(child2) async with _core.open_nursery() as nursery: - nurseries["parent"] = nursery - nursery.start_soon(child1) - - # Upward links survive after tasks/nurseries exit - assert nurseries["parent"].parent_task is tasks["parent"] - assert tasks["child1"].parent_nursery is nurseries["parent"] - assert nurseries["child1"].parent_task is tasks["child1"] - assert tasks["child2"].parent_nursery is nurseries["child1"] - - nursery = _core.current_task().parent_nursery - # Make sure that chaining eventually gives a nursery of None (and not, for - # example, an error) - while nursery is not None: - t = nursery.parent_task - nursery = t.parent_nursery + nursery.start_soon(parent) async def test_nursery_closure(): @@ -1709,7 +1594,7 @@ async def misguided(): assert "asyncio" in str(excinfo.value) -async def test_trivial_yields(recwarn): +async def test_trivial_yields(): with assert_checkpoints(): await _core.checkpoint() @@ -1731,15 +1616,6 @@ async def test_trivial_yields(recwarn): KeyError, _core.Cancelled } - async def trivial(): - pass - - async with _core.open_nursery() as nursery: - t = nursery.spawn(trivial) - assert t.result is not None - with assert_checkpoints(): - await t.wait() - async def test_nursery_start(autojump_clock): async def no_args(): # pragma: no cover @@ -1923,34 +1799,3 @@ async def start_sleep_then_crash(nursery): nursery1.start_soon(start_sleep_then_crash, nursery2) await wait_all_tasks_blocked() assert _core.current_time() - t0 == 7 - - -# can remove after 0.2.0 -async def test_some_deprecated_but_uncovered_methods(recwarn): - async def noop(): - return 33 - - async with _core.open_nursery() as nursery: - assert not nursery.zombies - assert not nursery.children - - nursery.start_soon(noop) - assert len(nursery.children) == 1 - - await wait_all_tasks_blocked() - assert len(nursery.zombies) == 1 - assert not nursery.children - - batch = await nursery.monitor.get_batch() - for task in batch: - assert nursery.reap_and_unwrap(task) == 33 - - assert not nursery.zombies - - record = [] - assert _core.current_statistics().call_soon_queue_size == 0 - call_soon = _core.current_call_soon_thread_and_signal_safe() - call_soon(record.append, 1) - assert _core.current_statistics().call_soon_queue_size == 1 - await wait_all_tasks_blocked() - assert record == [1] diff --git a/trio/_toplevel_core_reexports.py b/trio/_toplevel_core_reexports.py index c6fee116ec..1d74262f38 100644 --- a/trio/_toplevel_core_reexports.py +++ b/trio/_toplevel_core_reexports.py @@ -27,7 +27,6 @@ "current_effective_deadline", "TASK_STATUS_IGNORED", "current_time", - "current_instruments", "TaskLocal", ] diff --git a/trio/hazmat.py b/trio/hazmat.py index ab0e285939..d57d6b3d3c 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -18,7 +18,6 @@ "checkpoint_if_cancelled", "spawn_system_task", "reschedule", - "current_call_soon_thread_and_signal_safe", "remove_instrument", "add_instrument", "current_clock", diff --git a/trio/testing/_memory_streams.py b/trio/testing/_memory_streams.py index 74760522fa..7b0457f6ca 100644 --- a/trio/testing/_memory_streams.py +++ b/trio/testing/_memory_streams.py @@ -472,8 +472,6 @@ async def send_all(self, data): ) if self._data and self._receiver_closed: raise BrokenStreamError - if not self._data: - return async def wait_send_all_might_not_block(self): async with self._send_conflict_detector: diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index 1332c7ddae..7e8c040194 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -111,10 +111,6 @@ def ssl_echo_serve_sync(sock, *, expect_fail=False): # Fixture that gives a raw socket connected to a trio-test-1 echo server # (running in a thread). Useful for testing making connections with different # SSLContexts. -# -# This way of writing it is pretty janky, with the nursery hidden inside the -# fixture and no proper parental supervision. Don't copy this code; it was -# written this way before we knew better. @acontextmanager @async_generator async def ssl_echo_server_raw(**kwargs): diff --git a/trio/tests/test_testing.py b/trio/tests/test_testing.py index 93bc43dd9e..aea301067c 100644 --- a/trio/tests/test_testing.py +++ b/trio/tests/test_testing.py @@ -521,10 +521,24 @@ async def do_send_all(data): with assert_checkpoints(): assert await mss.get_data() == b"456" - with pytest.raises(_core.ResourceBusyError): - async with _core.open_nursery() as nursery: - nursery.start_soon(do_send_all, b"xxx") - nursery.start_soon(do_send_all, b"xxx") + # Call send_all twice at once; one should get ResourceBusyError and one + # should succeed. But we can't let the error propagate, because it might + # cause the other to be cancelled before it can finish doing its thing, + # and we don't know which one will get the error. + resource_busy_count = 0 + + async def do_send_all_count_resourcebusy(): + nonlocal resource_busy_count + try: + await do_send_all(b"xxx") + except _core.ResourceBusyError: + resource_busy_count += 1 + + async with _core.open_nursery() as nursery: + nursery.start_soon(do_send_all_count_resourcebusy) + nursery.start_soon(do_send_all_count_resourcebusy) + + assert resource_busy_count == 1 with assert_checkpoints(): await mss.aclose() @@ -576,7 +590,7 @@ def close_hook(): ] -async def test_MemoryRecieveStream(): +async def test_MemoryReceiveStream(): mrs = MemoryReceiveStream() async def do_receive_some(max_bytes):