diff --git a/cmd/cli/jobflow.go b/cmd/cli/jobflow.go new file mode 100644 index 0000000000..0763a65a4f --- /dev/null +++ b/cmd/cli/jobflow.go @@ -0,0 +1,69 @@ +package main + +import ( + "github.com/spf13/cobra" + + "volcano.sh/volcano/cmd/cli/util" + "volcano.sh/volcano/pkg/cli/jobflow" +) + +func buildJobFlowCmd() *cobra.Command { + jobFlowCmd := &cobra.Command{ + Use: "jobflow", + Short: "vcctl command line operation jobflow", + } + + jobFlowCommandMap := map[string]struct { + Short string + RunFunction func(cmd *cobra.Command, args []string) + InitFlags func(cmd *cobra.Command) + }{ + "create": { + Short: "create a jobflow", + RunFunction: func(cmd *cobra.Command, args []string) { + util.CheckError(cmd, jobflow.CreateJobFlow(cmd.Context())) + }, + InitFlags: jobflow.InitCreateFlags, + }, + "list": { + Short: "list jobflows", + RunFunction: func(cmd *cobra.Command, args []string) { + util.CheckError(cmd, jobflow.ListJobFlow(cmd.Context())) + }, + InitFlags: jobflow.InitListFlags, + }, + "get": { + Short: "get a jobflow", + RunFunction: func(cmd *cobra.Command, args []string) { + util.CheckError(cmd, jobflow.GetJobFlow(cmd.Context())) + }, + InitFlags: jobflow.InitGetFlags, + }, + "delete": { + Short: "delete a jobflow", + RunFunction: func(cmd *cobra.Command, args []string) { + util.CheckError(cmd, jobflow.DeleteJobFlow(cmd.Context())) + }, + InitFlags: jobflow.InitDeleteFlags, + }, + "describe": { + Short: "describe a jobflow", + RunFunction: func(cmd *cobra.Command, args []string) { + util.CheckError(cmd, jobflow.DescribeJobFlow(cmd.Context())) + }, + InitFlags: jobflow.InitDescribeFlags, + }, + } + + for command, config := range jobFlowCommandMap { + cmd := &cobra.Command{ + Use: command, + Short: config.Short, + Run: config.RunFunction, + } + config.InitFlags(cmd) + jobFlowCmd.AddCommand(cmd) + } + + return jobFlowCmd +} diff --git a/cmd/cli/vcctl.go b/cmd/cli/vcctl.go index 9ea077a906..734825143a 100644 --- a/cmd/cli/vcctl.go +++ b/cmd/cli/vcctl.go @@ -36,6 +36,7 @@ func main() { rootCmd.AddCommand(buildJobCmd()) rootCmd.AddCommand(buildQueueCmd()) rootCmd.AddCommand(buildJobTemplateCmd()) + rootCmd.AddCommand(buildJobFlowCmd()) rootCmd.AddCommand(versionCommand()) code := cli.Run(&rootCmd) diff --git a/pkg/cli/jobflow/create.go b/pkg/cli/jobflow/create.go new file mode 100644 index 0000000000..2757698610 --- /dev/null +++ b/pkg/cli/jobflow/create.go @@ -0,0 +1,76 @@ +package jobflow + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" + + flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1" + "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/util" +) + +type createFlags struct { + util.CommonFlags + // FilePath is the file path of jobflow + FilePath string +} + +var createJobFlowFlags = &createFlags{} + +// InitCreateFlags is used to init all flags during queue creating. +func InitCreateFlags(cmd *cobra.Command) { + util.InitFlags(cmd, &createJobFlowFlags.CommonFlags) + cmd.Flags().StringVarP(&createJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow") +} + +// CreateJobFlow create a jobflow. +func CreateJobFlow(ctx context.Context) error { + config, err := util.BuildConfig(createJobFlowFlags.Master, createJobFlowFlags.Kubeconfig) + if err != nil { + return err + } + + // Read YAML data from a file. + yamlData, err := os.ReadFile(createJobFlowFlags.FilePath) + if err != nil { + return err + } + // Split YAML data into individual documents. + yamlDocs := strings.Split(string(yamlData), "---") + + jobFlowClient := versioned.NewForConfigOrDie(config) + createdCount := 0 + for _, doc := range yamlDocs { + // Skip empty documents or documents with only whitespace. + doc = strings.TrimSpace(doc) + if doc == "" { + continue + } + + // Parse each YAML document into a JobFlow object. + obj := &flowv1alpha1.JobFlow{} + if err = yaml.Unmarshal([]byte(doc), obj); err != nil { + return err + } + // Set the namespace if it's not specified. + if obj.Namespace == "" { + obj.Namespace = "default" + } + + _, err = jobFlowClient.FlowV1alpha1().JobFlows(obj.Namespace).Create(ctx, obj, metav1.CreateOptions{}) + if err == nil { + fmt.Printf("Created JobFlow: %s/%s\n", obj.Namespace, obj.Name) + createdCount++ + } else { + fmt.Printf("Failed to create JobFlow: %v\n", err) + } + } + return nil +} diff --git a/pkg/cli/jobflow/delete.go b/pkg/cli/jobflow/delete.go new file mode 100644 index 0000000000..cd009d99ae --- /dev/null +++ b/pkg/cli/jobflow/delete.go @@ -0,0 +1,104 @@ +package jobflow + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" + + flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1" + "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/util" +) + +type deleteFlags struct { + util.CommonFlags + + // Name is name of jobflow + Name string + // Namespace is namespace of jobflow + Namespace string + // FilePath is the file path of jobflow + FilePath string +} + +var deleteJobFlowFlags = &deleteFlags{} + +// InitDeleteFlags is used to init all flags during jobflow deleting. +func InitDeleteFlags(cmd *cobra.Command) { + util.InitFlags(cmd, &deleteJobFlowFlags.CommonFlags) + cmd.Flags().StringVarP(&deleteJobFlowFlags.Name, "name", "N", "", "the name of jobflow") + cmd.Flags().StringVarP(&deleteJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow") + cmd.Flags().StringVarP(&deleteJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow") +} + +// DeleteJobFlow is used to delete a jobflow. +func DeleteJobFlow(ctx context.Context) error { + config, err := util.BuildConfig(deleteJobFlowFlags.Master, deleteJobFlowFlags.Kubeconfig) + if err != nil { + return err + } + + jobFlowClient := versioned.NewForConfigOrDie(config) + if err != nil { + return err + } + + if deleteJobFlowFlags.FilePath != "" { + yamlData, err := os.ReadFile(deleteJobFlowFlags.FilePath) + if err != nil { + return err + } + + yamlDocs := strings.Split(string(yamlData), "---") + + deletedCount := 0 + for _, doc := range yamlDocs { + doc = strings.TrimSpace(doc) + if doc == "" { + continue + } + + jobFlow := &flowv1alpha1.JobFlow{} + if err := yaml.Unmarshal([]byte(doc), jobFlow); err != nil { + return err + } + + if jobFlow.Namespace == "" { + jobFlow.Namespace = "default" + } + + err := jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{}) + if err == nil { + fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name) + deletedCount++ + } else { + fmt.Printf("Failed to delete JobFlow: %v\n", err) + } + } + return nil + } + + if deleteJobFlowFlags.Name == "" { + return fmt.Errorf("jobflow name must be specified") + } + + jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(deleteJobFlowFlags.Namespace).Get(ctx, deleteJobFlowFlags.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + err = jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{}) + if err != nil { + return err + } + + fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name) + + return nil +} diff --git a/pkg/cli/jobflow/describe.go b/pkg/cli/jobflow/describe.go new file mode 100644 index 0000000000..b9636e80c4 --- /dev/null +++ b/pkg/cli/jobflow/describe.go @@ -0,0 +1,111 @@ +package jobflow + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/spf13/cobra" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" + + "volcano.sh/apis/pkg/apis/flow/v1alpha1" + "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/util" +) + +type describeFlags struct { + util.CommonFlags + + // Name is name of jobflow + Name string + // Namespace is namespace of jobflow + Namespace string + // Format print format: yaml or json format + Format string +} + +var describeJobFlowFlags = &describeFlags{} + +// InitDescribeFlags is used to init all flags. +func InitDescribeFlags(cmd *cobra.Command) { + util.InitFlags(cmd, &describeJobFlowFlags.CommonFlags) + cmd.Flags().StringVarP(&describeJobFlowFlags.Name, "name", "N", "", "the name of jobflow") + cmd.Flags().StringVarP(&describeJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow") + cmd.Flags().StringVarP(&describeJobFlowFlags.Format, "format", "o", "yaml", "the format of output") +} + +// DescribeJobFlow is used to get the particular jobflow details. +func DescribeJobFlow(ctx context.Context) error { + config, err := util.BuildConfig(describeJobFlowFlags.Master, describeJobFlowFlags.Kubeconfig) + if err != nil { + return err + } + jobFlowClient := versioned.NewForConfigOrDie(config) + + // Get jobflow list detail + if describeJobFlowFlags.Name == "" { + jobFlows, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + for i, jobFlow := range jobFlows.Items { + // Remove managedFields + jobFlow.ManagedFields = nil + PrintJobFlowDetail(&jobFlow, describeJobFlowFlags.Format) + // Print a separator if it's not the last element + if len(jobFlows.Items) != 1 && i < len(jobFlows.Items)-1 { + fmt.Println("---------------------------------") + } + } + // Get jobflow detail + } else { + jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).Get(ctx, describeJobFlowFlags.Name, metav1.GetOptions{}) + if err != nil { + return err + } + // Remove managedFields + jobFlow.ManagedFields = nil + // Set APIVersion and Kind if not set + if jobFlow.APIVersion == "" || jobFlow.Kind == "" { + jobFlow.APIVersion = v1alpha1.SchemeGroupVersion.String() + jobFlow.Kind = "JobFlow" + } + PrintJobFlowDetail(jobFlow, describeJobFlowFlags.Format) + } + + return nil +} + +// PrintJobFlowDetail print jobflow details +func PrintJobFlowDetail(jobFlow *v1alpha1.JobFlow, format string) { + switch format { + case "json": + printJSON(jobFlow) + case "yaml": + printYAML(jobFlow) + default: + fmt.Printf("Unsupported format: %s", format) + } +} + +func printJSON(jobFlow *v1alpha1.JobFlow) { + b, err := json.MarshalIndent(jobFlow, "", " ") + if err != nil { + fmt.Printf("Error marshaling JSON: %v\n", err) + return + } + os.Stdout.Write(b) + fmt.Println("") +} + +func printYAML(jobFlow *v1alpha1.JobFlow) { + b, err := yaml.Marshal(jobFlow) + if err != nil { + fmt.Printf("Error marshaling YAML: %v\n", err) + return + } + os.Stdout.Write(b) +} diff --git a/pkg/cli/jobflow/get.go b/pkg/cli/jobflow/get.go new file mode 100644 index 0000000000..7a9e8bbf94 --- /dev/null +++ b/pkg/cli/jobflow/get.go @@ -0,0 +1,96 @@ +package jobflow + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/spf13/cobra" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "volcano.sh/apis/pkg/apis/flow/v1alpha1" + "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/util" +) + +type getFlags struct { + util.CommonFlags + // Name of the jobflow + Name string + // Namespace of the jobflow + Namespace string +} + +var getJobFlowFlags = &getFlags{} + +// InitGetFlags is used to init all flags. +func InitGetFlags(cmd *cobra.Command) { + util.InitFlags(cmd, &getJobFlowFlags.CommonFlags) + cmd.Flags().StringVarP(&getJobFlowFlags.Name, "name", "N", "", "the name of jobflow") + cmd.Flags().StringVarP(&getJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow") +} + +// GetJobFlow gets a jobflow. +func GetJobFlow(ctx context.Context) error { + config, err := util.BuildConfig(getJobFlowFlags.Master, getJobFlowFlags.Kubeconfig) + if err != nil { + return err + } + + if getJobFlowFlags.Name == "" { + err := fmt.Errorf("name is mandatory to get the particular jobflow details") + return err + } + + jobFlowClient := versioned.NewForConfigOrDie(config) + jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(getJobFlowFlags.Namespace).Get(ctx, getJobFlowFlags.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + PrintJobFlow(jobFlow, os.Stdout) + + return nil +} + +// PrintJobFlow prints the jobflow details. +func PrintJobFlow(jobFlow *v1alpha1.JobFlow, writer io.Writer) { + maxNameLen := len(Name) + maxNamespaceLen := len(Namespace) + maxPhaseLen := len(Phase) + maxAgeLen := len(Age) + if len(jobFlow.Name) > maxNameLen { + maxNameLen = len(jobFlow.Name) + } + if len(jobFlow.Namespace) > maxNamespaceLen { + maxNamespaceLen = len(jobFlow.Namespace) + } + if len(jobFlow.Status.State.Phase) > maxPhaseLen { + maxPhaseLen = len(jobFlow.Status.State.Phase) + } + age := translateTimestampSince(jobFlow.CreationTimestamp) + if len(age) > maxAgeLen { + maxAgeLen = len(age) + } + + columnSpacing := 4 + maxNameLen += columnSpacing + maxNamespaceLen += columnSpacing + maxPhaseLen += columnSpacing + maxAgeLen += columnSpacing + // Find the max length of the name, namespace. + formatStr := fmt.Sprintf("%%-%ds%%-%ds%%-%ds%%-%ds\n", maxNameLen, maxNamespaceLen, maxPhaseLen, maxAgeLen) + + // Print the header. + _, err := fmt.Fprintf(writer, formatStr, Name, Namespace, Phase, Age) + if err != nil { + fmt.Printf("Failed to print JobFlow command result: %s.\n", err) + } + // Print the separator. + _, err = fmt.Fprintf(writer, formatStr, jobFlow.Name, jobFlow.Namespace, jobFlow.Status.State.Phase, age) + if err != nil { + fmt.Printf("Failed to print JobFlow command result: %s.\n", err) + } +} diff --git a/pkg/cli/jobflow/jobflow_test.go b/pkg/cli/jobflow/jobflow_test.go new file mode 100644 index 0000000000..889ea194a0 --- /dev/null +++ b/pkg/cli/jobflow/jobflow_test.go @@ -0,0 +1,498 @@ +package jobflow + +import ( + "context" + "encoding/json" + "github.com/spf13/cobra" + "io" + "net/http" + "net/http/httptest" + "os" + "reflect" + "strings" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1" +) + +func TestListJobFlow(t *testing.T) { + testCases := []struct { + name string + Response interface{} + Namespace string + ExpectedErr error + ExpectedOutput string + }{ + { + name: "Normal Case", + Response: &flowv1alpha1.JobFlowList{ + Items: []flowv1alpha1.JobFlow{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + CreationTimestamp: metav1.Now(), + }, + Status: flowv1alpha1.JobFlowStatus{ + State: flowv1alpha1.State{ + Phase: "Succeed", + }, + }, + }, + }, + }, + Namespace: "default", + ExpectedErr: nil, + ExpectedOutput: `Name Namespace Phase Age +test-jobflow default Succeed 0s`, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + server := createTestServer(testCase.Response) + defer server.Close() + // Set the server URL as the master flag + listJobFlowFlags.Master = server.URL + listJobFlowFlags.Namespace = testCase.Namespace + + r, oldStdout := redirectStdout() + defer r.Close() + err := ListJobFlow(context.TODO()) + gotOutput := captureOutput(r, oldStdout) + + if !reflect.DeepEqual(err, testCase.ExpectedErr) { + t.Fatalf("test case: %s failed: got: %v, want: %v", testCase.name, err, testCase.ExpectedErr) + } + if gotOutput != testCase.ExpectedOutput { + t.Errorf("test case: %s failed: got: %s, want: %s", testCase.name, gotOutput, testCase.ExpectedOutput) + } + }) + } +} + +func TestGetJobFlow(t *testing.T) { + testCases := []struct { + name string + Response *flowv1alpha1.JobFlow + Namespace string + Name string + ExpectedErr error + ExpectedOutput string + }{ + { + name: "Normal Case", + Response: &flowv1alpha1.JobFlow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + CreationTimestamp: metav1.Now(), + }, + Status: flowv1alpha1.JobFlowStatus{ + State: flowv1alpha1.State{ + Phase: "Succeed", + }, + }, + }, + Namespace: "default", + Name: "test-jobflow", + ExpectedErr: nil, + ExpectedOutput: `Name Namespace Phase Age +test-jobflow default Succeed 0s`, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + server := createTestServer(testCase.Response) + defer server.Close() + // Set the server URL as the master flag + getJobFlowFlags.Master = server.URL + // Set the namespace and name as the flags + getJobFlowFlags.Namespace = testCase.Namespace + getJobFlowFlags.Name = testCase.Name + + r, oldStdout := redirectStdout() + defer r.Close() + err := GetJobFlow(context.TODO()) + gotOutput := captureOutput(r, oldStdout) + if !reflect.DeepEqual(err, testCase.ExpectedErr) { + t.Fatalf("test case: %s failed: got: %v, want: %v", testCase.name, err, testCase.ExpectedErr) + } + if gotOutput != testCase.ExpectedOutput { + t.Fatalf("test case: %s failed: got: %s, want: %s", testCase.name, gotOutput, testCase.ExpectedOutput) + } + }) + } +} + +func TestDeleteJobFlow(t *testing.T) { + testCases := []struct { + name string + Response *flowv1alpha1.JobFlow + Namespace string + Name string + FilePath string + ExpectedErr error + ExpectedOutput string + }{ + { + name: "Normal Case", + Response: &flowv1alpha1.JobFlow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + }, + }, + Namespace: "default", + Name: "test-jobflow", + ExpectedErr: nil, + ExpectedOutput: `Deleted JobFlow: default/test-jobflow`, + }, + { + name: "Normal Case", + Response: &flowv1alpha1.JobFlow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + }, + }, + FilePath: "test.yaml", + ExpectedErr: nil, + ExpectedOutput: `Deleted JobFlow: default/test-a +Deleted JobFlow: default/test-b`, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + server := createTestServer(testCase.Response) + defer server.Close() + // Set the server URL as the master flag + deleteJobFlowFlags.Master = server.URL + deleteJobFlowFlags.Namespace = testCase.Namespace + deleteJobFlowFlags.Name = testCase.name + deleteJobFlowFlags.FilePath = testCase.FilePath + + if testCase.FilePath != "" { + err := createAndWriteFile(testCase.FilePath, content) + if err != nil { + t.Fatalf("Failed to create and write file: %v", err) + } + // Delete the file after the test + defer func() { + err := os.Remove(testCase.FilePath) + if err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }() + } + + r, oldStdout := redirectStdout() + defer r.Close() + err := DeleteJobFlow(context.TODO()) + gotOutput := captureOutput(r, oldStdout) + if !reflect.DeepEqual(err, testCase.ExpectedErr) { + t.Fatalf("test case: %s failed: got: %v, want: %v", testCase.name, err, testCase.ExpectedErr) + } + if gotOutput != testCase.ExpectedOutput { + t.Fatalf("test case: %s failed: got: %s, want: %s", testCase.name, gotOutput, testCase.ExpectedOutput) + } + }) + } +} + +func TestCreateJobFlow(t *testing.T) { + testCases := []struct { + name string + Response *flowv1alpha1.JobFlow + FilePath string + ExpectedErr error + ExpectedOutput string + }{ + { + name: "Normal Case", + Response: &flowv1alpha1.JobFlow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + }, + }, + FilePath: "test.yaml", + ExpectedErr: nil, + ExpectedOutput: `Created JobFlow: default/test-a +Created JobFlow: default/test-b`, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + server := createTestServer(testCase.Response) + defer server.Close() + // Set the server URL as the master flag + createJobFlowFlags.Master = server.URL + createJobFlowFlags.FilePath = testCase.FilePath + + if testCase.FilePath != "" { + err := createAndWriteFile(testCase.FilePath, content) + if err != nil { + t.Fatalf("Failed to create and write file: %v", err) + } + // Delete the file after the test + defer func() { + err := os.Remove(testCase.FilePath) + if err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }() + } + r, oldStdout := redirectStdout() + defer r.Close() + err := CreateJobFlow(context.TODO()) + gotOutput := captureOutput(r, oldStdout) + if !reflect.DeepEqual(err, testCase.ExpectedErr) { + t.Fatalf("test case: %s failed: got: %v, want: %v", testCase.name, err, testCase.ExpectedErr) + } + if gotOutput != testCase.ExpectedOutput { + t.Fatalf("test case: %s failed: got: %s, want: %s", testCase.name, gotOutput, testCase.ExpectedOutput) + } + }) + } +} + +func TestDescribeJobFlow(t *testing.T) { + testCases := []struct { + name string + Response *flowv1alpha1.JobFlow + Namespace string + Name string + Format string + ExpectedErr error + ExpectedOutput string + }{ + { + name: "Normal Case, use yaml format", + Response: &flowv1alpha1.JobFlow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: flowv1alpha1.SchemeGroupVersion.String(), + Kind: "JobFlow", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + }, + }, + Namespace: "default", + Name: "test-jobflow", + Format: "yaml", + ExpectedErr: nil, + ExpectedOutput: `apiVersion: flow.volcano.sh/v1alpha1 +kind: JobFlow +metadata: + creationTimestamp: null + name: test-jobflow + namespace: default +spec: {} +status: + state: {}`, + }, + { + name: "Normal Case, use json format", + Response: &flowv1alpha1.JobFlow{ + TypeMeta: metav1.TypeMeta{ + APIVersion: flowv1alpha1.SchemeGroupVersion.String(), + Kind: "JobFlow", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jobflow", + Namespace: "default", + }, + }, + Namespace: "default", + Name: "test-jobflow", + Format: "json", + ExpectedErr: nil, + ExpectedOutput: `{ + "kind": "JobFlow", + "apiVersion": "flow.volcano.sh/v1alpha1", + "metadata": { + "name": "test-jobflow", + "namespace": "default", + "creationTimestamp": null + }, + "spec": {}, + "status": { + "state": {} + } +}`, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + server := createTestServer(testCase.Response) + defer server.Close() + // Set the server URL as the master flag + describeJobFlowFlags.Master = server.URL + describeJobFlowFlags.Namespace = testCase.Namespace + describeJobFlowFlags.Name = testCase.name + describeJobFlowFlags.Format = testCase.Format + + r, oldStdout := redirectStdout() + defer r.Close() + err := DescribeJobFlow(context.TODO()) + gotOutput := captureOutput(r, oldStdout) + if !reflect.DeepEqual(err, testCase.ExpectedErr) { + t.Fatalf("test case: %s failed: got: %v, want: %v", testCase.name, err, testCase.ExpectedErr) + } + if gotOutput != testCase.ExpectedOutput { + t.Fatalf("test case: %s failed: got: %s, want: %s", testCase.name, gotOutput, testCase.ExpectedOutput) + } + }) + } +} + +func createTestServer(response interface{}) *httptest.Server { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + val, err := json.Marshal(response) + if err == nil { + w.Write(val) + } + }) + + server := httptest.NewServer(handler) + return server +} + +// redirectStdout redirects os.Stdout to a pipe and returns the read and write ends of the pipe. +func redirectStdout() (*os.File, *os.File) { + r, w, _ := os.Pipe() + oldStdout := os.Stdout + os.Stdout = w + return r, oldStdout +} + +// captureOutput reads from r until EOF and returns the result as a string. +func captureOutput(r *os.File, oldStdout *os.File) string { + w := os.Stdout + os.Stdout = oldStdout + w.Close() + gotOutput, _ := io.ReadAll(r) + return strings.TrimSpace(string(gotOutput)) +} + +func createAndWriteFile(filePath, content string) error { + if _, err := os.Stat(filePath); os.IsNotExist(err) { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + _, err = io.WriteString(file, content) + if err != nil { + return err + } + } + return nil +} + +func TestInitCreateFlags(t *testing.T) { + var cmd cobra.Command + InitCreateFlags(&cmd) + + if cmd.Flag("file") == nil { + t.Errorf("Could not find the flag file") + } +} + +func TestInitGetFlags(t *testing.T) { + var cmd cobra.Command + InitGetFlags(&cmd) + + if cmd.Flag("name") == nil { + t.Errorf("Could not find the flag name") + } + if cmd.Flag("namespace") == nil { + t.Errorf("Could not find the flag name") + } +} + +func TestInitListFlags(t *testing.T) { + var cmd cobra.Command + InitListFlags(&cmd) + + if cmd.Flag("namespace") == nil { + t.Errorf("Could not find the flag namespace") + } +} + +func TestInitDescribeFlags(t *testing.T) { + var cmd cobra.Command + InitDescribeFlags(&cmd) + if cmd.Flag("name") == nil { + t.Errorf("Could not find the flag name") + } + if cmd.Flag("namespace") == nil { + t.Errorf("Could not find the flag namespace") + } + if cmd.Flag("format") == nil { + t.Errorf("Could not find the flag format") + } +} + +func TestInitDeleteFlags(t *testing.T) { + var cmd cobra.Command + InitDeleteFlags(&cmd) + if cmd.Flag("name") == nil { + t.Errorf("Could not find the flag name") + } + if cmd.Flag("namespace") == nil { + t.Errorf("Could not find the flag namespace") + } + if cmd.Flag("file") == nil { + t.Errorf("Could not find the flag file") + } +} + +var content = `apiVersion: flow.volcano.sh/v1alpha1 +kind: JobFlow +metadata: + name: test-a + namespace: default +spec: + jobRetainPolicy: delete # After jobflow runs, keep the generated job. Otherwise, delete it. + flows: + - name: a + - name: b + dependsOn: + targets: ['a'] + - name: c + dependsOn: + targets: ['b'] + - name: d + dependsOn: + targets: ['b'] + - name: e + dependsOn: + targets: ['c','d'] +--- +apiVersion: flow.volcano.sh/v1alpha1 +kind: JobFlow +metadata: + name: test-b + namespace: default +spec: + jobRetainPolicy: delete # After jobflow runs, keep the generated job. Otherwise, delete it. + flows: + - name: a + - name: b + dependsOn: + targets: ['a'] + - name: c + dependsOn: + targets: ['b'] + - name: d + dependsOn: + targets: ['b'] + - name: e + dependsOn: + targets: ['c','d'] +---` diff --git a/pkg/cli/jobflow/list.go b/pkg/cli/jobflow/list.go new file mode 100644 index 0000000000..0297e92843 --- /dev/null +++ b/pkg/cli/jobflow/list.go @@ -0,0 +1,127 @@ +package jobflow + +import ( + "context" + "fmt" + "io" + "os" + "time" + + "github.com/spf13/cobra" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/duration" + + "volcano.sh/apis/pkg/apis/flow/v1alpha1" + "volcano.sh/apis/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/cli/util" +) + +const ( + // Name jobflow name + Name string = "Name" + // Namespace jobflow namespace + Namespace string = "Namespace" + // Phase jobflow phase + Phase string = "Phase" + // Age jobflow age + Age string = "Age" +) + +type listFlags struct { + util.CommonFlags + // Namespace jobflow namespace + Namespace string + // AllNamespace all namespace flag + AllNamespace bool +} + +var listJobFlowFlags = &listFlags{} + +// InitListFlags inits all flags. +func InitListFlags(cmd *cobra.Command) { + util.InitFlags(cmd, &listJobFlowFlags.CommonFlags) + cmd.Flags().StringVarP(&listJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow") + cmd.Flags().BoolVarP(&listJobFlowFlags.AllNamespace, "all-namespaces", "", false, "list jobflows in all namespaces") +} + +// ListJobFlow lists all jobflow. +func ListJobFlow(ctx context.Context) error { + config, err := util.BuildConfig(listJobFlowFlags.Master, listJobFlowFlags.Kubeconfig) + if err != nil { + return err + } + + if listJobFlowFlags.AllNamespace { + listJobFlowFlags.Namespace = "" + } + + jobClient := versioned.NewForConfigOrDie(config) + jobFlows, err := jobClient.FlowV1alpha1().JobFlows(listJobFlowFlags.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + if len(jobFlows.Items) == 0 { + fmt.Printf("No resources found\n") + return nil + } + PrintJobFlows(jobFlows, os.Stdout) + + return nil +} + +// PrintJobFlows prints all the jobflows. +func PrintJobFlows(jobFlows *v1alpha1.JobFlowList, writer io.Writer) { + // Calculate the max length of the name, namespace phase age on list. + maxNameLen, maxNamespaceLen, maxPhaseLen, maxAgeLen := calculateMaxInfoLength(jobFlows) + columnSpacing := 4 + maxNameLen += columnSpacing + maxNamespaceLen += columnSpacing + maxPhaseLen += columnSpacing + maxAgeLen += columnSpacing + formatStr := fmt.Sprintf("%%-%ds%%-%ds%%-%ds%%-%ds\n", maxNameLen, maxNamespaceLen, maxPhaseLen, maxAgeLen) + // Print the header. + _, err := fmt.Fprintf(writer, formatStr, Name, Namespace, Phase, Age) + if err != nil { + fmt.Printf("Failed to print JobFlow command result: %s.\n", err) + } + // Print the jobflows. + for _, jobFlow := range jobFlows.Items { + _, err := fmt.Fprintf(writer, formatStr, jobFlow.Name, jobFlow.Namespace, jobFlow.Status.State.Phase, translateTimestampSince(jobFlow.CreationTimestamp)) + if err != nil { + fmt.Printf("Failed to print JobFlow command result: %s.\n", err) + } + } +} + +// calculateMaxInfoLength calculates the maximum length of the Name, Namespace Phase fields. +func calculateMaxInfoLength(jobFlows *v1alpha1.JobFlowList) (int, int, int, int) { + maxNameLen := len(Name) + maxNamespaceLen := len(Namespace) + maxStatusLen := len(Phase) + maxAgeLen := len(Age) + for _, jobFlow := range jobFlows.Items { + if len(jobFlow.Name) > maxNameLen { + maxNameLen = len(jobFlow.Name) + } + if len(jobFlow.Namespace) > maxNamespaceLen { + maxNamespaceLen = len(jobFlow.Namespace) + } + if len(jobFlow.Status.State.Phase) > maxStatusLen { + maxStatusLen = len(jobFlow.Status.State.Phase) + } + ageLen := translateTimestampSince(jobFlow.CreationTimestamp) + if len(ageLen) > maxAgeLen { + maxAgeLen = len(ageLen) + } + } + return maxNameLen, maxNamespaceLen, maxStatusLen, maxAgeLen +} + +// translateTimestampSince translates a timestamp into a human-readable string using time.Since. +func translateTimestampSince(timestamp metav1.Time) string { + if timestamp.IsZero() { + return "" + } + return duration.HumanDuration(time.Since(timestamp.Time)) +} diff --git a/test/e2e/vcctl/vcctl.go b/test/e2e/vcctl/vcctl.go index 07d336849f..91799aabf8 100644 --- a/test/e2e/vcctl/vcctl.go +++ b/test/e2e/vcctl/vcctl.go @@ -34,6 +34,7 @@ Usage: Available Commands: help Help about any command job vcctl command line operation job + jobflow vcctl command line operation jobflow jobtemplate vcctl command line operation jobtemplate queue Queue Operations version Print the version information