-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Patch async enter all (addresses #242) #246
Changes from all commits
f1c24c7
4974e3e
21042ef
dc8ccca
0f61305
010a994
c8e7eb8
5f7802d
b17bdbf
f4af279
14a7a12
3020793
8d9ad6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import itertools | ||
|
||
import trio | ||
import tractor | ||
from tractor import open_actor_cluster | ||
from tractor.trionics import async_enter_all | ||
|
||
from conftest import tractor_test | ||
|
||
|
||
MESSAGE = 'tractoring at full speed' | ||
|
||
|
||
@tractor.context | ||
async def worker(ctx: tractor.Context) -> None: | ||
await ctx.started() | ||
async with ctx.open_stream() as stream: | ||
async for msg in stream: | ||
# do something with msg | ||
print(msg) | ||
assert msg == MESSAGE | ||
|
||
|
||
@tractor_test | ||
async def test_streaming_to_actor_cluster() -> None: | ||
async with ( | ||
open_actor_cluster(modules=[__name__]) as portals, | ||
async_enter_all( | ||
mngrs=[p.open_context(worker) for p in portals.values()], | ||
) as contexts, | ||
async_enter_all( | ||
mngrs=[ctx[0].open_stream() for ctx in contexts], | ||
) as streams, | ||
): | ||
with trio.move_on_after(1): | ||
for stream in itertools.cycle(streams): | ||
await stream.send(MESSAGE) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,8 @@ | |
Actor cluster helpers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've been wondering how to start exposing "high level" helpers like this. @guilledk and I had though about the names:
|
||
|
||
''' | ||
from __future__ import annotations | ||
|
||
from contextlib import asynccontextmanager as acm | ||
from multiprocessing import cpu_count | ||
from typing import AsyncGenerator, Optional | ||
|
@@ -12,39 +14,40 @@ | |
|
||
@acm | ||
async def open_actor_cluster( | ||
|
||
modules: list[str], | ||
count: int = cpu_count(), | ||
names: Optional[list[str]] = None, | ||
|
||
start_method: Optional[str] = None, | ||
hard_kill: bool = False, | ||
) -> AsyncGenerator[ | ||
list[str], | ||
dict[str, tractor.Portal] | ||
]: | ||
|
||
portals: dict[str, tractor.Portal] = {} | ||
uid = tractor.current_actor().uid | ||
|
||
if not names: | ||
suffix = '_'.join(uid) | ||
names = [f'worker_{i}.' + suffix for i in range(count)] | ||
names = [f'worker_{i}' for i in range(count)] | ||
|
||
if not len(names) == count: | ||
raise ValueError( | ||
'Number of names is {len(names)} but count it {count}') | ||
|
||
async with tractor.open_nursery() as an: | ||
async with tractor.open_nursery(start_method=start_method) as an: | ||
async with trio.open_nursery() as n: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So just stick the |
||
for index, key in zip(range(count), names): | ||
uid = tractor.current_actor().uid | ||
|
||
async def start(i) -> None: | ||
key = f'worker_{i}.' + '_'.join(uid) | ||
portals[key] = await an.start_actor( | ||
enable_modules=modules, | ||
name=key, | ||
) | ||
async def _start(name: str) -> None: | ||
name = f'{name}.{uid}' | ||
portals[name] = await an.start_actor( | ||
enable_modules=modules, | ||
name=name, | ||
) | ||
|
||
n.start_soon(start, index) | ||
for name in names: | ||
n.start_soon(_start, name) | ||
|
||
assert len(portals) == count | ||
yield portals | ||
|
||
await an.cancel(hard_kill=hard_kill) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ | |
Async context manager primitives with hard ``trio``-aware semantics | ||
|
||
''' | ||
from typing import AsyncContextManager | ||
from typing import TypeVar | ||
from typing import AsyncContextManager, AsyncGenerator | ||
from typing import TypeVar, Sequence | ||
from contextlib import asynccontextmanager as acm | ||
|
||
import trio | ||
|
@@ -13,52 +13,44 @@ | |
T = TypeVar("T") | ||
|
||
|
||
async def _enter_and_sleep( | ||
|
||
async def _enter_and_wait( | ||
mngr: AsyncContextManager[T], | ||
to_yield: dict[int, T], | ||
unwrapped: dict[int, T], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. woa someone has the functional parlance going on 😎 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO that's what this does, it unwraps the thing so you can consume what's inside, so I thought the name was appropriate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. name is great; |
||
all_entered: trio.Event, | ||
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, | ||
|
||
) -> T: | ||
) -> None: | ||
'''Open the async context manager deliver it's value | ||
to this task's spawner and sleep until cancelled. | ||
|
||
''' | ||
async with mngr as value: | ||
to_yield[id(mngr)] = value | ||
unwrapped[id(mngr)] = value | ||
|
||
if all(to_yield.values()): | ||
if all(unwrapped.values()): | ||
all_entered.set() | ||
|
||
# sleep until cancelled | ||
await trio.sleep_forever() | ||
|
||
|
||
@acm | ||
async def async_enter_all( | ||
|
||
*mngrs: tuple[AsyncContextManager[T]], | ||
|
||
) -> tuple[T]: | ||
|
||
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) | ||
mngrs: Sequence[AsyncContextManager[T]], | ||
) -> AsyncGenerator[tuple[T, ...], None]: | ||
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) | ||
|
||
all_entered = trio.Event() | ||
|
||
async with trio.open_nursery() as n: | ||
for mngr in mngrs: | ||
n.start_soon( | ||
_enter_and_sleep, | ||
_enter_and_wait, | ||
mngr, | ||
to_yield, | ||
unwrapped, | ||
all_entered, | ||
) | ||
|
||
# deliver control once all managers have started up | ||
await all_entered.wait() | ||
yield tuple(to_yield.values()) | ||
|
||
# tear down all sleeper tasks thus triggering individual | ||
# mngr ``__aexit__()``s. | ||
n.cancel_scope.cancel() | ||
yield tuple(unwrapped.values()) | ||
|
||
n.cancel_scope.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hehe i love this, but we still have 3.8 support atm.
I wonder though maybe we'll just drop since 3.10 is out?
I kinda like the idea of just rolling with latest 2 majors.