Skip to content

Commit

Permalink
Deprecate most of the nursery and task APIs
Browse files Browse the repository at this point in the history
For python-triogh-136

This commit contains:

- the actual deprecations
- changes to _run.py to prevent warnings
- changes to test_run.py to prevent warnings

Still need to clean up everything else.
  • Loading branch information
njsmith committed Aug 22, 2017
1 parent 6ea17a5 commit b115a81
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 125 deletions.
76 changes: 52 additions & 24 deletions trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,27 +359,34 @@ def __init__(self, parent, cancel_scope):
self._children = set()
self._pending_starts = 0
self._zombies = set()
self.monitor = _core.UnboundedQueue()
self._monitor = _core.UnboundedQueue()
self._closed = False

@property
def children(self):
return frozenset(self._children)

@property
@deprecated("0.2.0", instead=None, issue=136)
def zombies(self):
return frozenset(self._zombies)

@property
@deprecated("0.2.0", instead=None, issue=136)
def monitor(self):
return self._monitor

def _child_finished(self, task):
self._children.remove(task)
self._zombies.add(task)
self.monitor.put_nowait(task)
self._monitor.put_nowait(task)

def start_soon(self, async_fn, *args, name=None):
GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)

# Returns the task, unlike start_soon
#@deprecated("nursery.spawn", version="0.2.0", "nursery.start_soon")
@deprecated("0.2.0", thing="nursery.spawn", instead="nursery.start_soon",
issue=136)
def spawn(self, async_fn, *args, name=None):
return GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)

Expand All @@ -402,17 +409,25 @@ async def start(self, async_fn, *args, name=None):
return task_status._value
finally:
self._pending_starts -= 1
self.monitor.put_nowait(None)
self._monitor.put_nowait(None)

def reap(self, task):
def _reap(self, task):
try:
self._zombies.remove(task)
except KeyError:
raise ValueError("{} is not a zombie in this nursery".format(task))

@deprecated("0.2.0", instead=None, issue=136)
def reap(self, task):
return self._reap(task)

def _reap_and_unwrap(self, task):
self._reap(task)
return task._result.unwrap()

@deprecated("0.2.0", instead=None, issue=136)
def reap_and_unwrap(self, task):
self.reap(task)
return task.result.unwrap()
return self._reap_and_unwrap(task)

async def _clean_up(self, pending_exc):
cancelled_children = False
Expand All @@ -435,22 +450,22 @@ async def _clean_up(self, pending_exc):
# of remaining tasks, so we have to check first before
# blocking on the monitor queue.
for task in list(self._zombies):
if type(task.result) is Error:
exceptions.append(task.result.error)
self.reap(task)
if type(task._result) is Error:
exceptions.append(task._result.error)
self._reap(task)

if exceptions and not cancelled_children:
self.cancel_scope.cancel()
clean_up_scope.shield = True
cancelled_children = True

if self.children or self._pending_starts:
if self._children or self._pending_starts:
try:
# We ignore the return value here, and will pick up
# the actual tasks from the zombies set after looping
# around. (E.g. it's possible there are tasks in the
# queue that were already reaped.)
await self.monitor.get_batch()
await self._monitor.get_batch()
except (Cancelled, KeyboardInterrupt) as exc:
exceptions.append(exc)

Expand Down Expand Up @@ -483,7 +498,7 @@ async def _clean_up(self, pending_exc):
raise mexc

def __del__(self):
assert not self.children and not self.zombies
assert not self._children and not self._zombies


################################################################
Expand Down Expand Up @@ -513,7 +528,7 @@ class Task:
# Invariant:
# - for unfinished tasks, result is None
# - for finished tasks, result is a Result object
result = attr.ib(default=None)
_result = attr.ib(default=None)
# Invariant:
# - for unscheduled tasks, _next_send is None
# - for scheduled tasks, _next_send is a Result object
Expand All @@ -537,6 +552,11 @@ class Task:
def __repr__(self):
return ("<Task {!r} at {:#x}>".format(self.name, id(self)))

@property
@deprecated("0.2.0", instead=None, issue=136)
def result(self):
return self._result

# For debugging and visualization:
@property
def parent_task(self):
Expand All @@ -555,6 +575,7 @@ def parent_task(self):

_monitors = attr.ib(default=attr.Factory(set))

@deprecated("0.2.0", instead=None, issue=136)
def add_monitor(self, queue):
"""Register to be notified when this task exits.
Expand All @@ -567,6 +588,9 @@ def add_monitor(self, queue):
ValueError: if ``queue`` is already registered with this task
"""
return self._add_monitor(queue)

def _add_monitor(self, queue):
# Rationale: (a) don't particularly want to create a
# callback-in-disguise API by allowing people to stick in some
# arbitrary object with a put_nowait method, (b) don't want to have to
Expand All @@ -577,11 +601,12 @@ def add_monitor(self, queue):
raise TypeError("monitor must be an UnboundedQueue object")
if queue in self._monitors:
raise ValueError("can't add same monitor twice")
if self.result is not None:
if self._result is not None:
queue.put_nowait(self)
else:
self._monitors.add(queue)

@deprecated("0.2.0", instead=None, issue=136)
def discard_monitor(self, queue):
"""Unregister the given queue from being notified about this task
exiting.
Expand All @@ -596,16 +621,17 @@ def discard_monitor(self, queue):

self._monitors.discard(queue)

@deprecated("0.2.0", instead=None, issue=136)
async def wait(self):
"""Wait for this task to exit.
"""
q = _core.UnboundedQueue()
self.add_monitor(q)
self._add_monitor(q)
try:
await q.get_batch()
finally:
self.discard_monitor(q)
self._monitors.discard(q)

################
# Cancellation
Expand Down Expand Up @@ -916,7 +942,7 @@ def _return_value_looks_like_wrong_library(value):
return task

def task_exited(self, task, result):
task.result = result
task._result = result
while task._cancel_stack:
task._cancel_stack[-1]._remove_task(task)
self.tasks.remove(task)
Expand Down Expand Up @@ -1002,14 +1028,16 @@ async def init(self, async_fn, args):
self.spawn_system_task(
self.call_soon_task, name="<call soon task>"
)
self.main_task = system_nursery.spawn(async_fn, *args)
async for task_batch in system_nursery.monitor:

self.main_task = self.spawn_impl(async_fn, args,
self.system_nursery, name=None)
async for task_batch in system_nursery._monitor:
for task in task_batch:
if task is self.main_task:
system_nursery.cancel_scope.cancel()
return system_nursery.reap_and_unwrap(task)
return system_nursery._reap_and_unwrap(task)
else:
system_nursery.reap_and_unwrap(task)
system_nursery._reap_and_unwrap(task)

################
# Outside Context Problems
Expand Down Expand Up @@ -1194,7 +1222,7 @@ def _deliver_ki_cb(self):
# same time -- so even if KI arrives before main_task is created, we
# won't get here until afterwards.
assert self.main_task is not None
if self.main_task.result is not None:
if self.main_task._result is not None:
# We're already in the process of exiting -- leave ki_pending set
# and we'll check it again on our way out of run().
return
Expand Down Expand Up @@ -1606,7 +1634,7 @@ def run_impl(runner, async_fn, args):
runner.instrument("after_task_step", task)
del GLOBAL_RUN_CONTEXT.task

return runner.init_task.result
return runner.init_task._result


################################################################
Expand Down
Loading

0 comments on commit b115a81

Please sign in to comment.