Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1136 - Fix metadata for DAG with loops #1149

Merged
merged 2 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions test/e2e/ui/ui-dag-with-params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,53 @@ kind: Workflow
metadata:
generateName: ui-dag-with-params-
spec:
entrypoint: diamond
entrypoint: pipeline

templates:
- name: diamond
dag:
tasks:
- name: A
template: nested-diamond
arguments:
parameters: [{name: message, value: A}]
- name: nested-diamond
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [echo, "{{inputs.parameters.message}}"]

- name: subpipeline-a
dag:
tasks:
- name: A
- name: A1
template: echo
- name: echo
container:
image: alpine:3.7
command: [echo, "hello"]
arguments:
parameters: [{name: message, value: "Hello World!"}]
- name: A2
template: echo
arguments:
parameters: [{name: message, value: "Hello World!"}]

- name: subpipeline-b
dag:
tasks:
- name: B1
template: echo
arguments:
parameters: [{name: message, value: "Hello World!"}]
- name: B2
template: echo
dependencies: [B1]
arguments:
parameters: [{name: message, value: "Hello World!"}]
withItems:
- 0
- 1

- name: pipeline
dag:
tasks:
- name: A
template: subpipeline-a
withItems:
- 0
- 1
- name: B
dependencies: [A]
template: subpipeline-b
42 changes: 25 additions & 17 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,21 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {

// All our dependencies were satisfied and successful. It's our turn to run

taskGroupNode := woc.getNodeByName(nodeName)
if taskGroupNode != nil && taskGroupNode.Type != wfv1.NodeTypeTaskGroup {
taskGroupNode = nil
}
// connectDependencies is a helper to connect our dependencies to current task as children
connectDependencies := func(taskNodeName string) {
if len(task.Dependencies) == 0 {
if len(task.Dependencies) == 0 || taskGroupNode != nil {
// if we had no dependencies, then we are a root task, and we should connect the
// boundary node as our parent
woc.addChildNode(dagCtx.boundaryName, taskNodeName)
if taskGroupNode == nil {
woc.addChildNode(dagCtx.boundaryName, taskNodeName)
} else {
woc.addChildNode(taskGroupNode.Name, taskNodeName)
}

} else {
// Otherwise, add all outbound nodes of our dependencies as parents to this node
for _, depName := range task.Dependencies {
Expand Down Expand Up @@ -287,6 +296,16 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
return
}

// If DAG task has withParam of with withSequence then we need to create virtual node of type TaskGroup.
// For example, if we had task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A.
if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil {
if taskGroupNode == nil {
connectDependencies(nodeName)
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, wfv1.NodeRunning, "")
}
}

for _, t := range expandedTasks {
node = dagCtx.getTaskNode(t.Name)
taskNodeName := dagCtx.taskNodeName(t.Name)
Expand All @@ -311,30 +330,19 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
_, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID)
}

// If we expanded the task, we still need to create the task entry for the non-expanded node,
// since dependant tasks will look to it, when deciding when to execute. For example, if we had
// task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A, after the withItems have completed.
if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil {
nodeStatus := wfv1.NodeSucceeded
if taskGroupNode != nil {
groupPhase := wfv1.NodeSucceeded
for _, t := range expandedTasks {
// Add the child relationship from our dependency's outbound nodes to this node.
node := dagCtx.getTaskNode(t.Name)
if node == nil || !node.Completed() {
return
}
if !node.Successful() {
nodeStatus = node.Phase
}
}
woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, task.Template, dagCtx.boundaryID, nodeStatus, "")
if len(expandedTasks) > 0 {
for _, t := range expandedTasks {
woc.addChildNode(dagCtx.taskNodeName(t.Name), nodeName)
groupPhase = node.Phase
}
} else {
connectDependencies(nodeName)
}
woc.markNodePhase(taskGroupNode.Name, groupPhase)
}
}

Expand Down
11 changes: 10 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,8 +1180,17 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template
func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
node := woc.wf.Status.Nodes[nodeID]
switch node.Type {
case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend, wfv1.NodeTypeTaskGroup:
case wfv1.NodeTypePod, wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend:
return []string{node.ID}
case wfv1.NodeTypeTaskGroup:
if len(node.Children) == 0 {
return []string{node.ID}
}
outboundNodes := make([]string, 0)
for _, child := range node.Children {
outboundNodes = append(outboundNodes, woc.getOutboundNodes(child)...)
}
return outboundNodes
case wfv1.NodeTypeRetry:
numChildren := len(node.Children)
if numChildren > 0 {
Expand Down