
Trionics #241

Merged 32 commits on Oct 27, 2021
32 commits
340ddba
Rename the nursery module to `_supervise`
goodboy Oct 4, 2021
680a841
Start `trionics` sub-pkg with `async_enter_all()`
goodboy Oct 4, 2021
4114eb1
Move broadcast channel parts into trionics
goodboy Oct 4, 2021
1e917fd
Add an async actor cluster spawner prototype
goodboy Oct 4, 2021
79fb1d0
Fix top level nursery import
goodboy Oct 4, 2021
97006c9
Expose `Lagged` for broadcasting
goodboy Oct 4, 2021
8ba1031
Fix type path to new `_supervise` mod
goodboy Oct 4, 2021
9ddd757
Lul, fix everything for cluster helper
goodboy Oct 4, 2021
c372367
Fix *args-like type annot
goodboy Oct 5, 2021
76767a3
Add 'trio.trionics' to setup.py
overclockworked64 Oct 16, 2021
7d502ce
Add 'open_actor_cluster' to __all__
overclockworked64 Oct 16, 2021
21afc69
Postpone evaluation of annotations
overclockworked64 Oct 16, 2021
2815f1c
Make 'async_enter_all' take a teardown trigger which '_enter_and_wait…
overclockworked64 Oct 16, 2021
73cbb23
Avoid RuntimeError by not using current_actor's uid
overclockworked64 Oct 16, 2021
6e6baf2
Make sure the ID is a str
overclockworked64 Oct 16, 2021
6f9229c
Cancel nursery
overclockworked64 Oct 16, 2021
3130a04
Rename a variable and fix type annotations
overclockworked64 Oct 17, 2021
c1089db
Add a clustering test
overclockworked64 Oct 17, 2021
b7a4641
Allow specifying start_method and hard_kill
overclockworked64 Oct 17, 2021
04895b9
Get rid of dumb random uid and use current actor's uid
overclockworked64 Oct 22, 2021
87e3d32
Get rid of external teardown trigger because #245 resolves the problem
overclockworked64 Oct 22, 2021
b91adcf
Get rid of external teardown trigger
overclockworked64 Oct 22, 2021
71b8f9f
Merge pull request #252 from goodboy/246_facepalm_backup
goodboy Oct 23, 2021
5040035
Fix type annotations
overclockworked64 Oct 23, 2021
ebf080b
Merge pull request #253 from overclockworked64/fix-type-annotation
goodboy Oct 23, 2021
d0f5c7a
Change to `gather_contexts()`, use event for graceful exit
goodboy Oct 24, 2021
925af28
Merge pull request #254 from goodboy/graceful_gather
goodboy Oct 25, 2021
083b73a
Test: don't grab debug lock if not in mode
goodboy Oct 25, 2021
c7f59bd
Add a news fragment
overclockworked64 Oct 25, 2021
49dd230
Add a newline
overclockworked64 Oct 25, 2021
6da7694
Fix the syntax and point to the new package
overclockworked64 Oct 27, 2021
9c13827
Merge pull request #256 from overclockworked64/241-news-fragment
goodboy Oct 27, 2021
2 changes: 1 addition & 1 deletion examples/debugging/fast_error_in_root_after_spawn.py
@@ -20,7 +20,7 @@ async def sleep(


async def open_ctx(
-    n: tractor._trionics.ActorNursery
+    n: tractor._supervise.ActorNursery
):

# spawn both actors
6 changes: 6 additions & 0 deletions newsfragments/241.feature
@@ -0,0 +1,6 @@
Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly:

- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors.
- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers.

.. _sub-package: ../tractor/trionics
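For a quick taste of the new cluster API, here is a minimal sketch (illustrative only and not part of the diff: the `cube()` worker and `trio.run()` wiring are made up, while `Portal.run()` is tractor's pre-existing remote-invocation helper):

import trio
import tractor


async def cube(x: int) -> int:
    # plain async functions can be invoked remotely on worker actors
    return x ** 3


async def main() -> None:
    async with tractor.open_actor_cluster(
        # expose this module so workers can resolve `cube()`
        modules=[__name__],
    ) as portals:
        # farm one call out to each worker actor in the cluster
        results = []
        for i, portal in enumerate(portals.values()):
            results.append(await portal.run(cube, x=i))
        print(results)


if __name__ == '__main__':
    trio.run(main)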
1 change: 1 addition & 0 deletions setup.py
@@ -35,6 +35,7 @@
platforms=['linux', 'windows'],
packages=[
'tractor',
+        'tractor.trionics',
'tractor.testing',
],
install_requires=[
37 changes: 37 additions & 0 deletions tests/test_clustering.py
@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    async with (
        open_actor_cluster(modules=[__name__]) as portals,
        gather_contexts(
            mngrs=[p.open_context(worker) for p in portals.values()],
        ) as contexts,
        gather_contexts(
            mngrs=[ctx[0].open_stream() for ctx in contexts],
        ) as streams,
    ):
        with trio.move_on_after(1):
            for stream in itertools.cycle(streams):
                await stream.send(MESSAGE)
3 changes: 1 addition & 2 deletions tests/test_task_broadcasting.py
@@ -12,7 +12,7 @@
import trio
from trio.lowlevel import current_task
import tractor
-from tractor._broadcast import broadcast_receiver, Lagged
+from tractor.trionics import broadcast_receiver, Lagged


@tractor.context
@@ -432,7 +432,6 @@ async def main():
tx, rx = trio.open_memory_channel(1)
brx = broadcast_receiver(rx, 1)
cs = trio.CancelScope()
-    sequence = list(range(3))

async def sub_and_recv():
with cs:
4 changes: 3 additions & 1 deletion tractor/__init__.py
@@ -4,6 +4,7 @@
"""
from trio import MultiError

+from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
@@ -13,7 +14,7 @@
context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
-from ._trionics import open_nursery
+from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._exceptions import (
RemoteActorError,
@@ -39,6 +40,7 @@
'get_arbiter',
'is_root_process',
'msg',
+    'open_actor_cluster',
'open_nursery',
'open_root_actor',
'Portal',
53 changes: 53 additions & 0 deletions tractor/_clustering.py
@@ -0,0 +1,53 @@
'''
Actor cluster helpers.

'''
from __future__ import annotations

from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional

import trio
import tractor


Review discussion on `open_actor_cluster()`:

Collaborator: This would be the answer to the process pool question, right? Looks great!

goodboy (Owner, Author): More or less, yah.

goodboy (Owner, Author): I guess the last question here is how do we export this? Should it be top level, `from tractor import open_actor_cluster`, or maybe should it be exported via something like `tractor.builtin` or `tractor.extras`?

Collaborator: I'd also change `async_enter_all` to `mass_aenter`, for example.

goodboy (Owner, Author), Oct 24, 2021: Hmm, `<something>_aenter()` is interesting, though I wonder if we should try to mimic what the `contextlib` stack APIs offer? Not that any of them have this kind of interface per se. `mass_` seems a bit foreign to me.

I think we need to emphasize that the entering is done concurrently, not just that `__aenter__()` is being called (which is obviously async). I think in worker pool parlance this is done with something like `pool.map()`, but the difference here is that we're not running functions and collecting output - it's more of a setup/teardown type thing..

goodboy (Owner, Author): What about `enter_all_soon()`?

goodboy (Owner, Author), Oct 24, 2021: `AsyncExitStack` API for reference.

@acm
async def open_actor_cluster(
    modules: list[str],
    count: int = cpu_count(),
    names: Optional[list[str]] = None,
    start_method: Optional[str] = None,
    hard_kill: bool = False,
) -> AsyncGenerator[
    dict[str, tractor.Portal],
    None,
]:

    portals: dict[str, tractor.Portal] = {}

    if not names:
        names = [f'worker_{i}' for i in range(count)]

    if len(names) != count:
        raise ValueError(
            f'Number of names is {len(names)} but count is {count}')

    async with tractor.open_nursery(start_method=start_method) as an:
        async with trio.open_nursery() as n:
            uid = tractor.current_actor().uid

            async def _start(name: str) -> None:
                name = f'{name}.{uid}'
                portals[name] = await an.start_actor(
                    enable_modules=modules,
                    name=name,
                )

            for name in names:
                n.start_soon(_start, name)

        assert len(portals) == count
        yield portals

        await an.cancel(hard_kill=hard_kill)
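A minimal sketch of driving the optional parameters above (not part of the diff; the worker names are made up). Note that `len(names)` must equal `count` or the `ValueError` above fires, and the returned portal keys get suffixed with the parent actor's uid per `_start()`:

import trio
import tractor


async def main() -> None:
    async with tractor.open_actor_cluster(
        modules=['__main__'],
        count=2,
        names=['loader', 'saver'],  # len(names) must equal `count`
        hard_kill=False,            # forwarded to ActorNursery.cancel()
    ) as portals:
        # keys look like 'loader.<parent-uid>' per the f-string in _start()
        print(sorted(portals))


if __name__ == '__main__':
    trio.run(main)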
3 changes: 3 additions & 0 deletions tractor/_debug.py
@@ -558,6 +558,9 @@ async def acquire_debug_lock(
Grab root's debug lock on entry, release on exit.

'''
+    if not debug_mode():
+        return
+
    async with trio.open_nursery() as n:
        cs = await n.start(
            wait_for_parent_stdin_hijack,
2 changes: 1 addition & 1 deletion tractor/_streaming.py
@@ -19,8 +19,8 @@
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
-from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
+from .trionics import broadcast_receiver, BroadcastReceiver


log = get_logger(__name__)
File renamed without changes.
14 changes: 14 additions & 0 deletions tractor/trionics/__init__.py
@@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.

'''
from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged


__all__ = [
    'gather_contexts',
    'broadcast_receiver',
    'BroadcastReceiver',
    'Lagged',
]
File renamed without changes.
78 changes: 78 additions & 0 deletions tractor/trionics/_mngrs.py
@@ -0,0 +1,78 @@
'''
Async context manager primitives with hard ``trio``-aware semantics

'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_wait(

    mngr: AsyncContextManager[T],
    unwrapped: dict[int, T],
    all_entered: trio.Event,
    parent_exit: trio.Event,

) -> None:
    '''
    Open the async context manager, deliver its value
    to this task's spawner, and sleep until cancelled.

    '''
    async with mngr as value:
        unwrapped[id(mngr)] = value

        if all(unwrapped.values()):
            all_entered.set()

        await parent_exit.wait()


@acm
async def gather_contexts(

    mngrs: Sequence[AsyncContextManager[T]],

) -> AsyncGenerator[tuple[T, ...], None]:
    '''
    Concurrently enter a sequence of async context managers, each in
    a separate ``trio`` task, and deliver the unwrapped values in the
    same order once all managers have entered. On exit, all contexts
    are concurrently exited.

    This function is somewhat similar to common usage of
    ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop)
    combined with ``asyncio.gather()``, except that the managers are
    entered and exited concurrently and cancellation just works.

    '''
    unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)

    all_entered = trio.Event()
    parent_exit = trio.Event()

    async with trio.open_nursery() as n:
        for mngr in mngrs:
            n.start_soon(
                _enter_and_wait,
                mngr,
                unwrapped,
                all_entered,
                parent_exit,
            )

        # deliver control once all managers have started up
        await all_entered.wait()

        yield tuple(unwrapped.values())

        # we don't need a try/finally since cancellation will be
        # triggered by the surrounding nursery on error.
        parent_exit.set()
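To ground the docstring's comparison, a small side-by-side sketch (not part of the diff; `open_conn()` is a hypothetical manager with slow setup). With `gather_contexts()` the total entry time tracks the slowest manager rather than the sum of all of them:

from contextlib import AsyncExitStack, asynccontextmanager

import trio
from tractor.trionics import gather_contexts


@asynccontextmanager
async def open_conn(delay: float):
    # stand-in for a slow-to-enter resource; note it must yield a
    # truthy value for the `all(unwrapped.values())` check above to fire
    await trio.sleep(delay)
    yield delay


async def sequential() -> None:
    # stack entry is serial: total setup is the *sum* of delays (~0.6s)
    async with AsyncExitStack() as stack:
        values = [
            await stack.enter_async_context(open_conn(d))
            for d in (0.1, 0.2, 0.3)
        ]
        print(values)


async def concurrent() -> None:
    # each manager is entered in its own trio task: setup time is
    # roughly the *max* delay (~0.3s)
    async with gather_contexts(
        mngrs=[open_conn(d) for d in (0.1, 0.2, 0.3)],
    ) as values:
        print(values)


if __name__ == '__main__':
    trio.run(sequential)
    trio.run(concurrent)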