Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch async enter all (addresses #242) #246

Merged
merged 13 commits into from
Oct 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
platforms=['linux', 'windows'],
packages=[
'tractor',
'tractor.trionics',
'tractor.testing',
],
install_requires=[
Expand Down
37 changes: 37 additions & 0 deletions tests/test_clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
async_enter_all(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
async_enter_all(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe i love this, but we still have 3.8 support atm.

I wonder though maybe we'll just drop since 3.10 is out?
I kinda like the idea of just rolling with latest 2 majors.

mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)
2 changes: 2 additions & 0 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from trio import MultiError

from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
Expand Down Expand Up @@ -39,6 +40,7 @@
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'Portal',
Expand Down
31 changes: 17 additions & 14 deletions tractor/_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Actor cluster helpers.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been wondering how to start exposing "high level" helpers like this. @guilledk and I had though about the names:

  • tractor.extras
  • tractor.builtin


'''
from __future__ import annotations

from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional
Expand All @@ -12,39 +14,40 @@

@acm
async def open_actor_cluster(

modules: list[str],
count: int = cpu_count(),
names: Optional[list[str]] = None,

start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[
list[str],
dict[str, tractor.Portal]
]:

portals: dict[str, tractor.Portal] = {}
uid = tractor.current_actor().uid

if not names:
suffix = '_'.join(uid)
names = [f'worker_{i}.' + suffix for i in range(count)]
names = [f'worker_{i}' for i in range(count)]

if not len(names) == count:
raise ValueError(
'Number of names is {len(names)} but count it {count}')

async with tractor.open_nursery() as an:
async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just stick the uid = line underneath here but before you do the zip loop and it should all work nicely.

for index, key in zip(range(count), names):
uid = tractor.current_actor().uid

async def start(i) -> None:
key = f'worker_{i}.' + '_'.join(uid)
portals[key] = await an.start_actor(
enable_modules=modules,
name=key,
)
async def _start(name: str) -> None:
name = f'{name}.{uid}'
portals[name] = await an.start_actor(
enable_modules=modules,
name=name,
)

n.start_soon(start, index)
for name in names:
n.start_soon(_start, name)

assert len(portals) == count
yield portals

await an.cancel(hard_kill=hard_kill)
38 changes: 15 additions & 23 deletions tractor/trionics/_mngrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Async context manager primitives with hard ``trio``-aware semantics

'''
from typing import AsyncContextManager
from typing import TypeVar
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm

import trio
Expand All @@ -13,52 +13,44 @@
T = TypeVar("T")


async def _enter_and_sleep(

async def _enter_and_wait(
mngr: AsyncContextManager[T],
to_yield: dict[int, T],
unwrapped: dict[int, T],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woa someone has the functional parlance going on 😎

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO that's what this does, it unwraps the thing so you can consume what's inside, so I thought the name was appropriate.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name is great; trio uses the same one internally for wrapping coroutines.

all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> T:
) -> None:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.

'''
async with mngr as value:
to_yield[id(mngr)] = value
unwrapped[id(mngr)] = value

if all(to_yield.values()):
if all(unwrapped.values()):
all_entered.set()

# sleep until cancelled
await trio.sleep_forever()


@acm
async def async_enter_all(

*mngrs: tuple[AsyncContextManager[T]],

) -> tuple[T]:

to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)

all_entered = trio.Event()

async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
_enter_and_wait,
mngr,
to_yield,
unwrapped,
all_entered,
)

# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())

# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()
yield tuple(unwrapped.values())

n.cancel_scope.cancel()