Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added status of previous steps as variables #1681

Merged
merged 8 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ The following variables are made available to reference various metadata of a wo
| Variable | Description|
|----------|------------|
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step |
| `steps.<STEPNAME>.outputs.result` | Output result of a previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of a previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous step |
| `steps.<STEPNAME>.status` | Phase status of any previous script step |
| `steps.<STEPNAME>.outputs.result` | Output result of any previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of any previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous step |

## DAG Templates:
| Variable | Description|
|----------|------------|
| `tasks.<TASKNAME>.ip` | IP address of a previous daemon container task |
| `tasks.<TASKNAME>.outputs.result` | Output result of a previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of a previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous task |
| `tasks.<STEPNAME>.status` | Phase status of any previous task step |
| `tasks.<TASKNAME>.outputs.result` | Output result of any previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of any previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous task |

## Container/Script Templates:
| Variable | Description|
Expand Down
41 changes: 41 additions & 0 deletions examples/status-reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# The status reference example combines the use of a status result,
# along with conditionals, to take a dynamic path in the
# workflow. In this example, depending on the status of 'flakey-container'
# the template will either run the 'succeeded' step or the 'failed' step.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: status-reference-
spec:
entrypoint: status-reference
templates:
- name: status-reference
steps:
- - name: flakey-container
template: flakey-container
continueOn:
failed: true
- - name: failed
template: failed
when: "{{steps.flakey-container.status}} == Failed"
- name: succeeded
template: succeeded
when: "{{steps.flakey-container.status}} == Succeeded"

- name: flakey-container
script:
image: alpine:3.6
command: [sh, -c]
args: ["exit 1"]

- name: failed
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container failed\""]

- name: succeeded
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container passed\""]
4 changes: 4 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, node.PodIP)
}
if node.Phase != "" {
key := fmt.Sprintf("%s.status", prefix)
scope.addParamToScope(key, string(node.Phase))
}
woc.addOutputsToScope(prefix, node.Outputs, scope)
}

Expand Down
56 changes: 55 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"strings"
"testing"

"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
Expand Down Expand Up @@ -1102,6 +1102,60 @@ func TestResolvePlaceholdersInOutputValues(t *testing.T) {
assert.Equal(t, "output-value-placeholders-wf", *parameterValue)
}

var outputStatuses = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: first
template: flakey-container
continueOn:
failed: true
- - name: print
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.first.status}}"


- name: flakey-container
script:
image: busybox
command: [sh, -c]
args: ["exit 0"]

- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestResolveStatuses(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(outputStatuses)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
jsonValue, err := json.Marshal(&wf.Spec.Templates[0])
assert.NoError(t, err)

assert.Contains(t, string(jsonValue), "{{steps.first.status}}")
assert.NotContains(t, string(jsonValue), "{{steps.print.status}}")
}

var resourceTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
14 changes: 9 additions & 5 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
}
stepNames[step.Name] = true
prefix := fmt.Sprintf("steps.%s", step.Name)
scope[fmt.Sprintf("%s.status", prefix)] = true
err := addItemsToScope(prefix, step.WithItems, step.WithParam, step.WithSequence, scope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
Expand All @@ -576,7 +577,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
for i, step := range stepGroup {
aggregate := len(step.WithItems) > 0 || step.WithParam != ""
resolvedTmpl := resolvedTemplates[step.Name]
ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate)
ctx.addOutputsToScope(resolvedTmpl, fmt.Sprintf("steps.%s", step.Name), scope, aggregate, false)

// Validate the template again with actual arguments.
_, err = ctx.validateTemplateHolder(&step, tmplCtx, &step.Arguments, scope)
Expand Down Expand Up @@ -629,7 +630,7 @@ func addItemsToScope(prefix string, withItems []wfv1.Item, withParam string, wit
return nil
}

func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool) {
func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix string, scope map[string]interface{}, aggregate bool, isAncestor bool) {
if tmpl.Daemon != nil && *tmpl.Daemon {
scope[fmt.Sprintf("%s.ip", prefix)] = true
}
Expand Down Expand Up @@ -660,6 +661,9 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
scope[fmt.Sprintf("%s.outputs.parameters", prefix)] = true
}
}
if isAncestor {
scope[fmt.Sprintf("%s.status", prefix)] = true
}
}

func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
Expand Down Expand Up @@ -878,7 +882,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
resolvedTemplates[task.Name] = resolvedTmpl
dupDependencies := make(map[string]bool)
for j, depName := range task.Dependencies {
Expand Down Expand Up @@ -912,7 +916,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTmpl := resolvedTemplates[task.Name]
// add all tasks outputs to scope so that a nested DAGs can have outputs
prefix := fmt.Sprintf("tasks.%s", task.Name)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false)
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
taskBytes, err := json.Marshal(task)
if err != nil {
return errors.InternalWrapError(err)
Expand All @@ -927,7 +931,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
resolvedTmpl := resolvedTemplates[ancestor]
ancestorPrefix := fmt.Sprintf("tasks.%s", ancestor)
aggregate := len(ancestorTask.WithItems) > 0 || ancestorTask.WithParam != ""
ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate)
ctx.addOutputsToScope(resolvedTmpl, ancestorPrefix, taskScope, aggregate, true)
}
err = addItemsToScope(prefix, task.WithItems, task.WithParam, task.WithSequence, taskScope)
if err != nil {
Expand Down
Loading