diff --git a/test/e2e/ui/ui-dag-with-params.yaml b/test/e2e/ui/ui-dag-with-params.yaml index a954c0a8bb94..9756cda593e3 100644 --- a/test/e2e/ui/ui-dag-with-params.yaml +++ b/test/e2e/ui/ui-dag-with-params.yaml @@ -3,24 +3,53 @@ kind: Workflow metadata: generateName: ui-dag-with-params- spec: - entrypoint: diamond + entrypoint: pipeline + templates: - - name: diamond - dag: - tasks: - - name: A - template: nested-diamond - arguments: - parameters: [{name: message, value: A}] - - name: nested-diamond + - name: echo inputs: parameters: - name: message + container: + image: alpine:latest + command: [echo, "{{inputs.parameters.message}}"] + + - name: subpipeline-a dag: tasks: - - name: A + - name: A1 template: echo - - name: echo - container: - image: alpine:3.7 - command: [echo, "hello"] + arguments: + parameters: [{name: message, value: "Hello World!"}] + - name: A2 + template: echo + arguments: + parameters: [{name: message, value: "Hello World!"}] + + - name: subpipeline-b + dag: + tasks: + - name: B1 + template: echo + arguments: + parameters: [{name: message, value: "Hello World!"}] + - name: B2 + template: echo + dependencies: [B1] + arguments: + parameters: [{name: message, value: "Hello World!"}] + withItems: + - 0 + - 1 + + - name: pipeline + dag: + tasks: + - name: A + template: subpipeline-a + withItems: + - 0 + - 1 + - name: B + dependencies: [A] + template: subpipeline-b diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 438dc46c2be0..1f5601830751 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -251,12 +251,21 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { // All our dependencies were satisfied and successful. It's our turn to run + taskGroupNode := woc.getNodeByName(nodeName) + if taskGroupNode != nil && taskGroupNode.Type != wfv1.NodeTypeTaskGroup { + taskGroupNode = nil + } // connectDependencies is a helper to connect our dependencies to current task as children connectDependencies := func(taskNodeName string) { - if len(task.Dependencies) == 0 { + if len(task.Dependencies) == 0 || taskGroupNode != nil { // if we had no dependencies, then we are a root task, and we should connect the // boundary node as our parent - woc.addChildNode(dagCtx.boundaryName, taskNodeName) + if taskGroupNode == nil { + woc.addChildNode(dagCtx.boundaryName, taskNodeName) + } else { + woc.addChildNode(taskGroupNode.Name, taskNodeName) + } + } else { // Otherwise, add all outbound nodes of our dependencies as parents to this node for _, depName := range task.Dependencies { @@ -287,6 +296,16 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { return } + // If DAG task has withParam of with withSequence then we need to create virtual node of type TaskGroup. + // For example, if we had task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still + // need to create a node for A. + if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil { + if taskGroupNode == nil { + connectDependencies(nodeName) + taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, wfv1.NodeRunning, "") + } + } + for _, t := range expandedTasks { node = dagCtx.getTaskNode(t.Name) taskNodeName := dagCtx.taskNodeName(t.Name) @@ -311,12 +330,8 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { _, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID) } - // If we expanded the task, we still need to create the task entry for the non-expanded node, - // since dependant tasks will look to it, when deciding when to execute. For example, if we had - // task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still - // need to create a node for A, after the withItems have completed. - if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil { - nodeStatus := wfv1.NodeSucceeded + if taskGroupNode != nil { + groupPhase := wfv1.NodeSucceeded for _, t := range expandedTasks { // Add the child relationship from our dependency's outbound nodes to this node. node := dagCtx.getTaskNode(t.Name) @@ -324,17 +339,10 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { return } if !node.Successful() { - nodeStatus = node.Phase - } - } - woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, nodeStatus, "") - if len(expandedTasks) > 0 { - for _, t := range expandedTasks { - woc.addChildNode(dagCtx.taskNodeName(t.Name), nodeName) + groupPhase = node.Phase } - } else { - connectDependencies(nodeName) } + woc.markNodePhase(taskGroupNode.Name, groupPhase) } } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 792f63a9aef9..41f605714b51 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1180,8 +1180,17 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string { node := woc.wf.Status.Nodes[nodeID] switch node.Type { - case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend, wfv1.NodeTypeTaskGroup: + case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend: return []string{node.ID} + case wfv1.NodeTypeTaskGroup: + if len(node.Children) == 0 { + return []string{node.ID} + } + outboundNodes := make([]string, 0) + for _, child := range node.Children { + outboundNodes = append(outboundNodes, woc.getOutboundNodes(child)...) + } + return outboundNodes case wfv1.NodeTypeRetry: numChildren := len(node.Children) if numChildren > 0 {