
[Feature Request] Add parallelism for dsl.ParallelFor #4089

Closed
hlu09 opened this issue Jun 26, 2020 · 11 comments
Labels: area/sdk/dsl, kind/feature, status/triaged

Comments

@hlu09
Contributor

hlu09 commented Jun 26, 2020

Use Case

Large data mining jobs are often split into many small jobs. Given the limited shared resources of external services (e.g., DataFlow), we can only run a few small jobs simultaneously.

Global parallelism works to some degree but lacks flexibility; e.g., with global parallelism set to 1, any in-cluster task can block launching jobs to external services.

Argo supports template-level parallelism
https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-workflow.yaml#L19
https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml#L15

Feature request

    with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
        # DAG in loop here

There will be at most 2 loop DAGs running in parallel.
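
For concreteness, here is a minimal sketch of how the requested parameter might read in a full pipeline (a sketch only: the `parallelism` argument is the proposed addition, and `echo_op` plus the loop items are made up for this example):

    from kfp import dsl

    def echo_op(text):
        # Illustrative helper (not part of the SDK): a trivial op that echoes its input.
        return dsl.ContainerOp(
            name="echo",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo %s" % text],
        )

    @dsl.pipeline(name="parallelfor-parallelism-sketch")
    def my_pipeline():
        loopidy_doop = [{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}]
        # Proposed behavior: at most 2 loop iterations (sub-DAGs) run concurrently.
        with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
            echo_op(item.a)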

@Bobgy
Contributor

Bobgy commented Jun 28, 2020

/assign @Ark-kun
/area sdk/dsl

@Bobgy Bobgy added the kind/feature and status/triaged labels Jun 28, 2020
@NikeNano
Member

I would be happy to help out as well!
/assign

@Ark-kun
Contributor

Ark-kun commented Jun 29, 2020

This is an interesting feature request. It's not hard to implement, but I wonder whether this kind of parallelism control is common in orchestrators.
It looks like in Argo the parallelism option can be applied to any DAG. I wonder whether we should do the same and make max_parallel_executions a property of the OpsGroup.
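
To make the OpsGroup idea concrete, here is a rough sketch of how it might read from the pipeline author's side; the `parallelism` attribute set on the group object below is hypothetical and is only meant to mirror Argo's template-level `parallelism` field:

    from kfp import dsl

    @dsl.pipeline(name="opsgroup-parallelism-sketch")
    def my_pipeline():
        loop = dsl.ParallelFor([{"a": 1}, {"a": 2}, {"a": 3}])
        # Hypothetical: if max_parallel_executions were a generic OpsGroup property,
        # any group (not just ParallelFor) could carry its own concurrency cap.
        loop.parallelism = 2
        with loop as item:
            dsl.ContainerOp(
                name="echo",
                image="library/bash:4.4.23",
                command=["sh", "-c"],
                arguments=["echo item.a: %s" % item.a],
            )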

@hlu09
Contributor Author

hlu09 commented Jun 29, 2020

A more generic way might be:

    with dsl.Parallelism(2):
        with dsl.ParallelFor(loopidy_doop) as item:
            ...

Such a block could be applied to any DAG outside a parallel-for.
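
For example, under this proposal the same block could also throttle a plain fan-out of independent ops that is not written as a ParallelFor (a sketch only: `dsl.Parallelism` does not exist in the SDK and is exactly the construct being proposed):

    from kfp import dsl

    @dsl.pipeline(name="parallelism-block-sketch")
    def my_pipeline():
        # Hypothetical dsl.Parallelism block: at most 2 of the independent ops
        # below would run at the same time, even though nothing depends on them.
        with dsl.Parallelism(2):
            for i in range(5):
                dsl.ContainerOp(
                    name="echo-%d" % i,
                    image="library/bash:4.4.23",
                    command=["sh", "-c"],
                    arguments=["echo task %d" % i],
                )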

@hlu09
Contributor Author

hlu09 commented Jun 29, 2020

The tricky part is to design a way to define the unit of max_parallel_executions/parallelism. The Argo YAML below will enforce the parallelism limit at the task level, rather than at the sub-DAG level (in this case, the 2-step sequential DAG under ParallelFor), because the limit is attached to the loop-body template itself rather than to the template that fans out the loop items.

  - name: for-loop-for-loop-0535d69b-1
    parallelism: 2
    inputs:
      parameters:
      - {name: loopidy_doop-loop-item-subvar-a}
    dag:
      tasks:
      - name: my-in-cop1
        template: my-in-cop1
        dependencies: [sleep-10-seconds]
        arguments:
          parameters:
          - {name: loopidy_doop-loop-item-subvar-a, value: '{{inputs.parameters.loopidy_doop-loop-item-subvar-a}}'}
      - {name: sleep-10-seconds, template: sleep-10-seconds}

The corresponding DSL code:

    with dsl.ParallelFor(loopidy_doop, parallelism=2) as item:
        sleep = sleep_op(10)
        op1 = dsl.ContainerOp(
            name="my-in-cop1",
            image="library/bash",
            command=["sh", "-c"],
            arguments=["echo no output global op1, item.a: %s" % item.a],
        ).after(sleep)

@NikeNano
Member

> The tricky part is to design a way to define the unit of max_parallel_executions/parallelism. The Argo YAML below will enforce the parallelism limit at the task level, rather than at the sub-DAG level (in this case, the 2-step sequential DAG under ParallelFor).

What do you mean by task level vs sub-DAG level? Do you mean that for each DAG within the for loop there should be a limit, not on the task itself? @hlu09

> I wonder whether we should do the same and make the max_parallel_executions the property of the OpsGroup.

I think it makes sense for any ops that share resources to have the option and thus set it in the OpsGroup @Ark-kun.

@hlu09
Contributor Author

hlu09 commented Jun 30, 2020

Suppose the ParallelFor above generates 100 sub-DAGs, each containing 2 ops: sleep_op followed by echo.

One way to enforce parallelism (2) is: 2 of these 100 sub-DAGs are executed first, followed by the next 2, and so on.

A different way: put the 100 sleep_ops and 100 echo ops together in one group; 2 of these 200 ops are executed first, followed by the next 2, and so on. Of course, echo.after(sleep_op) is still enforced within each single sub-DAG.

@NikeNano
Member

NikeNano commented Jul 1, 2020

Thanks for the clarification @hlu09. As I understand it, you suggest keeping the parallelism at the sub-DAG level, thus allowing X sub-DAGs to run in parallel. I also think this makes the most sense from a user's perspective.

@NikeNano
Member

NikeNano commented Jul 1, 2020

I think this example uses parallelism as you suggest @hlu09: https://github.com/argoproj/argo/blob/master/examples/parallelism-nested-dag.yaml

@hlu09
Contributor Author

hlu09 commented Jul 1, 2020

@NikeNano right, it makes sense to keep the parallelism at the sub-DAG level, since ParallelFor generates many sub-DAGs.

@NikeNano
Member

NikeNano commented Jul 6, 2020

Will start this work today/tomorrow so we get it rolling!

Jeffwan pushed a commit to Jeffwan/pipelines that referenced this issue Dec 9, 2020
…low#4149)

* Added parallism at sub-dag level

* updated the parallism

* remove yaml file

* reformatting

* Update sdk/python/kfp/compiler/compiler.py

* Update sdk/python/kfp/compiler/compiler.py

* Update samples/core/loop_parallelism/loop_parallelism.py

Co-authored-by: Alexey Volkov <alexey.volkov@ark-kun.com>
