Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle retried node properly #1669

Merged
merged 1 commit into from
Oct 17, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
woc.log.Debugf("Node %s already completed", nodeName)
return node, nil
}
woc.log.Debugf("Executing node %s is %s", nodeName, node.Phase)
woc.log.Debugf("Executing node %s of %s is %s", nodeName, node.Type, node.Phase)
// Memoized nodes don't have StartedAt.
if node.StartedAt.IsZero() {
node.StartedAt = metav1.Time{Time: time.Now().UTC()}
Expand Down Expand Up @@ -1151,42 +1151,49 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
// This node acts as a parent of all retries that will be done for
// the container. The status of this node should be "Success" if any
// of the retries succeed. Otherwise, it is "Failed".
workNodeName := nodeName
retryNodeName := ""
if processedTmpl.IsLeaf() && processedTmpl.RetryStrategy != nil {
retryNodeName = nodeName
retryParentNode := node
if retryParentNode == nil {
woc.log.Debugf("Inject a retry node for node %s", retryNodeName)
retryParentNode = woc.initializeNode(retryNodeName, wfv1.NodeTypeRetry, orgTmpl, boundaryID, wfv1.NodeRunning)
retryParentNode = woc.initializeExecutableNode(retryNodeName, wfv1.NodeTypeRetry, newTmplCtx, processedTmpl, orgTmpl, boundaryID, wfv1.NodeRunning)
}
processedRetryParentNode, err := woc.processNodeRetries(retryParentNode, *processedTmpl.RetryStrategy)
if err != nil {
woc.log.Errorf("Error: %+v", err)
return woc.markNodeError(retryNodeName, err), err
}
retryParentNode = processedRetryParentNode
// The retry node might have completed by now.
if retryParentNode.Completed() {
woc.log.Errorf("Error: %+v", err)
return retryParentNode, nil
}
lastChildNode, err := woc.getLastChildNode(retryParentNode)
if err != nil {
woc.log.Errorf("Error: %+v", err)
return woc.markNodeError(retryNodeName, err), err
}
if lastChildNode != nil && !lastChildNode.Completed() {
// Last child node is still running.
return retryParentNode, nil
if lastChildNode != nil {
if !lastChildNode.Completed() {
// Last child node is still running.
return retryParentNode, nil
}
// All work is done in a child
nodeName = lastChildNode.Name
node = lastChildNode
} else {
// This is the first try.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
node = nil
}
// All work is done in a child
childNodeName := fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
workNodeName = childNodeName
node = nil
}

// Initialize node based on the template type.
if node == nil {
var nodeType wfv1.NodeType
switch basedTmpl.GetType() {
switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript, wfv1.TemplateTypeResource:
nodeType = wfv1.NodeTypePod
case wfv1.TemplateTypeSteps:
Expand All @@ -1196,10 +1203,10 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
case wfv1.TemplateTypeSuspend:
nodeType = wfv1.NodeTypeSuspend
default:
err := errors.InternalErrorf("Template '%s' has unknown node type", basedTmpl.Name)
return woc.initializeNode(workNodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, wfv1.NodeError, err.Error()), err
err := errors.InternalErrorf("Template '%s' has unknown node type", processedTmpl.Name)
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, wfv1.NodeError, err.Error()), err
}
node = woc.initializeExecutableNode(workNodeName, nodeType, newTmplCtx, basedTmpl, orgTmpl, boundaryID, wfv1.NodePending)
node = woc.initializeExecutableNode(nodeName, nodeType, newTmplCtx, processedTmpl, orgTmpl, boundaryID, wfv1.NodePending)
}

switch processedTmpl.GetType() {
Expand Down