Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add resources context manager to set CPUs/GPUs for task #54

Merged
merged 5 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `remote_options` context manager for managing Ray remote options for a task - [#54](https://github.com/PrefectHQ/prefect-ray/pull/54)

### Changed

### Deprecated
Expand All @@ -24,26 +26,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Released September 8th, 2022

### Added
- `pickle5` requirement for Python < 3.8 users [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).
- `pickle5` requirement for Python < 3.8 users - [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).

### Fixed
- Updated `RayTaskRunner` to be compatible with the updated `TaskRunner` interface in the Prefect Core library (v2.3.0) [#35](https://github.com/PrefectHQ/prefect-ray/pull/35)
- Updated `RayTaskRunner` to be compatible with the updated `TaskRunner` interface in the Prefect Core library (v2.3.0) - [#35](https://github.com/PrefectHQ/prefect-ray/pull/35)

## 0.1.4

Released on August 2nd, 2022.

### Added
- `pickle5` requirement for Python < 3.8 users [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).
- `pickle5` requirement for Python < 3.8 users - [#30](https://github.com/PrefectHQ/prefect-ray/pull/30).

## 0.1.3

Released on July 26th, 2022.

### Changed

- Dropping x86_64 requirement so ray can be automatically installed [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Examples to better exemplify parallelism [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Dropping x86_64 requirement so ray can be automatically installed - [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).
- Examples to better exemplify parallelism - [#29](https://github.com/PrefectHQ/prefect-ray/pull/29).

## 0.1.2

Expand Down
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,29 @@ pip install s3fs
RayTaskRunner(address="ray://1.23.199.255:10001")
```

## Specifying remote options

The `remote_options` context can be used to control the task’s remote options.

For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
# equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
with remote_options(num_cpus=4, num_gpus=2):
process.submit(42)
```

## Resources

If you encounter and bugs while using `prefect-ray`, feel free to open an issue in the [prefect-ray](https://github.com/PrefectHQ/prefect-ray) repository.
Expand Down
65 changes: 65 additions & 0 deletions prefect_ray/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Contexts to manage Ray clusters and tasks.
"""

from contextlib import contextmanager
from typing import Any, Dict

from prefect.context import ContextModel, ContextVar
from pydantic import Field


class RemoteOptionsContext(ContextModel):
"""
The context for Ray remote_options management.

Attributes:
current_remote_options: A set of current remote_options in the context.
"""

current_remote_options: Dict[str, Any] = Field(default_factory=dict)

@classmethod
def get(cls) -> "RemoteOptionsContext":
"""
Return an empty `RemoteOptionsContext`
instead of `None` if no context exists.
"""
return cls.__var__.get(RemoteOptionsContext())

__var__ = ContextVar("remote_options")


@contextmanager
def remote_options(**new_remote_options: Dict[str, Any]) -> Dict[str, Any]:
"""
Context manager to add keyword arguments to Ray `@remote` calls
for task runs. If contexts are nested, new options are merged with options
in the outer context. If a key is present in both, the new option will be used.

Yields:
The current set of remote options.

Examples:
Use 4 CPUs and 2 GPUs for the `process` task:
```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
return x + 1

@flow(task_runner=RayTaskRunner())
def my_flow():
# equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
with remote_options(num_cpus=4, num_gpus=2):
process.submit(42)
```
"""
current_remote_options = RemoteOptionsContext.get().current_remote_options
with RemoteOptionsContext(
current_remote_options={**current_remote_options, **new_remote_options}
):
yield
9 changes: 6 additions & 3 deletions prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def count_to(highest_number):
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.collections import visit_collection

from prefect_ray.context import RemoteOptionsContext


class RayTaskRunner(BaseTaskRunner):
"""
Expand Down Expand Up @@ -144,11 +146,12 @@ async def submit(

call_kwargs = self._optimize_futures(call.keywords)

remote_options = RemoteOptionsContext.get().current_remote_options
# Ray does not support the submission of async functions and we must create a
# sync entrypoint
self._ray_refs[key] = ray.remote(sync_compatible(call.func)).remote(
**call_kwargs
)
self._ray_refs[key] = ray.remote(
sync_compatible(call.func), **remote_options
).remote(**call_kwargs)

def _optimize_futures(self, expr):
"""
Expand Down
33 changes: 33 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from prefect_ray.context import RemoteOptionsContext, remote_options


def test_remote_options_context():
input_remote_options = {"num_cpus": 4}
current_remote_options = RemoteOptionsContext(
current_remote_options=input_remote_options
).current_remote_options
assert current_remote_options == input_remote_options


def test_remote_options_context_get():
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {}


def test_remote_options():
with remote_options(num_cpus=4, num_gpus=None):
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {"num_cpus": 4, "num_gpus": None}


def test_remote_options_empty():
with remote_options():
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {}


def test_remote_options_override():
with remote_options(num_cpus=2, num_gpus=1):
with remote_options(num_cpus=4, num_gpus=2):
current_remote_options = RemoteOptionsContext.get().current_remote_options
assert current_remote_options == {"num_cpus": 4, "num_gpus": 2}