Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #54 from PrefectHQ/resources_context_manager
Browse files Browse the repository at this point in the history
Add resources context manager to set CPUs/GPUs for task
  • Loading branch information
ahuang11 authored Oct 25, 2022
2 parents d657f9e + 810b996 commit 6089262
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 8 deletions.
12 changes: 7 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `remote_options` context manager for managing Ray remote options for a task - [#54](https://github.com/PrefectHQ/prefect-ray/pull/54)

### Changed

### Deprecated
Expand All @@ -24,26 +26,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Released September 8th, 2022

### Added
- `pickle5` requirement for Python < 3.8 users [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).
- `pickle5` requirement for Python < 3.8 users - [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).

### Fixed
- Updated `RayTaskRunner` to be compatible with the updated `TaskRunner` interface in the Prefect Core library (v2.3.0) [#35](https://github.com/PrefectHQ/prefect-ray/pull/35)
- Updated `RayTaskRunner` to be compatible with the updated `TaskRunner` interface in the Prefect Core library (v2.3.0) - [#35](https://github.com/PrefectHQ/prefect-ray/pull/35)

## 0.1.4

Released on August 2nd, 2022.

### Added
- `pickle5` requirement for Python < 3.8 users [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).
- `pickle5` requirement for Python < 3.8 users - [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).

## 0.1.3

Released on July 26th, 2022.

### Changed

- Dropping x86_64 requirement so ray can be automatically installed [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Examples to better exemplify parallelism [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Dropping x86_64 requirement so ray can be automatically installed - [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Examples to better exemplify parallelism - [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).

## 0.1.2

Expand Down
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,29 @@ pip install s3fs
RayTaskRunner(address="ray://1.23.199.255:10001")
```

## Specifying remote options

The `remote_options` context can be used to control the task’s remote options.

For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
# equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
with remote_options(num_cpus=4, num_gpus=2):
process.submit(42)
```

## Resources

If you encounter and bugs while using `prefect-ray`, feel free to open an issue in the [prefect-ray](https://github.com/PrefectHQ/prefect-ray) repository.
Expand Down
65 changes: 65 additions & 0 deletions prefect_ray/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Contexts to manage Ray clusters and tasks.
"""

from contextlib import contextmanager
from typing import Any, Dict

from prefect.context import ContextModel, ContextVar
from pydantic import Field


class RemoteOptionsContext(ContextModel):
"""
The context for Ray remote_options management.
Attributes:
current_remote_options: A set of current remote_options in the context.
"""

current_remote_options: Dict[str, Any] = Field(default_factory=dict)

@classmethod
def get(cls) -> "RemoteOptionsContext":
"""
Return an empty `RemoteOptionsContext`
instead of `None` if no context exists.
"""
return cls.__var__.get(RemoteOptionsContext())

__var__ = ContextVar("remote_options")


@contextmanager
def remote_options(**new_remote_options: Dict[str, Any]) -> Dict[str, Any]:
"""
Context manager to add keyword arguments to Ray `@remote` calls
for task runs. If contexts are nested, new options are merged with options
in the outer context. If a key is present in both, the new option will be used.
Yields:
The current set of remote options.
Examples:
Use 4 CPUs and 2 GPUs for the `process` task:
```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options
@task
def process(x):
return x + 1
@flow(task_runner=RayTaskRunner())
def my_flow():
# equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
with remote_options(num_cpus=4, num_gpus=2):
process.submit(42)
```
"""
current_remote_options = RemoteOptionsContext.get().current_remote_options
with RemoteOptionsContext(
current_remote_options={**current_remote_options, **new_remote_options}
):
yield
9 changes: 6 additions & 3 deletions prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def count_to(highest_number):
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.collections import visit_collection

from prefect_ray.context import RemoteOptionsContext


class RayTaskRunner(BaseTaskRunner):
"""
Expand Down Expand Up @@ -144,11 +146,12 @@ async def submit(

call_kwargs = self._optimize_futures(call.keywords)

remote_options = RemoteOptionsContext.get().current_remote_options
# Ray does not support the submission of async functions and we must create a
# sync entrypoint
self._ray_refs[key] = ray.remote(sync_compatible(call.func)).remote(
**call_kwargs
)
self._ray_refs[key] = ray.remote(
sync_compatible(call.func), **remote_options
).remote(**call_kwargs)

def _optimize_futures(self, expr):
"""
Expand Down
33 changes: 33 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from prefect_ray.context import RemoteOptionsContext, remote_options


def test_remote_options_context():
input_remote_options = {"num_cpus": 4}
current_remote_options = RemoteOptionsContext(
current_remote_options=input_remote_options
).current_remote_options
assert current_remote_options == input_remote_options


def test_remote_options_context_get():
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {}


def test_remote_options():
with remote_options(num_cpus=4, num_gpus=None):
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {"num_cpus": 4, "num_gpus": None}


def test_remote_options_empty():
with remote_options():
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {}


def test_remote_options_override():
with remote_options(num_cpus=2, num_gpus=1):
with remote_options(num_cpus=4, num_gpus=2):
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {"num_cpus": 4, "num_gpus": 2}

0 comments on commit 6089262

Please sign in to comment.