Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Resolve Ray Futures Decentralized & Concurrently #69

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## 0.2.3

Released on Februrary 15th, 2023

### Fixed

- Reduce pressure on networking by resolving Ray/Prefect futures concurrently and decentralized right before executing the Prefect Task. - [#69](https://github.com/PrefectHQ/prefect-ray/pull/69)

## 0.2.2

Released on December 1st, 2022
Expand Down
43 changes: 29 additions & 14 deletions prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def submit(
"The task runner must be started before submitting work."
)

call_kwargs = self._optimize_futures(call.keywords)
call_kwargs = self._exchange_prefect_for_ray_futures(call.keywords)

remote_options = RemoteOptionsContext.get().current_remote_options
# Ray does not support the submission of async functions and we must create a
Expand All @@ -154,27 +154,42 @@ async def submit(
ray_decorator = ray.remote(**remote_options)
else:
ray_decorator = ray.remote
self._ray_refs[key] = ray_decorator(sync_compatible(call.func)).remote(
**call_kwargs
self._ray_refs[key] = ray_decorator(self._run_prefect_task).remote(
sync_compatible(call.func), **call_kwargs
)

def _optimize_futures(self, expr):
"""
Exchange PrefectFutures for ray-compatible futures
"""
def _exchange_prefect_for_ray_futures(self, kwargs_prefect_futures):
"""Exchanges Prefect futures for Ray futures."""

def visit_fn(expr):
"""
Resolves ray futures when used as dependencies
"""
def exchange_prefect_for_ray_future(expr):
"""Exchanges Prefect future for Ray future."""
if isinstance(expr, PrefectFuture):
ray_future = self._ray_refs.get(expr.key)
if ray_future is not None:
return ray.get(ray_future)
# Fallback to return the expression unaltered
return ray_future
return expr

return visit_collection(expr, visit_fn=visit_fn, return_data=True)
kwargs_ray_futures = visit_collection(
kwargs_prefect_futures,
visit_fn=exchange_prefect_for_ray_future,
return_data=True,
)

return kwargs_ray_futures

@staticmethod
def _run_prefect_task(func, *args, **kwargs):
"""Resolves Ray futures before calling the actual Prefect task function."""

def resolve_ray_future(expr):
"""Resolves Ray future."""
if isinstance(expr, ray.ObjectRef):
return ray.get(expr)
return expr

kwargs = visit_collection(kwargs, visit_fn=resolve_ray_future, return_data=True)

return func(*args, **kwargs)

async def wait(self, key: UUID, timeout: float = None) -> Optional[State]:
ref = self._get_ray_ref(key)
Expand Down
19 changes: 19 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,22 @@ def my_flow():
process.submit(42)

my_flow()

def test_dependencies(self):
@task
def a():
time.sleep(self.get_sleep_time())

b = c = d = e = a

@flow(task_runner=RayTaskRunner())
def flow_with_dependent_tasks():
for _ in range(3):
a_future = a.submit(wait_for=[])
b_future = b.submit(wait_for=[a_future])

c.submit(wait_for=[b_future])
d.submit(wait_for=[b_future])
e.submit(wait_for=[b_future])

flow_with_dependent_tasks()