Skip to content

Commit

Permalink
feat: add vcctl jobflow command
Browse files Browse the repository at this point in the history
Signed-off-by: googs1025 <googs1025@gmail.com>
  • Loading branch information
googs1025 committed Jun 24, 2024
1 parent 2e673bc commit c326d09
Show file tree
Hide file tree
Showing 8 changed files with 1,069 additions and 0 deletions.
69 changes: 69 additions & 0 deletions cmd/cli/jobflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"github.com/spf13/cobra"

"volcano.sh/volcano/cmd/cli/util"
"volcano.sh/volcano/pkg/cli/jobflow"
)

func buildJobFlowCmd() *cobra.Command {
jobFlowCmd := &cobra.Command{
Use: "jobflow",
Short: "vcctl command line operation jobflow",
}

jobFlowCommandMap := map[string]struct {
Short string
RunFunction func(cmd *cobra.Command, args []string)
InitFlags func(cmd *cobra.Command)
}{
"create": {
Short: "create a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.CreateJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitCreateFlags,
},
"list": {
Short: "list jobflows",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.ListJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitListFlags,
},
"get": {
Short: "get a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.GetJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitGetFlags,
},
"delete": {
Short: "delete a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.DeleteJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitDeleteFlags,
},
"describe": {
Short: "describe a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.DescribeJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitDescribeFlags,
},
}

for command, config := range jobFlowCommandMap {
cmd := &cobra.Command{
Use: command,
Short: config.Short,
Run: config.RunFunction,
}
config.InitFlags(cmd)
jobFlowCmd.AddCommand(cmd)
}

return jobFlowCmd
}
1 change: 1 addition & 0 deletions cmd/cli/vcctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
rootCmd.AddCommand(buildJobCmd())
rootCmd.AddCommand(buildQueueCmd())
rootCmd.AddCommand(buildJobTemplateCmd())
rootCmd.AddCommand(buildJobFlowCmd())
rootCmd.AddCommand(versionCommand())

code := cli.Run(&rootCmd)
Expand Down
76 changes: 76 additions & 0 deletions pkg/cli/jobflow/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package jobflow

import (
"context"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type createFlags struct {
util.CommonFlags
// FilePath is the file path of jobflow
FilePath string
}

var createJobFlowFlags = &createFlags{}

// InitCreateFlags is used to init all flags during queue creating.
func InitCreateFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &createJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&createJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow")
}

// CreateJobFlow create a jobflow.
func CreateJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(createJobFlowFlags.Master, createJobFlowFlags.Kubeconfig)
if err != nil {
return err
}

// Read YAML data from a file.
yamlData, err := os.ReadFile(createJobFlowFlags.FilePath)
if err != nil {
return err
}
// Split YAML data into individual documents.
yamlDocs := strings.Split(string(yamlData), "---")

jobFlowClient := versioned.NewForConfigOrDie(config)
createdCount := 0
for _, doc := range yamlDocs {
// Skip empty documents or documents with only whitespace.
doc = strings.TrimSpace(doc)
if doc == "" {
continue
}

// Parse each YAML document into a JobFlow object.
obj := &flowv1alpha1.JobFlow{}
if err = yaml.Unmarshal([]byte(doc), obj); err != nil {
return err
}
// Set the namespace if it's not specified.
if obj.Namespace == "" {
obj.Namespace = "default"
}

_, err = jobFlowClient.FlowV1alpha1().JobFlows(obj.Namespace).Create(ctx, obj, metav1.CreateOptions{})
if err == nil {
fmt.Printf("Created JobFlow: %s/%s\n", obj.Namespace, obj.Name)
createdCount++
} else {
fmt.Printf("Failed to create JobFlow: %v\n", err)
}
}
return nil
}
104 changes: 104 additions & 0 deletions pkg/cli/jobflow/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package jobflow

import (
"context"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type deleteFlags struct {
util.CommonFlags

// Name is name of jobflow
Name string
// Namespace is namespace of jobflow
Namespace string
// FilePath is the file path of jobflow
FilePath string
}

var deleteJobFlowFlags = &deleteFlags{}

// InitDeleteFlags is used to init all flags during jobflow deleting.
func InitDeleteFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &deleteJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&deleteJobFlowFlags.Name, "name", "N", "", "the name of jobflow")
cmd.Flags().StringVarP(&deleteJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow")
cmd.Flags().StringVarP(&deleteJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow")
}

// DeleteJobFlow is used to delete a jobflow.
func DeleteJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(deleteJobFlowFlags.Master, deleteJobFlowFlags.Kubeconfig)
if err != nil {
return err
}

jobFlowClient := versioned.NewForConfigOrDie(config)
if err != nil {
return err
}

if deleteJobFlowFlags.FilePath != "" {
yamlData, err := os.ReadFile(deleteJobFlowFlags.FilePath)
if err != nil {
return err
}

yamlDocs := strings.Split(string(yamlData), "---")

deletedCount := 0
for _, doc := range yamlDocs {
doc = strings.TrimSpace(doc)
if doc == "" {
continue
}

jobFlow := &flowv1alpha1.JobFlow{}
if err := yaml.Unmarshal([]byte(doc), jobFlow); err != nil {
return err
}

if jobFlow.Namespace == "" {
jobFlow.Namespace = "default"
}

err := jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{})
if err == nil {
fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name)
deletedCount++
} else {
fmt.Printf("Failed to delete JobFlow: %v\n", err)
}
}
return nil
}

if deleteJobFlowFlags.Name == "" {
return fmt.Errorf("jobflow name must be specified")
}

jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(deleteJobFlowFlags.Namespace).Get(ctx, deleteJobFlowFlags.Name, metav1.GetOptions{})
if err != nil {
return err
}

err = jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{})
if err != nil {
return err
}

fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name)

return nil
}
105 changes: 105 additions & 0 deletions pkg/cli/jobflow/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package jobflow

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

"volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type describeFlags struct {
util.CommonFlags

// Name is name of jobflow
Name string
// Namespace is namespace of jobflow
Namespace string
// Format print format: yaml or json format
Format string
}

var describeJobFlowFlags = &describeFlags{}

// InitDescribeFlags is used to init all flags.
func InitDescribeFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &describeJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&describeJobFlowFlags.Name, "name", "N", "", "the name of jobflow")
cmd.Flags().StringVarP(&describeJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow")
cmd.Flags().StringVarP(&describeJobFlowFlags.Format, "format", "o", "yaml", "the format of output")
}

// DescribeJobFlow is used to get the particular jobflow details.
func DescribeJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(describeJobFlowFlags.Master, describeJobFlowFlags.Kubeconfig)
if err != nil {
return err
}
jobFlowClient := versioned.NewForConfigOrDie(config)

// Get jobflow list detail
if describeJobFlowFlags.Name == "" {
jobFlows, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, jobFlow := range jobFlows.Items {
PrintJobFlowDetail(&jobFlow, describeJobFlowFlags.Format)
if len(jobFlows.Items) != 1 {
fmt.Println("---------------------------------")
}
}
// Get jobflow detail
} else {
jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).Get(ctx, describeJobFlowFlags.Name, metav1.GetOptions{})
if err != nil {
return err
}
// Set APIVersion and Kind if not set
if jobFlow.APIVersion == "" || jobFlow.Kind == "" {
jobFlow.APIVersion = v1alpha1.SchemeGroupVersion.String()
jobFlow.Kind = "JobFlow"
}
PrintJobFlowDetail(jobFlow, describeJobFlowFlags.Format)
}

return nil
}

// PrintJobFlowDetail print jobflow details
func PrintJobFlowDetail(jobFlow *v1alpha1.JobFlow, format string) {
switch format {
case "json":
printJSON(jobFlow)
case "yaml":
printYAML(jobFlow)
default:
fmt.Printf("Unsupported format: %s", format)
}
}

func printJSON(jobFlow *v1alpha1.JobFlow) {
b, err := json.MarshalIndent(jobFlow, "", " ")
if err != nil {
fmt.Printf("Error marshaling JSON: %v\n", err)
return
}
os.Stdout.Write(b)
}

func printYAML(jobFlow *v1alpha1.JobFlow) {
b, err := yaml.Marshal(jobFlow)
if err != nil {
fmt.Printf("Error marshaling YAML: %v\n", err)
return
}
os.Stdout.Write(b)
}
Loading

0 comments on commit c326d09

Please sign in to comment.