Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Sketch of how shared tasks might work #303

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions trio/_shared_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
__all__ = ["SharedTaskRegistry"]

Check warning on line 1 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L1

Added line #L1 was not covered by tests


# Here's some cleverness to normalize out functools.partial usage, important
# b/c otherwise there's no way to pass kwargs without having to specify a key=
# manually.
#
# XX should we also do signature cleverness to normalize stuff like
# def f(x): ...
# and treat these the same:
# (f, (1,), {})
# (f, (), {"x": 1})
# ? This is less important b/c we can document that if you want magic key
# generation then you should be careful to make your matching calls obviously
# matching.
def _unpack_call(fn, args, kwargs):

Check warning on line 16 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L16

Added line #L16 was not covered by tests
if isinstance(fn, functools.partial):
inner_fn, inner_args, inner_kwargs = _call_to_key(

Check warning on line 18 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L18

Added line #L18 was not covered by tests
fn.func, fn.args, fn.kwargs
)
fn = inner_fn
args = (*inner_args, *args)
kwargs = {**inner_kwargs, **kwargs}
return fn, args, kwargs

Check warning on line 24 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L21-L24

Added lines #L21 - L24 were not covered by tests


def call_to_hashable_key(fn, args):
fn, args, kwargs = _unpack_call(fn, args, {})
return (fn, args, tuple(sorted(kwargs.items())))

Check warning on line 29 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L27-L29

Added lines #L27 - L29 were not covered by tests


@attr.s
class SharedTask:
registry = attr.ib()
key = attr.ib()
cancel_scope = attr.ib(default=None)

Check warning on line 36 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L33-L36

Added lines #L33 - L36 were not covered by tests
# Needed to work around a race condition, where we realize we want to
# cancel the child before it's even created the cancel scope
cancelled_early = attr.ib(default=False)

Check warning on line 39 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L39

Added line #L39 was not covered by tests
# Reference count
waiter_count = attr.ib(default=0)

Check warning on line 41 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L41

Added line #L41 was not covered by tests
# Reporting back
finished = attr.ib(default=attr.Factory(trio.Event))
result = attr.ib(default=None)

Check warning on line 44 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L43-L44

Added lines #L43 - L44 were not covered by tests

# This runs in system task context, so it has KI protection enabled and
# any exceptions will crash the whole program.
async def run(self, async_fn, args):

Check warning on line 48 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L48

Added line #L48 was not covered by tests

async def cancellable_runner():

Check warning on line 50 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L50

Added line #L50 was not covered by tests
with trio.open_cancel_scope() as cancel_scope:
self.cancel_scope = cancel_scope

Check warning on line 52 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L52

Added line #L52 was not covered by tests
if self.cancelled_early:
self.cancel_scope.cancel()
return await ki_unprotected_runner()

Check warning on line 55 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L54-L55

Added lines #L54 - L55 were not covered by tests

@trio.hazmat.disable_ki_protection

Check warning on line 57 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L57

Added line #L57 was not covered by tests
async def ki_unprotected_runner():
return await async_fn(*args)

Check warning on line 59 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L59

Added line #L59 was not covered by tests

self.result = await Result.acapture(cancellable_runner)
self.finished.set()

Check warning on line 62 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L61-L62

Added lines #L61 - L62 were not covered by tests
if self.registry._tasks.get(self.key) is self:
del self.registry._tasks[self.key]

Check warning on line 64 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L64

Added line #L64 was not covered by tests


@attr.s(slots=True, frozen=True, hash=False, cmp=False, repr=False)
class SharedTaskRegistry:
_tasks = attr.ib(default=attr.Factory(dict))

Check warning on line 69 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L68-L69

Added lines #L68 - L69 were not covered by tests

@trio.hazmat.enable_ki_protection

Check warning on line 71 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L71

Added line #L71 was not covered by tests
async def run(self, async_fn, *args, key=None):
if key is None:
key = call_to_hashable_key(async_fn, args)

Check warning on line 74 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L74

Added line #L74 was not covered by tests

if key not in self._tasks:
shared_task = SharedTask(self, key)
self._tasks[key] = shared_task
trio.hazmat.spawn_system_task(shared_task.run, async_fn, args)

Check warning on line 79 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L77-L79

Added lines #L77 - L79 were not covered by tests

shared_task = self._tasks[key]
shared_task.waiter_count += 1

Check warning on line 82 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L81-L82

Added lines #L81 - L82 were not covered by tests

try:
await shared_task.finished.wait()
except:

Check warning on line 86 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L84-L86

Added lines #L84 - L86 were not covered by tests
# Cancelled, or some bug
shared_task.waiter_count -= 1

Check warning on line 88 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L88

Added line #L88 was not covered by tests
if shared_task.waiter_count == 0:
# Make sure any incoming calls to run() start a new task
del self._tasks[key]

Check warning on line 91 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L91

Added line #L91 was not covered by tests

# Cancel the child, while working around the race condition
if shared_task.cancel_scope is None:
shared_task.cancelled_early = True

Check warning on line 95 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L95

Added line #L95 was not covered by tests
else:
shared_task.cancel_scope.cancel()

Check warning on line 97 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L97

Added line #L97 was not covered by tests

with trio.open_cancel_scope(shield=True) as cancel_scope:
await shared_task.finished()

Check warning on line 100 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L100

Added line #L100 was not covered by tests
# Some possibilities:
# - they raised Cancelled. The cancellation we injected is
# absorbed internally, though, so this can only happen
# if a cancellation came from outside. The only way a
# system task can see this is if the whole system is
# going down, so it's OK to re-raise that -- any scope
# that includes a system task includes all the code in
# trio, including us.
# - they raise some other error: we should propagate
# - they return nothing (most common, b/c cancelled was
# raised and then
if not shared_task.cancel_scope.cancelled_caught:
return shared_task.result.unwrap()

Check warning on line 113 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L113

Added line #L113 was not covered by tests
else:
shared_task.result.unwrap()
raise

Check warning on line 116 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L115-L116

Added lines #L115 - L116 were not covered by tests

return shared_task.result.unwrap()

Check warning on line 118 in trio/_shared_task.py

View check run for this annotation

Codecov / codecov/patch

trio/_shared_task.py#L118

Added line #L118 was not covered by tests
Loading