-
Notifications
You must be signed in to change notification settings - Fork 5
Wait for upstream object refs before starting task #81
Conversation
@ahuang11 does this look good to you? what would be needed in order to merge this? |
Hey @j-tr — they don't maintain this repository anymore. @desertaxle and I will review. |
@madkinsz any ideas on when this fix might make it into a release? Not sure we can responsibly leverage ray with prefect without this so curious on the timeline. |
@achordia20 this is failing all of the test suites for what looks like an unrelated reason so it looks like some maintenance will need to be done first. Edit: Investigating in #84 |
prefect_ray/task_runners.py
Outdated
@@ -175,7 +181,7 @@ def exchange_prefect_for_ray_future(expr): | |||
return_data=True, | |||
) | |||
|
|||
return kwargs_ray_futures | |||
return kwargs_ray_futures, upstream_ray_obj_refs | |||
|
|||
@staticmethod | |||
def _run_prefect_task(func, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename *args
to *upstream_ray_obj_refs
and add a note about what's going on here?
@@ -145,7 +145,9 @@ async def submit( | |||
"The task runner must be started before submitting work." | |||
) | |||
|
|||
call_kwargs = self._exchange_prefect_for_ray_futures(call.keywords) | |||
call_kwargs, upstream_ray_obj_refs = self._exchange_prefect_for_ray_futures( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may be able to just extract upstream ray object refs in this step then do the exchange with the resolved objects in _run_prefect_task
— seems feasible to do as a follow-up though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true. however, we need to exchange the prefect futures with something else as they are not serializable and cannot be passed to a ray remote function. I think we would need in any case some sort of placeholder object which is then exchanged by the resolved object in _run_prefect_task. but a ray object ref is already a good candidate for such a placeholder and saves us from yet another indirection.
Closes #80
Ray has built-in functionality to wait for the execution of a remote function until all input object refs are available as long as these object refs are direct parameters of the function. If wrapped into other objects, this doesn't work.
Prefect features are located in call.keywords.parameters and therefore not directly passed to the remote function.
By extracting them in exchange_prefect_for_ray_future and explicitly passing them as args to the remote function, in addition to the wrapped parameters, the expected behavior is restored.
Example
Screenshots
Checklist
pre-commit
checks.pre-commit install && pre-commit run --all
locally for formatting and linting.mkdocs serve
view documentation locally.