This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prefect-Ray spins up workers before the inputs for the worker tasks are ready / before the task's dependencies are complete #80

Closed
alexub opened this issue Apr 12, 2023 · 6 comments · Fixed by #81

Comments

@alexub

alexub commented Apr 12, 2023

Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we create a future x = task_1.submit() and pass it as an input to another task, y = task_2.submit(x=x), Ray spins up a worker for task_2 with the requested resources, and that worker just sits and waits on x.result().

This is obviously undesirable! If we had x1 = task1(); x2 = task2(x1=x1); x3 = task3(x2=x2); ..., all the later tasks in the chain would waste worker resources while waiting for the earlier workers to complete. We would like the worker for a task to launch only once the task's inputs are ready.

Is this known behavior / does anyone know how to solve this?

Here's a reproducible example, using prefect==2.9.0, prefect-ray==0.2.4, and ray[default]==2.2.0:

import time

from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options

@task
def task_a():
    # Simulates a long-running upstream task.
    time.sleep(100)
    return 'a'

@task
def task_b():
    return 'b'

@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    # Each downstream task depends on task_a, yet its Ray worker (with a
    # full CPU + GPU reservation) starts immediately.
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])

if __name__ == '__main__':
    print(multi_test())

When run, all 11 workers are created immediately, as shown in the screenshot below.

[screenshot: Ray dashboard showing all 11 workers started at once]

@zanieb
Contributor

zanieb commented Apr 12, 2023

Hey @alexub thanks for the issue! We made a trade-off here on task run orchestration: if we submit early we can orchestrate more task runs at once because orchestration itself is distributed.

Is there a way to stop Ray from starting a worker per task, or a way to keep a task from consuming its annotated resources temporarily?

A reasonable solution will probably require changes over in PrefectHQ/prefect. We may need to add a toggle that orchestrates the task in the flow run rather than on the task runner, or we could remove the annotations from the function we use to orchestrate the task and then resubmit the task, with annotations, from inside that orchestration task? Complicated 😬
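
In plain Ray terms, the "orchestrate without annotations, then resubmit with annotations" idea would look roughly like this (just a sketch with made-up names, not how prefect-ray currently works):

import time
import ray

ray.init()

@ray.remote
def produce():
    time.sleep(30)
    return 'a'

@ray.remote(num_gpus=1)
def gpu_task(x):
    # The real, annotated work.
    return x

@ray.remote(num_cpus=0)
def orchestrate(payload):
    # Unannotated wrapper: it starts immediately but holds no GPU/CPU while
    # it blocks here waiting for the upstream result...
    x = ray.get(payload['x'])
    # ...and only then submits the annotated task, so the GPU is requested
    # only once the input actually exists.
    return ray.get(gpu_task.remote(x))

ref = produce.remote()
print(ray.get(orchestrate.remote({'x': ref})))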

@AndreySantrosyan

Hi @madkinsz, thanks for the quick response. I think what we need is simpler than that. The default TaskRunner already does not start a task until all of its dependencies have completed. We just want the same behavior when using Ray, so that the worker for task2 doesn't start or provision resources until task1 is finished.
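
For comparison, with the default task runner something like this already behaves the way we want: task2 does not start until task1 has finished (rough sketch, names made up):

import time
from prefect import flow, task

@task
def task1():
    time.sleep(5)
    return 1

@task
def task2(x):
    return x + 1

@flow  # default (non-Ray) task runner
def demo():
    a = task1.submit()
    b = task2.submit(a)  # waits for task1's result before task2 runs
    return b.result()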

@alexub
Author

alexub commented Apr 12, 2023

Thanks @madkinsz. Early submission is a problem because, for a chain of N dependent GPU tasks, it causes them to use O(N^2) GPU-hours instead of O(N).
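
Rough back-of-the-envelope (made-up numbers; assume a chain of N tasks, each taking t GPU-hours):

# With early submission, task i's worker is spun up at the start and stays
# alive until task i finishes, i.e. it holds a GPU for roughly i*t hours.
N, t = 10, 1.0
eager = sum(i * t for i in range(1, N + 1))  # t*N*(N+1)/2  ->  O(N^2)
lazy = N * t                                 # each worker only runs its own task -> O(N)
print(eager, lazy)  # 55.0 10.0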

It looks like the early submission may have been intentionally introduced by #69? As a stopgap, is there a way to add a flag to the RayTaskRunner that lets the user choose between local orchestration (the old behavior, with its bottlenecks) and early submission (with its extra spend)?

I'd also suggest adding a warning, in the documentation or at runtime, about early submission, since this behavior isn't what users would expect after using the default TaskRunner.

Longer term, a solution could be some form of distributed orchestration that does not run on the same workers as the actual job tasks. For example, the RayTaskRunner could spin up dedicated orchestration tasks that themselves submit the job tasks, with the number of orchestration tasks user-configurable to handle very large task loads.

@j-tr
Contributor

j-tr commented Apr 13, 2023

possibly related to PrefectHQ/prefect#8539

@toro-berlin
Contributor

Hey all,

I would expect Ray to provision instances for a task only once the object references from the upstream tasks are resolvable.

Our observations with the current implementation:

  • With CPU-only tasks, we do not observe this behavior: the (Ray) tasks sit in the running_in_ray_get state until their futures are resolved, and during this time the cluster stops scaling up and does not get overprovisioned.
  • With a chain of tasks where several CPU tasks are followed by one GPU task, we see the behavior you describe: the GPU workers are started right at the beginning and sit idle until the CPU tasks are done. This is something we ran into just this week.

I will ping the Ray folks at Anyscale to see if they can keep an eye on this issue and perhaps help with an idea or a solution.

@j-tr
Contributor

j-tr commented Apr 14, 2023

Ray will wait for upstream object refs as long as they are passed directly as parameters. With Prefect, the input futures are wrapped inside the parameter kwargs, so Ray won't wait on them. I created draft PR #81, which solves this problem by additionally passing the object refs directly as args.
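
For illustration, the plain-Ray behavior this relies on (no Prefect involved; names made up):

import time
import ray

ray.init()

@ray.remote
def slow():
    time.sleep(30)
    return 'a'

@ray.remote(num_cpus=1)
def consumer_nested(payload):
    # payload['x'] is an ObjectRef nested inside a dict, so Ray does not treat
    # it as a dependency: this task is scheduled (and its CPU reserved) right
    # away and then blocks here in ray.get.
    return ray.get(payload['x'])

@ray.remote(num_cpus=1)
def consumer_direct(payload, *upstream):
    # The same refs passed additionally as top-level args are real
    # dependencies: Ray will not schedule this task (or reserve its CPU)
    # until slow() has finished.
    return ray.get(payload['x'])

ref = slow.remote()
eager = consumer_nested.remote({'x': ref})      # worker starts immediately
lazy = consumer_direct.remote({'x': ref}, ref)  # scheduled only once ref is ready
print(ray.get([eager, lazy]))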
