From 77963c3ceaf042bba1278797c349fe83dc79701b Mon Sep 17 00:00:00 2001 From: Jin Chi He Date: Mon, 5 Jul 2021 16:25:51 +0800 Subject: [PATCH] Enhance Any Sequencer to hanle cases that some tasks skipped (#653) --- frontend/src/lib/StaticGraphParser.ts | 2 +- frontend/src/lib/WorkflowParser.ts | 2 +- sdk/FEATURES.md | 8 +- sdk/python/kfp_tekton/tekton.py | 2 +- .../compiler/testdata/any_sequencer.yaml | 2 +- .../testdata/any_sequencer_noninlined.yaml | 2 +- tekton-catalog/any-sequencer/Dockerfile | 2 +- tekton-catalog/any-sequencer/Makefile | 6 +- tekton-catalog/any-sequencer/README.md | 4 +- tekton-catalog/any-sequencer/cmd/root.go | 213 +++++++++--------- tekton-catalog/any-sequencer/main.go | 2 +- 11 files changed, 127 insertions(+), 118 deletions(-) diff --git a/frontend/src/lib/StaticGraphParser.ts b/frontend/src/lib/StaticGraphParser.ts index 2f051e3f24b..d32ef8d4ccf 100644 --- a/frontend/src/lib/StaticGraphParser.ts +++ b/frontend/src/lib/StaticGraphParser.ts @@ -152,7 +152,7 @@ function buildTektonDag(graph: dagre.graphlib.Graph, template: any): void { task['taskSpec']['steps'][0]['args'] && task['taskSpec']['steps'][0]['command'] && task['taskSpec']['steps'][0]['command'][0] && - task['taskSpec']['steps'][0]['command'][0] === 'any-taskrun' + task['taskSpec']['steps'][0]['command'][0] === 'any-task' ) { let isNextTaskList = false; let isNextCondition = false; diff --git a/frontend/src/lib/WorkflowParser.ts b/frontend/src/lib/WorkflowParser.ts index b2dfef1f53f..706e6ceb9fe 100644 --- a/frontend/src/lib/WorkflowParser.ts +++ b/frontend/src/lib/WorkflowParser.ts @@ -140,7 +140,7 @@ export default class WorkflowParser { task['taskSpec']['steps'][0]['args'] && task['taskSpec']['steps'][0]['command'] && task['taskSpec']['steps'][0]['command'][0] && - task['taskSpec']['steps'][0]['command'][0] === 'any-taskrun' + task['taskSpec']['steps'][0]['command'][0] === 'any-task' ) { let isNextTaskList = false; let isNextCondition = false; diff --git a/sdk/FEATURES.md b/sdk/FEATURES.md index d7cb16c92ed..8e1bfb0dae1 100644 --- a/sdk/FEATURES.md +++ b/sdk/FEATURES.md @@ -144,9 +144,11 @@ To see how the Python SDK provides this feature, refer to the examples below: When any one of the task dependencies completes successfully and the conditions meet, the dependent task will be started. Order of execution of the dependencies doesn’t matter, and the pipeline doesn't wait for all the task dependencies to complete before moving to the next step. Condition can be applied to enforce the task dependencies completes as expected. The condition expression should be the same format as is in Kubeflow ConditionOperator, and the result of containerOps can be used in expression. Notice the expression should only contain results from only one task because the purpose here is to check the simple condition for the task's output when a task complete. And also the operand in the expression should be int or string, other python types will be transferred to string automatically. -The exit status of any-sequencer is configurable through parameters, `statusPath` must be specified for any-sequencer to write status to, then use `skippingPolicy` and `errorPolicy`. -- skippingPolicy --- determines for the AnySequencer reacts to no-dependency-condition-matching case. Values can be one of `skipOnNoMatch` or `errorOnNoMatch`, a status with value "Skipped" will be generated and the exit status will still be succeeded on `skipOnNoMatch`. -- errorPolicy --- the standard field, either `failOnError` or `continueOnError`. On `continueOnError`, a status with value "Failed" will be generated but the exit status will still be succeeded. For `Fail_on_error` the AnySequencer should truly fail in the Tekton terms, as it does now. +The Any Sequencer exits if all dependencies failed or skipped, or all conditions unmatched. + +The exit status of Any Sequencer is configurable through parameters, `statusPath` must be specified for Any Sequencer to write status to, then use `skippingPolicy` and `errorPolicy`. +- skippingPolicy --- determines for the Any Sequencer reacts to no-dependency-condition-matching case. Values can be one of `skipOnNoMatch` or `errorOnNoMatch`, a status with value "Skipped" will be generated and the exit status will still be succeeded on `skipOnNoMatch`. +- errorPolicy --- the standard field, either `failOnError` or `continueOnError`. On `continueOnError`, a status with value "Failed" will be generated but the exit status will still be succeeded. For `Fail_on_error` the Any Sequencer should truly fail in the Tekton terms, as it does now. Please follow the details of the implementation in the [design doc](https://docs.google.com/document/d/1oXOdiItI4GbEe_qzyBmMAqfLBjfYX1nM94WHY3EPa94/edit#heading=h.dt8bhna4spym). diff --git a/sdk/python/kfp_tekton/tekton.py b/sdk/python/kfp_tekton/tekton.py index 9070715d0da..e50778dbcee 100644 --- a/sdk/python/kfp_tekton/tekton.py +++ b/sdk/python/kfp_tekton/tekton.py @@ -76,7 +76,7 @@ def __init__(self, name=name, image=image, file_outputs=file_outputs, - command="any-taskrun", + command="any-task", arguments=arguments, ) diff --git a/sdk/python/tests/compiler/testdata/any_sequencer.yaml b/sdk/python/tests/compiler/testdata/any_sequencer.yaml index e12a234e5b0..8673a7d3fd3 100644 --- a/sdk/python/tests/compiler/testdata/any_sequencer.yaml +++ b/sdk/python/tests/compiler/testdata/any_sequencer.yaml @@ -165,7 +165,7 @@ spec: - -c - results_flipcoin_output == 'heads' command: - - any-taskrun + - any-task image: dspipelines/any-sequencer:latest name: main timeout: 0s diff --git a/sdk/python/tests/compiler/testdata/any_sequencer_noninlined.yaml b/sdk/python/tests/compiler/testdata/any_sequencer_noninlined.yaml index e12a234e5b0..8673a7d3fd3 100644 --- a/sdk/python/tests/compiler/testdata/any_sequencer_noninlined.yaml +++ b/sdk/python/tests/compiler/testdata/any_sequencer_noninlined.yaml @@ -165,7 +165,7 @@ spec: - -c - results_flipcoin_output == 'heads' command: - - any-taskrun + - any-task image: dspipelines/any-sequencer:latest name: main timeout: 0s diff --git a/tekton-catalog/any-sequencer/Dockerfile b/tekton-catalog/any-sequencer/Dockerfile index 14f6c2ca2e1..f9019d78fd5 100644 --- a/tekton-catalog/any-sequencer/Dockerfile +++ b/tekton-catalog/any-sequencer/Dockerfile @@ -2,5 +2,5 @@ FROM registry.access.redhat.com/ubi8/ubi-minimal ARG bin_dir=_output/bin -COPY ${bin_dir}/any-taskrun /usr/local/bin +COPY ${bin_dir}/any-task /usr/local/bin diff --git a/tekton-catalog/any-sequencer/Makefile b/tekton-catalog/any-sequencer/Makefile index 2c4b43ebd16..c05795dd809 100644 --- a/tekton-catalog/any-sequencer/Makefile +++ b/tekton-catalog/any-sequencer/Makefile @@ -1,4 +1,4 @@ -# Copyright 2020 kubeflow.org +# Copyright 2021 kubeflow.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,10 +24,10 @@ init: mkdir -p ${BIN_DIR} local: init - go build -o=${BIN_DIR}/any-taskrun + go build -o=${BIN_DIR}/any-task build-linux: init - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o=${BIN_DIR}/any-taskrun + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o=${BIN_DIR}/any-task image: build-linux docker build . -t docker.io/dspipelines/any-sequencer:$(TAG) diff --git a/tekton-catalog/any-sequencer/README.md b/tekton-catalog/any-sequencer/README.md index a57ae593380..551860ccdd2 100644 --- a/tekton-catalog/any-sequencer/README.md +++ b/tekton-catalog/any-sequencer/README.md @@ -2,9 +2,9 @@ `Any Sequencer`: When any one of the task dependencies complete successfully, the dependent task will be started. Order of execution of the dependencies doesn’t matter, e.g. if Job4 depends on Job1, Job2 and Job3, and when any one of the Job1, Job2 or Job3 complete successfully, Job4 will be started. Order doesn’t matter, and `Any Sequencer` doesn’t wait for all the job completions. -This implements a `taskRun` status watcher task to watch the list of `taskRun` it depends on (using Kubernetes RBAC). If one or more is completed, it exits the watching task and continues moving forward with Pipeline run. This status watcher task can be implemented in the same way as our “condition” task to make it “dependable”. +This implements a `pipelineRun` status watcher task to watch the list of `taskRun` or `Run` it depends on (using Kubernetes RBAC). If one or more is completed, it exits the watching task and continues moving forward with Pipeline run. This status watcher task can be implemented in the same way as our “condition” task to make it “dependable”. -Note that the service account of `Any Sequencer` needs `get` permission to watch the status of the specified `taskRun`. +Note that the service account of `Any Sequencer` needs `get` permission to watch the status of the `pipelineRun`. ### How to build binary diff --git a/tekton-catalog/any-sequencer/cmd/root.go b/tekton-catalog/any-sequencer/cmd/root.go index d20fc094dbc..42dd483ac1e 100644 --- a/tekton-catalog/any-sequencer/cmd/root.go +++ b/tekton-catalog/any-sequencer/cmd/root.go @@ -1,5 +1,5 @@ /* -Copyright 2020 kubeflow.org. +Copyright 2021 kubeflow.org. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import ( "github.com/Knetic/govaluate" "github.com/spf13/cobra" - v1alpha1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" tektoncdclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -125,6 +124,21 @@ func contains(s []string, str string) bool { return false } +func intersection(a, b []string) (c []string) { + m := make(map[string]bool) + + for _, item := range a { + m[item] = true + } + + for _, item := range b { + if _, ok := m[item]; ok { + c = append(c, item) + } + } + return +} + func parse_conditions(condtions []string, tasks *[]string) { conditionMap = make(map[string][]conditionResult) if len(condtions) == 0 { @@ -186,7 +200,7 @@ func parse_conditions(condtions []string, tasks *[]string) { func Execute() { // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ - Use: "any-taskrun", + Use: "any-task", Short: "Watch taskrun or run, and exit when any taskrun or run complete", Long: `Watch taskrun or run, and exit when any of below is true: 1: taskrun or run complete @@ -223,22 +237,22 @@ func sanitize_task_result(result string) interface{} { return i } -func checkTaskrunConditions(crs []conditionResult, tr *v1beta1.TaskRun) (string, bool) { +func checkTaskrunConditions(crs []conditionResult, taskRunStatus *v1beta1.PipelineRunTaskRunStatus) (string, bool) { for _, cr := range crs { parameters := make(map[string]interface{}) for _, result := range cr.results { - trLabel := tr.Labels["tekton.dev/pipelineTask"] + trName := taskRunStatus.PipelineTaskName var found bool - for _, taskRunResults := range tr.Status.TaskRunResults { + for _, taskRunResults := range taskRunStatus.Status.TaskRunResults { if result == taskRunResults.Name { // Do not need sanitize parameter name but only for expression for go valuate - parameters[`results_`+trLabel+`_`+result] = sanitize_task_result(taskRunResults.Value) + parameters[`results_`+trName+`_`+result] = sanitize_task_result(taskRunResults.Value) found = true break } } if !found { - fmt.Printf("The result %s does not exist in taskrun %s.\n", result, trLabel) + fmt.Printf("The result %s does not exist in taskrun %s.\n", result, trName) return cr.condition, false } } @@ -262,22 +276,22 @@ func checkTaskrunConditions(crs []conditionResult, tr *v1beta1.TaskRun) (string, return "", true } -func checkRunConditions(crs []conditionResult, run *v1alpha1.Run) (string, bool) { +func checkRunConditions(crs []conditionResult, runStatus *v1beta1.PipelineRunRunStatus) (string, bool) { for _, cr := range crs { parameters := make(map[string]interface{}) for _, result := range cr.results { - runLabel := run.Labels["tekton.dev/pipelineTask"] + runName := runStatus.PipelineTaskName var found bool - for _, runResults := range run.Status.Results { + for _, runResults := range runStatus.Status.Results { if result == runResults.Name { // Do not need sanitize parameter name but only for expression for go valuate - parameters[`results_`+runLabel+`_`+result] = sanitize_task_result(runResults.Value) + parameters[`results_`+runName+`_`+result] = sanitize_task_result(runResults.Value) found = true break } } if !found { - fmt.Printf("The result %s does not exist in run %s.\n", result, runLabel) + fmt.Printf("The result %s does not exist in run %s.\n", result, runName) return cr.condition, false } } @@ -301,119 +315,114 @@ func checkRunConditions(crs []conditionResult, run *v1alpha1.Run) (string, bool) return "", true } -func watchTaskRun(labelSelector string, tasks []string, failedTasksCh chan string) { - config, err := rest.InClusterConfig() - if err != nil { - fmt.Printf("Get config of the cluster failed: %+v \n", err) - exitWithStatus(failedStatus, 1) +func checkTaskRunStatus(taskRunStatus *v1beta1.PipelineRunTaskRunStatus, failedOrSkippedTasksCh chan string) { + var taskFailed bool + taskName := taskRunStatus.PipelineTaskName + taskrunStatusCondition := taskRunStatus.Status.GetCondition(apis.ConditionSucceeded) + if taskrunStatusCondition.IsTrue() { + fmt.Printf("The TaskRun of %s succeeded.\n", taskName) + conditions, ok := conditionMap[taskName] + if !ok { // no conditions to be passed --> any-sequencer success + exitWithStatus(succeededStatus, 0) + } + + condition, ok := checkTaskrunConditions(conditions, taskRunStatus) + if ok { // condition passed --> any-sequencer success + exitWithStatus(succeededStatus, 0) + } + taskFailed = true + fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskName) } - tektonClient, err := tektoncdclientset.NewForConfig(config) - if err != nil { - fmt.Printf("Get client of tekton failed: %+v \n", err) - exitWithStatus(failedStatus, 1) + if taskrunStatusCondition.IsFalse() { + taskFailed = true } - for { - trWatcher, err := tektonClient.TektonV1beta1().TaskRuns(namespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + if taskFailed { + failedOrSkippedTasksCh <- taskName + } +} - if err != nil { - fmt.Println("TaskRun Watcher error:" + err.Error()) - fmt.Println("Please ensure the service account has permission to get taskRun.") - exitWithStatus(failedStatus, 1) +func checkRunStatus(runStatus *v1beta1.PipelineRunRunStatus, failedOrSkippedTasksCh chan string) { + taskName := runStatus.PipelineTaskName + var taskFailed bool + runStatusCondition := runStatus.Status.GetCondition(apis.ConditionSucceeded) + if runStatusCondition.IsTrue() { + fmt.Printf("The Run of %s succeeded.\n", taskName) + conditions, ok := conditionMap[taskName] + if !ok { // no conditions to be passed --> any-sequencer success + exitWithStatus(succeededStatus, 0) + } + condition, ok := checkRunConditions(conditions, runStatus) + if ok { // condition passed --> any-sequencer success + exitWithStatus(succeededStatus, 0) } + taskFailed = true + fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskName) + } - for event := range trWatcher.ResultChan() { - taskrun := event.Object.(*v1beta1.TaskRun) - taskLabel := taskrun.Labels["tekton.dev/pipelineTask"] - if contains(tasks, taskLabel) { - taskrunStatus := taskrun.Status.GetCondition(apis.ConditionSucceeded) - var taskFailed bool - if taskrunStatus.IsTrue() { - fmt.Printf("The TaskRun of %s succeeded.\n", taskLabel) - conditions, ok := conditionMap[taskLabel] - if !ok { // no conditions to be passed --> any-sequencer success - trWatcher.Stop() - exitWithStatus(succeededStatus, 0) - } - - condition, ok := checkTaskrunConditions(conditions, taskrun) - if ok { // condition passed --> any-sequencer success - trWatcher.Stop() - exitWithStatus(succeededStatus, 0) - } - taskFailed = true - fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskLabel) - } + if runStatusCondition.IsFalse() { + taskFailed = true + } - if taskrunStatus.IsFalse() { - taskFailed = true - } + if taskFailed { + failedOrSkippedTasksCh <- taskName + } +} - if taskFailed { - failedTasksCh <- taskLabel - } - //if taskFailed && !contains(failedTasks, taskLabel) { - // failedTasks = append(failedTasks, taskLabel) - //} +func checkSkippedTasks(pr *v1beta1.PipelineRun, tasks []string, failedOrSkippedTasksCh chan string) { + var skippedTasks []string + + if pr.Status.SkippedTasks != nil { + for _, skippedTask := range pr.Status.SkippedTasks { + if !contains(skippedTasks, skippedTask.Name) { + skippedTasks = append(skippedTasks, skippedTask.Name) } } + interestedSkippedTasks := intersection(skippedTasks, tasks) + for _, interestedSkippedTask := range interestedSkippedTasks { + failedOrSkippedTasksCh <- interestedSkippedTask + } } } -func watchRun(labelSelector string, tasks []string, failedTasksCh chan string) { +func watchPipelineRun(tasks []string, failedOrSkippedTasksCh chan string) []string { + config, err := rest.InClusterConfig() if err != nil { fmt.Printf("Get config of the cluster failed: %+v \n", err) exitWithStatus(failedStatus, 1) } - tektonClient, err := tektoncdclientset.NewForConfig(config) if err != nil { fmt.Printf("Get client of tekton failed: %+v \n", err) exitWithStatus(failedStatus, 1) } + fieldSelector := "metadata.name=" + strings.TrimSpace(prName) + for { - runWatcher, err := tektonClient.TektonV1alpha1().Runs(namespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + prWatcher, err := tektonClient.TektonV1beta1().PipelineRuns(namespace).Watch(context.TODO(), metav1.ListOptions{FieldSelector: fieldSelector}) if err != nil { fmt.Println("Run Watcher error:" + err.Error()) - fmt.Println("Please ensure the service account has permission to get Run.") + fmt.Println("Please ensure the service account has permission to get PipelineRun.") exitWithStatus(failedStatus, 1) } - for event := range runWatcher.ResultChan() { - run := event.Object.(*v1alpha1.Run) - taskLabel := run.Labels["tekton.dev/pipelineTask"] - if contains(tasks, taskLabel) { - runStatus := run.Status.GetCondition(apis.ConditionSucceeded) - var taskFailed bool - if runStatus.IsTrue() { - fmt.Printf("The Run of %s succeeded.\n", taskLabel) - conditions, ok := conditionMap[taskLabel] - if !ok { // no conditions to be passed --> any-sequencer success - runWatcher.Stop() - exitWithStatus(succeededStatus, 0) - } - - condition, ok := checkRunConditions(conditions, run) - if ok { // condition passed --> any-sequencer success - runWatcher.Stop() - exitWithStatus(succeededStatus, 0) - } - taskFailed = true - fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskLabel) + for event := range prWatcher.ResultChan() { + pr := event.Object.(*v1beta1.PipelineRun) + for _, taskRunStatus := range pr.Status.TaskRuns { + if contains(tasks, taskRunStatus.PipelineTaskName) { + checkTaskRunStatus(taskRunStatus, failedOrSkippedTasksCh) } - - if runStatus.IsFalse() { - taskFailed = true - } - - if taskFailed { - failedTasksCh <- taskLabel + } + for _, runStatus := range pr.Status.Runs { + if contains(tasks, runStatus.PipelineTaskName) { + checkRunStatus(runStatus, failedOrSkippedTasksCh) } } + checkSkippedTasks(pr, tasks, failedOrSkippedTasksCh) } } } @@ -443,8 +452,6 @@ func watch(cmd *cobra.Command, args []string) { } } - fmt.Printf("Starting to watch taskrun or run for '%s' and condition in %s/%s.\n", taskList, namespace, prName) - var tasks []string if taskList != "" { tasks = strings.Split(taskList, ",") @@ -452,20 +459,20 @@ func watch(cmd *cobra.Command, args []string) { parse_conditions(conditions, &tasks) - labelSelector := "tekton.dev/pipelineRun=" + strings.TrimSpace(prName) + fmt.Printf("Starting to watch taskrun or run of '%s' and conditions in %s/%s.\n", taskList, namespace, prName) - failedTasksCh := make(chan string) + failedOrSkippedTasksCh := make(chan string) - go watchTaskRun(labelSelector, tasks, failedTasksCh) - go watchRun(labelSelector, tasks, failedTasksCh) + go watchPipelineRun(tasks, failedOrSkippedTasksCh) - var failedTasks []string - for failedTask := range failedTasksCh { - if !contains(failedTasks, failedTask) { - failedTasks = append(failedTasks, failedTask) + var failedOrSkippedTasks []string + for failedorSkippedTask := range failedOrSkippedTasksCh { + if !contains(failedOrSkippedTasks, failedorSkippedTask) { + fmt.Printf("The taskrun or run failed/skipped, or condition unmatched of task: %s.\n", failedorSkippedTask) + failedOrSkippedTasks = append(failedOrSkippedTasks, failedorSkippedTask) } - if len(failedTasks) >= len(tasks) { - fmt.Println("All specified TaskRun(s) or Run(s) failed.") + if len(failedOrSkippedTasks) >= len(tasks) { + fmt.Println("All specified TaskRun(s) or Run(s) failed or skipped.") exitWithStatus(skippedStatus, 1) } } diff --git a/tekton-catalog/any-sequencer/main.go b/tekton-catalog/any-sequencer/main.go index 8a6871af981..21d11fa59bd 100644 --- a/tekton-catalog/any-sequencer/main.go +++ b/tekton-catalog/any-sequencer/main.go @@ -1,5 +1,5 @@ /* -Copyright 2020 kubeflow.org. +Copyright 2021 kubeflow.org. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.