Skip to content

Commit

Permalink
feat: impl ActorPool (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
zen-xu authored Jan 13, 2025
1 parent 4c83988 commit 070dc6a
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 8 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,11 @@ jobs:
id: setup-python
with:
python-version: ${{ matrix.python-version }}
check-latest: true

- name: Set up nox
run: pip install nox

- name: Set up cache
uses: actions/cache@v4
id: cache
with:
path: .nox
key: nox-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('noxfile.py') }}

- name: Run unit test
run: |
nox -s "test-${{ matrix.python-version }}" --no-stop-on-first-error -- coverage.${{ runner.os }}-py${{ matrix.python-version }}.xml
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sunray = 'sunray.__main__:cli'
[tool.commitizen]
bump_message = "chore(release): bump version $current_version -> $new_version"
pre_bump_hooks = ["./update-changelog.sh"]
version = "0.10.0" # version anchor
version = "0.10.0" # version anchor
version_files = ["pyproject.toml:version anchor", "sunray/__init__.py"]

[build-system]
Expand Down
109 changes: 109 additions & 0 deletions sunray/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar
from typing import overload

from ray import util as _ray_util

import sunray


if TYPE_CHECKING:
from collections.abc import Generator
from typing import Any
from typing import Callable


_ActorT = TypeVar("_ActorT", bound=sunray.Actor)
_V = TypeVar("_V")
_R = TypeVar("_R")


class ActorPool(Generic[_ActorT], _ray_util.ActorPool): # pragma: no cover
"""Utility class to operate on a fixed pool of actors.
Arguments:
actors: List of Sunray actors to use in this pool.
Examples:
```python
import sunray
from sunray.util import ActorPool
class Actor(sunray.ActorMixin):
@sunray.remote_method
def double(self, v):
return 2 * v
a1, a2 = Actor.new_actor().remote(), Actor.new_actor().remote()
pool = ActorPool([a1, a2])
print(list(pool.map(lambda a, v: a.methods.double.remote(v), [1, 2, 3, 4])))
[2, 4, 6, 8]
```
"""

def __init__(self, actors: list[_ActorT]) -> None:
super().__init__(actors)

if TYPE_CHECKING:

@overload # type: ignore[override]
def map(
self,
fn: Callable[[_ActorT, _V], sunray.ObjectRef[_R]],
values: list[_V],
) -> Generator[_R, None, None]: ...

@overload
def map(
self,
fn: Callable[[_ActorT, _V], _R],
values: list[_V],
) -> Generator[_R, None, None]: ...

def map(
self,
fn: Callable[[_ActorT, _V], _R | sunray.ObjectRef[_R]],
values: list[_V],
) -> Generator[_R, None, None]: ...

@overload # type: ignore[override]
def map_unordered(
self,
fn: Callable[[_ActorT, _V], sunray.ObjectRef[_R]],
values: list[_V],
) -> Generator[_R, None, None]: ...

@overload
def map_unordered(
self,
fn: Callable[[_ActorT, _V], _R],
values: list[_V],
) -> Generator[_R, None, None]: ...

def map_unordered(
self,
fn: Callable[[_ActorT, _V], _R | sunray.ObjectRef[_R]],
values: list[_V],
) -> Generator[_R, None, None]: ...

@overload
def submit(
self, fn: Callable[[_ActorT, _V], sunray.ObjectRef[_R]], value: _V
) -> None: ...

@overload
def submit(self, fn: Callable[[_ActorT, _V], _R], value: _V) -> None: ...

def submit(self, fn: Callable[[_ActorT, _V], Any], value: _V) -> None: ...

def has_next(self) -> bool: ...

def pop_idle(self) -> _ActorT | None: ...

def push(self, actor: _ActorT) -> None: ...
23 changes: 23 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# ruff: noqa: I002
from typing import Tuple # noqa: UP035

import sunray

from sunray._internal.util import get_num_returns
from sunray.util import ActorPool


def func1() -> Tuple[int, float]: # noqa: UP006
Expand All @@ -25,3 +28,23 @@ def test_get_num_returns():
assert get_num_returns(func2) == 2
assert get_num_returns(func3) == 1
assert get_num_returns(func4) == 1


def test_actor_pool(init_ray):
class Actor(sunray.ActorMixin):
@sunray.remote_method
def double(self, v: int) -> int:
return 2 * v

a1, a2 = Actor.new_actor().remote(), Actor.new_actor().remote()
pool = ActorPool([a1, a2])
assert list(pool.map(lambda a, v: a.methods.double.remote(v), [1, 2, 3, 4])) == [
2,
4,
6,
8,
]

assert set(
pool.map_unordered(lambda a, v: a.methods.double.remote(v), [1, 2, 3, 4])
) == {2, 4, 6, 8}

0 comments on commit 070dc6a

Please sign in to comment.