diff --git a/examples/parallelism-nested-dag.yaml b/examples/parallelism-nested-dag.yaml new file mode 100644 index 000000000000..bcc7bd6ca064 --- /dev/null +++ b/examples/parallelism-nested-dag.yaml @@ -0,0 +1,87 @@ +# Example on specifying parallelism on the outer DAG and limiting the number of its +# children DAGs to be run at the same time. +# +# As the parallelism of A is 2, only two of the three DAGs (b2, b3, b4) will start +# running after b1 is finished, and the left DAG will run after either one is finished. + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-nested-dag- +spec: + entrypoint: A + templates: + - name: A + parallelism: 2 + dag: + tasks: + - name: b1 + template: B + arguments: + parameters: + - name: msg + value: "1" + - name: b2 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "2" + - name: b3 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "3" + - name: b4 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "4" + - name: b5 + template: B + dependencies: [b2, b3, b4] + arguments: + parameters: + - name: msg + value: "5" + + - name: B + inputs: + parameters: + - name: msg + dag: + tasks: + - name: c1 + template: one-job + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c1" + - name: c2 + template: one-job + dependencies: [c1] + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c2" + - name: c3 + template: one-job + dependencies: [c1] + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c3" + + - name: one-job + inputs: + parameters: + - name: msg + container: + image: alpine + command: ['/bin/sh', '-c'] + args: ["echo {{inputs.parameters.msg}}; sleep 10"] diff --git a/examples/parallelism-nested-workflow.yaml b/examples/parallelism-nested-workflow.yaml new file mode 100644 index 000000000000..5cba4de3391b --- /dev/null +++ b/examples/parallelism-nested-workflow.yaml @@ -0,0 +1,52 @@ +# Example on specifying parallelism on the outer workflow and limiting the number of its +# children workflowss to be run at the same time. +# +# As the parallelism of A is 1, the four steps of seq-step will run sequentially. + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-nested-workflow- +spec: + arguments: + parameters: + - name: seq-list + value: | + ["a","b","c","d"] + entrypoint: A + templates: + - name: A + parallelism: 1 + inputs: + parameters: + - name: seq-list + steps: + - - name: seq-step + template: B + arguments: + parameters: + - name: seq-id + value: "{{item}}" + withParam: "{{inputs.parameters.seq-list}}" + + - name: B + inputs: + parameters: + - name: seq-id + steps: + - - name: jobs + template: one-job + arguments: + parameters: + - name: seq-id + value: "{{inputs.parameters.seq-id}}" + withParam: "[1, 2]" + + - name: one-job + inputs: + parameters: + - name: seq-id + container: + image: alpine + command: ['/bin/sh', '-c'] + args: ["echo {{inputs.parameters.seq-id}}; sleep 30"] diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index ad7379cc34b9..9480c9923c3f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -495,6 +495,31 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 { return activePods } +// countActiveChildren counts the number of active (Pending/Running) children nodes of parent parentName +func (woc *wfOperationCtx) countActiveChildren(boundaryIDs ...string) int64 { + var boundaryID = "" + if len(boundaryIDs) > 0 { + boundaryID = boundaryIDs[0] + } + var activeChildren int64 + // if we care about parallelism, count the active pods at the template level + for _, node := range woc.wf.Status.Nodes { + if boundaryID != "" && node.BoundaryID != boundaryID { + continue + } + switch node.Type { + case wfv1.NodeTypePod, wfv1.NodeTypeSteps, wfv1.NodeTypeDAG: + default: + continue + } + switch node.Phase { + case wfv1.NodePending, wfv1.NodeRunning: + activeChildren++ + } + } + return activeChildren +} + // getAllWorkflowPods returns all pods related to the current workflow func (woc *wfOperationCtx) getAllWorkflowPods() (*apiv1.PodList, error) { options := metav1.ListOptions{ @@ -867,7 +892,8 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt // nodeName is the name to be used as the name of the node, and boundaryID indicates which template // boundary this node belongs to. func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) (*wfv1.NodeStatus, error) { - woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) + woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, templateName, boundaryID) + node := woc.getNodeByName(nodeName) if node != nil && node.Completed() { woc.log.Debugf("Node %s already completed", nodeName) @@ -1112,16 +1138,17 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node return ErrParallelismReached } } + fallthrough default: // if we are about to execute a pod, make our parent hasn't reached it's limit - if boundaryID != "" { + if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { boundaryNode := woc.wf.Status.Nodes[boundaryID] boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName) if boundaryTemplate.Parallelism != nil { - templateActivePods := woc.countActivePods(boundaryID) - woc.log.Debugf("counted %d/%d active pods in boundary %s", templateActivePods, *boundaryTemplate.Parallelism, boundaryID) - if templateActivePods >= *boundaryTemplate.Parallelism { - woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, templateActivePods, *boundaryTemplate.Parallelism) + activeSiblings := woc.countActiveChildren(boundaryID) + woc.log.Debugf("counted %d/%d active children in boundary %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID) + if activeSiblings >= *boundaryTemplate.Parallelism { + woc.log.Infof("template (node %s) active children parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism) return ErrParallelismReached } }