Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Resolve Ray Futures Decentralized & Concurrently (#69)
Browse files Browse the repository at this point in the history
* resolve ray futures decentralized & concurrently

* improve naming & apply hooks

* add docstrings

* add test case with dependencies

* Add v0.2.3 changelog entry

* v0.2.3 changelog entry - move to fixed

* v0.2.3 changelog entry - reformulate
  • Loading branch information
toro-berlin authored Feb 15, 2023
1 parent c531d4b commit 2989c7d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## 0.2.3

Released on Februrary 15th, 2023

### Fixed

- Reduce pressure on networking by resolving Ray/Prefect futures concurrently and decentralized right before executing the Prefect Task. - [#69](https://github.com/PrefectHQ/prefect-ray/pull/69)

## 0.2.2

Released on December 1st, 2022
Expand Down
43 changes: 29 additions & 14 deletions prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def submit(
"The task runner must be started before submitting work."
)

call_kwargs = self._optimize_futures(call.keywords)
call_kwargs = self._exchange_prefect_for_ray_futures(call.keywords)

remote_options = RemoteOptionsContext.get().current_remote_options
# Ray does not support the submission of async functions and we must create a
Expand All @@ -154,27 +154,42 @@ async def submit(
ray_decorator = ray.remote(**remote_options)
else:
ray_decorator = ray.remote
self._ray_refs[key] = ray_decorator(sync_compatible(call.func)).remote(
**call_kwargs
self._ray_refs[key] = ray_decorator(self._run_prefect_task).remote(
sync_compatible(call.func), **call_kwargs
)

def _optimize_futures(self, expr):
"""
Exchange PrefectFutures for ray-compatible futures
"""
def _exchange_prefect_for_ray_futures(self, kwargs_prefect_futures):
"""Exchanges Prefect futures for Ray futures."""

def visit_fn(expr):
"""
Resolves ray futures when used as dependencies
"""
def exchange_prefect_for_ray_future(expr):
"""Exchanges Prefect future for Ray future."""
if isinstance(expr, PrefectFuture):
ray_future = self._ray_refs.get(expr.key)
if ray_future is not None:
return ray.get(ray_future)
# Fallback to return the expression unaltered
return ray_future
return expr

return visit_collection(expr, visit_fn=visit_fn, return_data=True)
kwargs_ray_futures = visit_collection(
kwargs_prefect_futures,
visit_fn=exchange_prefect_for_ray_future,
return_data=True,
)

return kwargs_ray_futures

@staticmethod
def _run_prefect_task(func, *args, **kwargs):
"""Resolves Ray futures before calling the actual Prefect task function."""

def resolve_ray_future(expr):
"""Resolves Ray future."""
if isinstance(expr, ray.ObjectRef):
return ray.get(expr)
return expr

kwargs = visit_collection(kwargs, visit_fn=resolve_ray_future, return_data=True)

return func(*args, **kwargs)

async def wait(self, key: UUID, timeout: float = None) -> Optional[State]:
ref = self._get_ray_ref(key)
Expand Down
19 changes: 19 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,22 @@ def my_flow():
process.submit(42)

my_flow()

def test_dependencies(self):
@task
def a():
time.sleep(self.get_sleep_time())

b = c = d = e = a

@flow(task_runner=RayTaskRunner())
def flow_with_dependent_tasks():
for _ in range(3):
a_future = a.submit(wait_for=[])
b_future = b.submit(wait_for=[a_future])

c.submit(wait_for=[b_future])
d.submit(wait_for=[b_future])
e.submit(wait_for=[b_future])

flow_with_dependent_tasks()

0 comments on commit 2989c7d

Please sign in to comment.