Skip to content

Commit

Permalink
Enhance Any Sequencer (kubeflow#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchihe authored Jun 28, 2021
1 parent f76951e commit ad25492
Showing 1 changed file with 37 additions and 38 deletions.
75 changes: 37 additions & 38 deletions tekton-catalog/any-sequencer/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/Knetic/govaluate"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1alpha1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
v1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -64,11 +63,11 @@ type conditionResult struct {

func exitWithStatus(statusToWrite string, osStatus int) {
if statusPath == "" {
log.Printf("Program exit status is %d.", osStatus)
fmt.Printf("Program exit status is %d.\n", osStatus)
os.Exit(osStatus)
}
if statusToWrite == skippedStatus {
log.Printf("All the tasks or conditions to watch does not meet, skipping.")
fmt.Println("All the tasks or conditions to watch does not meet, skipping.")
if skippingPolicy == skipOnNoMatch {
if err := writeStringToFile(skippedStatus, statusPath); err != nil {
os.Exit(1)
Expand Down Expand Up @@ -102,18 +101,18 @@ func writeStringToFile(s string, path string) error {
if os.IsNotExist(err) {
dstFile, err = os.Create(path)
if err != nil {
log.Printf("Error creating file: " + err.Error())
fmt.Println("Error creating file: " + err.Error())
return err
}
}
}
defer dstFile.Close()
_, err = dstFile.WriteString(s)
if err != nil {
log.Printf("Error writing to file: " + err.Error())
fmt.Println("Error writing to file: " + err.Error())
return err
}
log.Printf("Wrote %s to file %s.", s, path)
fmt.Printf("Wrote %s to file %s.\n", s, path)
return nil
}

Expand All @@ -135,7 +134,7 @@ func parse_conditions(condtions []string, tasks *[]string) {
for _, condition := range condtions {
operands := regexp.MustCompile(" +").Split(condition, -1)
if len(operands) != 3 {
log.Printf("The conditon must be as format 'operand1 operator operand2'.")
fmt.Println("The conditon must be as format 'operand1 operator operand2'.")
exitWithStatus(failedStatus, 1)
}
var taskName string
Expand All @@ -144,12 +143,12 @@ func parse_conditions(condtions []string, tasks *[]string) {
operand1Results := resultMatcher.FindAllStringSubmatch(operands[0], -1)
operand2Results := resultMatcher.FindAllStringSubmatch(operands[2], -1)
if len(operand1Results) == 0 && len(operand2Results) == 0 {
log.Printf("Must at least contain one result and at most two in one condition for a task.")
fmt.Println("Must at least contain one result and at most two in one condition for a task.")
exitWithStatus(failedStatus, 1)
}
if len(operand1Results) > 0 && len(operand2Results) > 0 {
if operand1Results[0][1] != operand2Results[0][1] {
log.Printf("The conditon can only contain results in one task, here's two.")
fmt.Println("The conditon can only contain results in one task, here's two.")
exitWithStatus(failedStatus, 1)
}
taskName = operand1Results[0][1]
Expand Down Expand Up @@ -188,9 +187,9 @@ func Execute() {
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "any-taskrun",
Short: "Watch taskrun and exit when any taskrun complete",
Long: `Watch taskrun and exit when any of below is true:
1: taskrun complete
Short: "Watch taskrun or run, and exit when any taskrun or run complete",
Long: `Watch taskrun or run, and exit when any of below is true:
1: taskrun or run complete
2: condition met`,
Run: watch,
}
Expand All @@ -200,10 +199,10 @@ func Execute() {
rootCmd.Flags().StringVar(&prName, "prName", "", "The name of the pipelinerun.")
rootCmd.MarkFlagRequired("prName")
rootCmd.Flags().StringVar(&taskList, "taskList", "", "The comma separated list of the tasks.")
rootCmd.Flags().StringSliceVarP(&conditions, "condition", "c", []string{}, "The conditions to watch")
rootCmd.Flags().StringSliceVarP(&conditions, "condition", "c", []string{}, "The conditions to watch.")
rootCmd.Flags().StringVar(&statusPath, "statusPath", "", "The path to write the status when finished.")
rootCmd.Flags().StringVar(&skippingPolicy, "skippingPolicy", "", "The name of the pipelinerun.")
rootCmd.Flags().StringVar(&errorPolicy, "errorPolicy", "", "The namespace of the pipelinerun.")
rootCmd.Flags().StringVar(&skippingPolicy, "skippingPolicy", "", "Determines for reacting to no-dependency-condition-matching case. \"skip_on_no_match\" or \"error_on_no_match\".")
rootCmd.Flags().StringVar(&errorPolicy, "errorPolicy", "", "An action taken when the run has failed. One of: \"fail_on_error\", \"continue_on_error\"")

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
Expand Down Expand Up @@ -239,18 +238,18 @@ func checkTaskrunConditions(crs []conditionResult, tr *v1beta1.TaskRun) (string,
}
}
if !found {
log.Printf("The result %s does not exist in taskrun %s.", result, trLabel)
fmt.Printf("The result %s does not exist in taskrun %s.\n", result, trLabel)
return cr.condition, false
}
}
expr, err := govaluate.NewEvaluableExpression(sanitize_parameter_name(cr.condition))
if err != nil {
log.Fatal("syntax error:", err)
fmt.Println("syntax error:", err)
}

evaluateresult, err := expr.Evaluate(parameters)
if err != nil {
log.Fatal("evaluate error:", err)
fmt.Println("evaluate error:", err)
}

if result, ok := evaluateresult.(bool); ok {
Expand Down Expand Up @@ -278,18 +277,18 @@ func checkRunConditions(crs []conditionResult, run *v1alpha1.Run) (string, bool)
}
}
if !found {
log.Printf("The result %s does not exist in run %s.", result, runLabel)
fmt.Printf("The result %s does not exist in run %s.\n", result, runLabel)
return cr.condition, false
}
}
expr, err := govaluate.NewEvaluableExpression(sanitize_parameter_name(cr.condition))
if err != nil {
log.Fatal("syntax error:", err)
fmt.Println("syntax error:", err)
}

evaluateresult, err := expr.Evaluate(parameters)
if err != nil {
log.Fatal("evaluate error:", err)
fmt.Println("evaluate error:", err)
}

if result, ok := evaluateresult.(bool); ok {
Expand All @@ -305,22 +304,22 @@ func checkRunConditions(crs []conditionResult, run *v1alpha1.Run) (string, bool)
func watchTaskRun(labelSelector string, tasks []string, failedTasksCh chan string) {
config, err := rest.InClusterConfig()
if err != nil {
log.Errorf("Get config of the cluster failed: %+v", err)
fmt.Printf("Get config of the cluster failed: %+v \n", err)
exitWithStatus(failedStatus, 1)
}

tektonClient, err := tektoncdclientset.NewForConfig(config)
if err != nil {
log.Errorf("Get client of tekton failed: %+v", err)
fmt.Printf("Get client of tekton failed: %+v \n", err)
exitWithStatus(failedStatus, 1)
}

for {
trWatcher, err := tektonClient.TektonV1beta1().TaskRuns(namespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})

if err != nil {
log.Printf("TaskRun Watcher error:" + err.Error())
log.Printf("Please ensure the service account has permission to get taskRun.")
fmt.Println("TaskRun Watcher error:" + err.Error())
fmt.Println("Please ensure the service account has permission to get taskRun.")
exitWithStatus(failedStatus, 1)
}

Expand All @@ -331,7 +330,7 @@ func watchTaskRun(labelSelector string, tasks []string, failedTasksCh chan strin
taskrunStatus := taskrun.Status.GetCondition(apis.ConditionSucceeded)
var taskFailed bool
if taskrunStatus.IsTrue() {
log.Printf("The TaskRun of %s succeeded.", taskLabel)
fmt.Printf("The TaskRun of %s succeeded.\n", taskLabel)
conditions, ok := conditionMap[taskLabel]
if !ok { // no conditions to be passed --> any-sequencer success
trWatcher.Stop()
Expand All @@ -344,7 +343,7 @@ func watchTaskRun(labelSelector string, tasks []string, failedTasksCh chan strin
exitWithStatus(succeededStatus, 0)
}
taskFailed = true
log.Printf("The condition %s for the task %s does not meet.", condition, taskLabel)
fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskLabel)
}

if taskrunStatus.IsFalse() {
Expand All @@ -365,22 +364,22 @@ func watchTaskRun(labelSelector string, tasks []string, failedTasksCh chan strin
func watchRun(labelSelector string, tasks []string, failedTasksCh chan string) {
config, err := rest.InClusterConfig()
if err != nil {
log.Errorf("Get config of the cluster failed: %+v", err)
fmt.Printf("Get config of the cluster failed: %+v \n", err)
exitWithStatus(failedStatus, 1)
}

tektonClient, err := tektoncdclientset.NewForConfig(config)
if err != nil {
log.Errorf("Get client of tekton failed: %+v", err)
fmt.Printf("Get client of tekton failed: %+v \n", err)
exitWithStatus(failedStatus, 1)
}

for {
runWatcher, err := tektonClient.TektonV1alpha1().Runs(namespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})

if err != nil {
log.Printf("Run Watcher error:" + err.Error())
log.Printf("Please ensure the service account has permission to get Run.")
fmt.Println("Run Watcher error:" + err.Error())
fmt.Println("Please ensure the service account has permission to get Run.")
exitWithStatus(failedStatus, 1)
}

Expand All @@ -391,7 +390,7 @@ func watchRun(labelSelector string, tasks []string, failedTasksCh chan string) {
runStatus := run.Status.GetCondition(apis.ConditionSucceeded)
var taskFailed bool
if runStatus.IsTrue() {
log.Printf("The Run of %s succeeded.", taskLabel)
fmt.Printf("The Run of %s succeeded.\n", taskLabel)
conditions, ok := conditionMap[taskLabel]
if !ok { // no conditions to be passed --> any-sequencer success
runWatcher.Stop()
Expand All @@ -404,7 +403,7 @@ func watchRun(labelSelector string, tasks []string, failedTasksCh chan string) {
exitWithStatus(succeededStatus, 0)
}
taskFailed = true
log.Printf("The condition %s for the task %s does not meet.", condition, taskLabel)
fmt.Printf("The condition %s for the task %s does not meet.\n", condition, taskLabel)
}

if runStatus.IsFalse() {
Expand All @@ -421,7 +420,7 @@ func watchRun(labelSelector string, tasks []string, failedTasksCh chan string) {

func watch(cmd *cobra.Command, args []string) {
if taskList == "" && len(conditions) == 0 {
log.Printf("Should provide either taskList or conditions to watch.")
fmt.Println("Should provide either taskList or conditions to watch.")
exitWithStatus(failedStatus, 1)
}

Expand All @@ -430,21 +429,21 @@ func watch(cmd *cobra.Command, args []string) {
skippingPolicy = skipOnNoMatch
} else {
if skippingPolicy != skipOnNoMatch && skippingPolicy != errorOnNoMatch {
log.Printf("skippingPolicy value must be one of %s or %s.", skipOnNoMatch, errorOnNoMatch)
fmt.Printf("skippingPolicy value must be one of %s or %s.\n", skipOnNoMatch, errorOnNoMatch)
exitWithStatus(failedStatus, 1)
}
}
if errorPolicy == "" {
errorPolicy = continueOnError
} else {
if errorPolicy != continueOnError && errorPolicy != failOnError {
log.Printf("skippingPolicy value must be one of %s or %s.", continueOnError, failOnError)
fmt.Printf("skippingPolicy value must be one of %s or %s.\n", continueOnError, failOnError)
exitWithStatus(failedStatus, 1)
}
}
}

log.Printf("Starting to watch taskrun or run for '%s' and condition in %s/%s.", taskList, namespace, prName)
fmt.Printf("Starting to watch taskrun or run for '%s' and condition in %s/%s.\n", taskList, namespace, prName)

var tasks []string
if taskList != "" {
Expand All @@ -466,7 +465,7 @@ func watch(cmd *cobra.Command, args []string) {
failedTasks = append(failedTasks, failedTask)
}
if len(failedTasks) >= len(tasks) {
log.Printf("All specified TaskRun(s) or Run(s) failed.")
fmt.Println("All specified TaskRun(s) or Run(s) failed.")
exitWithStatus(skippedStatus, 1)
}
}
Expand Down

0 comments on commit ad25492

Please sign in to comment.