Prefect-Ray spins up workers before the inputs for the worker tasks are ready / before the task's dependencies are complete #80
Comments
Hey @alexub, thanks for the issue! We made a trade-off here in task run orchestration: if we submit early, we can orchestrate more task runs at once because orchestration itself is distributed. Is there a way to stop Ray from starting a worker per task, or a way for a task to temporarily cede its annotated resources? This will probably require changes over in PrefectHQ/prefect to reach a reasonable solution. We may need a toggle that orchestrates the task in the flow run rather than on the task runner, or we could remove annotations from the function we use to orchestrate the task and then submit the task again, with annotations, from inside the orchestration task. Complicated 😬
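As a rough illustration of that last idea in plain Ray terms (this is not prefect-ray code, just a sketch of the pattern; the names and resource numbers are made up), an unannotated orchestration task can wait for the upstream result and only then submit the annotated task, so the expensive resources are requested once the input is actually ready:

```python
import ray

ray.init()


@ray.remote(num_cpus=1)  # stand-in for an expensive annotation, e.g. num_gpus=1
def real_task(x):
    # The actual work; its resources are only requested once it is
    # submitted from inside the orchestration task below.
    return x * 2


@ray.remote(num_cpus=0)  # orchestration holds no meaningful resources
def orchestrate(x):
    # Ray resolves `x` before starting this task, so `real_task` (and its
    # resource request) is only submitted once the input is ready.
    return real_task.remote(x)


upstream = ray.put(21)
inner_ref = ray.get(orchestrate.remote(upstream))  # ObjectRef returned by real_task
print(ray.get(inner_ref))  # 42
```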
Hi @madkinsz, thanks for the quick response. I think what we need is simpler than that. The default TaskRunner already does not start a task until all of its dependencies have completed. We just want similar behavior when using Ray, so that a worker for task2 doesn't start or provision resources until task1 is finished.
Thanks @madkinsz. Early submission is a problem because, for a chain of N dependent GPU tasks, it makes them consume O(N^2) GPU-hours instead of O(N). It looks like early submission may have been intentionally introduced by #69? As a stopgap, is there a way to add a flag to the RayTaskRunner letting the user choose between local orchestration (the old behavior, with its bottlenecks) and early submission (with its extra spend)? I'd also suggest adding a warning, in the documentation or at runtime, about early submission, since this behavior isn't what users expect coming from the default TaskRunner. Longer term, a solution could be some form of distributed orchestration that does not run on the same workers as the actual job tasks: for example, the RayTaskRunner could spin up orchestration tasks that themselves submit the job tasks, with the number of orchestration tasks user-configurable to support very large task loads.
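To spell out the arithmetic behind the O(N^2) figure (a back-of-the-envelope sketch, assuming each of the N chained tasks runs for time $T$ on one GPU and every worker is provisioned at submit time), task $i$'s worker sits idle for roughly $(i-1)T$ before its input is ready:

$$\text{GPU-hours}_{\text{early}} \approx \sum_{i=1}^{N}\big[(i-1)T + T\big] = \frac{N(N+1)}{2}\,T = O(N^2), \qquad \text{GPU-hours}_{\text{deferred}} \approx N\,T = O(N).$$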
Possibly related to PrefectHQ/prefect#8539.
Hi everyone, I would expect that Ray should not provision an instance for a task before the object references from the upstream task are resolvable.
Our observations with the current implementation:
I will ping the Anyscale Ray folks to see whether they can take a look at this issue and perhaps help with an idea or a solution.
Ray will wait for upstream object refs as long as they are passed directly as parameters. With Prefect, the input futures are wrapped inside the parameter kwargs, so Ray doesn't see them. I created a draft PR #81 that solves this problem by additionally passing the object refs directly as args.
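For illustration, here's a minimal pure-Ray sketch of that difference (task names are made up; this isn't prefect-ray code): an ObjectRef passed as a top-level argument is resolved by Ray before the task is scheduled, while a ref nested inside a dict is handed over unresolved, so the task starts immediately and has to block on it itself:

```python
import ray

ray.init()


@ray.remote
def produce():
    return 42


@ray.remote
def consume_direct(x):
    # `x` is already 42 here: Ray waited for the upstream ObjectRef
    # before scheduling this task.
    return x + 1


@ray.remote
def consume_wrapped(kwargs):
    # kwargs["x"] is still an ObjectRef: Ray does not resolve refs nested
    # inside containers, so this task starts right away and must block on
    # ray.get itself while holding its worker.
    return ray.get(kwargs["x"]) + 1


ref = produce.remote()
print(ray.get(consume_direct.remote(ref)))          # scheduled after upstream completes
print(ray.get(consume_wrapped.remote({"x": ref})))  # scheduled immediately
```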
Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we create a future `x = task_1.submit()` and pass it as an input to another task, `y = task_2.submit(x=x)`, Ray spins up a worker for task_2 with the requested resources, and that worker sits waiting on `x.result()`.
This is obviously undesirable! If we had `x1 = task1(); x2 = task2(x1=x1); x3 = task3(x2=x2); ...`, all the later tasks in the pipeline would waste worker resources while waiting for the earlier workers to complete. We would like the worker for a task to launch only when the task's inputs become ready.
Is this known behavior, and does anyone know how to solve it?
Here's a working example, using `prefect==2.9.0`, `prefect-ray==0.2.4`, and `ray[default]==2.2.0`.
When run, 11 workers are created immediately, as shown in the attached image.
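(The original snippet and screenshot aren't reproduced above; a minimal sketch of a reproduction along these lines, with task bodies, sleep times, and the fan-out count chosen for illustration rather than taken from the original code, might look like the following.)

```python
import time

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def task_1():
    time.sleep(60)  # pretend this is a long-running upstream task
    return 1


@task
def task_2(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def pipeline():
    x = task_1.submit()
    # Each submit below gets a Ray worker right away, even though the
    # input future `x` will not resolve for another minute.
    futures = [task_2.submit(x=x) for _ in range(10)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    pipeline()
```

Run this way, Ray provisions workers for all of the `task_2` submissions up front while they sit waiting on `x`.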