[Feature Request] Add parallelism for dsl.ParallelFor #4089
/assign @Ark-kun
I would be happy to help out as well!
This is an interesting feature request. It's not hard to implement, but I wonder whether this kind of parallelism control is common in orchestrators. A more generic way might be:

```python
with dsl.Parallelism(2):
    with dsl.ParallelFor(loopidy_doop) as item:
        ...
```

Such a block could be applied to any DAG, not only a parallel-for.
The tricky part is to design a way to define the unit of max_parallel_executions/parallelism. The Argo YAML below enforces the parallelism limit at the task level, rather than at the sub-DAG level (in this case, a 2-step sequential DAG under ParallelFor):

```yaml
- name: for-loop-for-loop-0535d69b-1
  parallelism: 2
  inputs:
    parameters:
    - {name: loopidy_doop-loop-item-subvar-a}
  dag:
    tasks:
    - name: my-in-cop1
      template: my-in-cop1
      dependencies: [sleep-10-seconds]
      arguments:
        parameters:
        - {name: loopidy_doop-loop-item-subvar-a, value: '{{inputs.parameters.loopidy_doop-loop-item-subvar-a}}'}
    - {name: sleep-10-seconds, template: sleep-10-seconds}
```

The pipeline code it was compiled from:

```python
with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
    sleep = sleep_op(10)
    op1 = dsl.ContainerOp(
        name="my-in-cop1",
        image="library/bash",
        command=["sh", "-c"],
        arguments=["echo no output global op1, item.a: %s" % item.a],
    ).after(sleep)
```
What do you mean by
Suppose the ParallelFor above generates 100 sub-DAGs, each containing 2 ops: a sleep_op followed by an echo op. One way to enforce parallelism (2) is: 2 of these 100 sub-DAGs are executed first, followed by the next 2, and so on. A different way: put the 100 sleep_ops and 100 echo ops together in one group; 2 of these 200 ops are executed first, followed by the next 2, and so on. In either case, echo.after(sleep_op) is still enforced within a single sub-DAG.
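The first option (sub-DAG-level enforcement) can be sketched with a plain-Python simulation. This is illustrative only, not KFP or Argo code: a semaphore of size 2 gates whole sub-DAGs, so a sub-DAG releases its slot only after both of its steps finish.

```python
import threading
import time

# Hypothetical simulation: 6 two-step sub-DAGs (a "sleep" op followed
# by an "echo" op) under a sub-DAG-level parallelism limit of 2.
limit = threading.Semaphore(2)   # at most 2 whole sub-DAGs at once
lock = threading.Lock()
running = 0                      # sub-DAGs currently in flight
peak = 0                         # highest concurrency observed

def run_sub_dag(i):
    global running, peak
    with limit:                  # acquired for the whole sub-DAG
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)         # step 1: the sleep op
        time.sleep(0.01)         # step 2: the echo op, runs after step 1
        with lock:
            running -= 1

threads = [threading.Thread(target=run_sub_dag, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)
```

Under the second option (task-level enforcement), the semaphore would instead wrap each individual step, so 2 ops from possibly different sub-DAGs could run at once.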
Thanks for the clarification @hlu09. As I understand it, you suggest keeping the parallelism at the sub-DAG level, thus allowing X number of sub-DAGs to run in parallel. I also think this makes the most sense from a user's perspective.
I think this example uses parallelism as you suggest, @hlu09: https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml
@NikeNano right, it makes sense to keep the parallelism on the sub-DAGs level, since ParallelFor generates many sub-DAGs. |
Will start this work today/tomorrow so we get it rolling!
…low#4149)
* Added parallelism at sub-dag level
* Updated the parallelism
* Removed yaml file
* Reformatting
* Update sdk/python/kfp/compiler/compiler.py
* Update sdk/python/kfp/compiler/compiler.py
* Update samples/core/loop_parallelism/loop_parallelism.py

Co-authored-by: Alexey Volkov <alexey.volkov@ark-kun.com>
Use Case
A large data mining job is often split into many small jobs. Given the limited shared resources of external services (e.g., DataFlow), we can only run a few small jobs simultaneously.
Global parallelism works to some degree but lacks flexibility; e.g., with global parallelism set to 1, any in-cluster task can block launching jobs to external services.
Argo supports template-level parallelism:
https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-workflow.yaml#L19
https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml#L15
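Modeled on the linked examples, setting `parallelism` on an outer DAG template limits how many of its child tasks (here, whole sub-DAGs) run concurrently. A minimal sketch (template and task names are illustrative, not taken from the linked files):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: parallelism-nested-dag-
spec:
  entrypoint: outer-dag
  templates:
  - name: outer-dag
    parallelism: 2          # at most 2 children of this DAG run at once
    dag:
      tasks:
      - {name: sub-dag-1, template: sub-dag}
      - {name: sub-dag-2, template: sub-dag}
      - {name: sub-dag-3, template: sub-dag}
  - name: sub-dag           # a 2-step sequential sub-DAG, as in ParallelFor
    dag:
      tasks:
      - {name: sleep, template: work}
      - {name: echo, template: work, dependencies: [sleep]}
  - name: work
    container:
      image: library/bash
      command: [sh, -c, "sleep 1"]
```

Because the limit is attached to `outer-dag`, both steps of at most 2 sub-DAGs can be in flight at any moment, which matches the sub-DAG-level semantics discussed above.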
Feature request
There will be at most 2 loop-DAGs running in parallel.