Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wait for upstream object refs before starting task #81

Merged
merged 4 commits into from
Apr 27, 2023

Conversation

j-tr
Copy link
Contributor

@j-tr j-tr commented Apr 14, 2023

Closes #80

Ray has built-in functionality to wait for the execution of a remote function until all input object refs are available as long as these object refs are direct parameters of the function. If wrapped into other objects, this doesn't work.

Prefect features are located in call.keywords.parameters and therefore not directly passed to the remote function.

By extracting them in exchange_prefect_for_ray_future and explicitly passing them as args to the remote function, in addition to the wrapped parameters, the expected behavior is restored.

Example

Screenshots

Checklist

  • References any related issue by including "Closes #" or "Closes ".
    • If no issue exists and your change is not a small fix, please create an issue first.
  • Includes tests or only affects documentation.
  • Passes pre-commit checks.
    • Run pre-commit install && pre-commit run --all locally for formatting and linting.
  • Includes screenshots of documentation updates.
    • Run mkdocs serve view documentation locally.
  • Summarizes PR's changes in CHANGELOG.md

@pcmoritz
Copy link
Contributor

Thanks a lot for submitting this PR, it indeed looks like the right way to do this to me. The tasks will be submitted and the Ray scheduler will wait until they get executed until all args are available -- tasks do not get executed prematurely:

Screen Shot 2023-04-24 at 2 27 55 AM

pcmoritz
pcmoritz previously approved these changes Apr 24, 2023
@j-tr
Copy link
Contributor Author

j-tr commented Apr 25, 2023

@ahuang11 does this look good to you? what would be needed in order to merge this?

@zanieb
Copy link
Contributor

zanieb commented Apr 25, 2023

Hey @j-tr — they don't maintain this repository anymore. @desertaxle and I will review.

@achordia20
Copy link

@madkinsz any ideas on when this fix might make it into a release? Not sure we can responsibly leverage ray with prefect without this so curious on the timeline.

@zanieb
Copy link
Contributor

zanieb commented Apr 26, 2023

@achordia20 this is failing all of the test suites for what looks like an unrelated reason so it looks like some maintenance will need to be done first.

Edit: Investigating in #84

@@ -175,7 +181,7 @@ def exchange_prefect_for_ray_future(expr):
return_data=True,
)

return kwargs_ray_futures
return kwargs_ray_futures, upstream_ray_obj_refs

@staticmethod
def _run_prefect_task(func, *args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename *args to *upstream_ray_obj_refs and add a note about what's going on here?

@@ -145,7 +145,9 @@ async def submit(
"The task runner must be started before submitting work."
)

call_kwargs = self._exchange_prefect_for_ray_futures(call.keywords)
call_kwargs, upstream_ray_obj_refs = self._exchange_prefect_for_ray_futures(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may be able to just extract upstream ray object refs in this step then do the exchange with the resolved objects in _run_prefect_task — seems feasible to do as a follow-up though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. however, we need to exchange the prefect futures with something else as they are not serializable and cannot be passed to a ray remote function. I think we would need in any case some sort of placeholder object which is then exchanged by the resolved object in _run_prefect_task. but a ray object ref is already a good candidate for such a placeholder and saves us from yet another indirection.

@j-tr j-tr marked this pull request as ready for review April 27, 2023 14:34
@j-tr j-tr requested a review from a team April 27, 2023 14:34
@desertaxle desertaxle merged commit ebe388b into PrefectHQ:main Apr 27, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
5 participants