Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add vcctl jobflow command #3543

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions cmd/cli/jobflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main
googs1025 marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/spf13/cobra"

"volcano.sh/volcano/cmd/cli/util"
"volcano.sh/volcano/pkg/cli/jobflow"
)

func buildJobFlowCmd() *cobra.Command {
jobFlowCmd := &cobra.Command{
Use: "jobflow",
Short: "vcctl command line operation jobflow",
}

jobFlowCommandMap := map[string]struct {
Short string
RunFunction func(cmd *cobra.Command, args []string)
InitFlags func(cmd *cobra.Command)
}{
"create": {
Short: "create a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.CreateJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitCreateFlags,
},
"list": {
Short: "list jobflows",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.ListJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitListFlags,
},
"get": {
Short: "get a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.GetJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitGetFlags,
},
"delete": {
Short: "delete a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.DeleteJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitDeleteFlags,
},
"describe": {
Short: "describe a jobflow",
RunFunction: func(cmd *cobra.Command, args []string) {
util.CheckError(cmd, jobflow.DescribeJobFlow(cmd.Context()))
},
InitFlags: jobflow.InitDescribeFlags,
},
}

for command, config := range jobFlowCommandMap {
cmd := &cobra.Command{
Use: command,
Short: config.Short,
Run: config.RunFunction,
}
config.InitFlags(cmd)
jobFlowCmd.AddCommand(cmd)
}

return jobFlowCmd
}
1 change: 1 addition & 0 deletions cmd/cli/vcctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
rootCmd.AddCommand(buildJobCmd())
rootCmd.AddCommand(buildQueueCmd())
rootCmd.AddCommand(buildJobTemplateCmd())
rootCmd.AddCommand(buildJobFlowCmd())
rootCmd.AddCommand(versionCommand())

code := cli.Run(&rootCmd)
Expand Down
76 changes: 76 additions & 0 deletions pkg/cli/jobflow/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package jobflow

import (
"context"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type createFlags struct {
util.CommonFlags
// FilePath is the file path of jobflow
FilePath string
}

var createJobFlowFlags = &createFlags{}

// InitCreateFlags is used to init all flags during queue creating.
func InitCreateFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &createJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&createJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow")
}

// CreateJobFlow create a jobflow.
func CreateJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(createJobFlowFlags.Master, createJobFlowFlags.Kubeconfig)
if err != nil {
return err
}

// Read YAML data from a file.
yamlData, err := os.ReadFile(createJobFlowFlags.FilePath)
if err != nil {
return err
}
// Split YAML data into individual documents.
yamlDocs := strings.Split(string(yamlData), "---")

jobFlowClient := versioned.NewForConfigOrDie(config)
createdCount := 0
for _, doc := range yamlDocs {
// Skip empty documents or documents with only whitespace.
doc = strings.TrimSpace(doc)
if doc == "" {
continue
}

// Parse each YAML document into a JobFlow object.
obj := &flowv1alpha1.JobFlow{}
if err = yaml.Unmarshal([]byte(doc), obj); err != nil {
return err
}
// Set the namespace if it's not specified.
if obj.Namespace == "" {
obj.Namespace = "default"
}

_, err = jobFlowClient.FlowV1alpha1().JobFlows(obj.Namespace).Create(ctx, obj, metav1.CreateOptions{})
if err == nil {
fmt.Printf("Created JobFlow: %s/%s\n", obj.Namespace, obj.Name)
createdCount++
} else {
fmt.Printf("Failed to create JobFlow: %v\n", err)
}
}
return nil
}
104 changes: 104 additions & 0 deletions pkg/cli/jobflow/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package jobflow

import (
"context"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type deleteFlags struct {
util.CommonFlags

// Name is name of jobflow
Name string
// Namespace is namespace of jobflow
Namespace string
// FilePath is the file path of jobflow
FilePath string
}

var deleteJobFlowFlags = &deleteFlags{}

// InitDeleteFlags is used to init all flags during jobflow deleting.
func InitDeleteFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &deleteJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&deleteJobFlowFlags.Name, "name", "N", "", "the name of jobflow")
cmd.Flags().StringVarP(&deleteJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow")
cmd.Flags().StringVarP(&deleteJobFlowFlags.FilePath, "file", "f", "", "the path to the YAML file containing the jobflow")
}

// DeleteJobFlow is used to delete a jobflow.
func DeleteJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(deleteJobFlowFlags.Master, deleteJobFlowFlags.Kubeconfig)
if err != nil {
return err
}

jobFlowClient := versioned.NewForConfigOrDie(config)
if err != nil {
return err
}

if deleteJobFlowFlags.FilePath != "" {
yamlData, err := os.ReadFile(deleteJobFlowFlags.FilePath)
if err != nil {
return err
}

yamlDocs := strings.Split(string(yamlData), "---")

deletedCount := 0
for _, doc := range yamlDocs {
doc = strings.TrimSpace(doc)
if doc == "" {
continue
}

jobFlow := &flowv1alpha1.JobFlow{}
if err := yaml.Unmarshal([]byte(doc), jobFlow); err != nil {
return err
}

if jobFlow.Namespace == "" {
jobFlow.Namespace = "default"
}

err := jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{})
if err == nil {
fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name)
deletedCount++
} else {
fmt.Printf("Failed to delete JobFlow: %v\n", err)
}
}
return nil
}

if deleteJobFlowFlags.Name == "" {
return fmt.Errorf("jobflow name must be specified")
}

jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(deleteJobFlowFlags.Namespace).Get(ctx, deleteJobFlowFlags.Name, metav1.GetOptions{})
if err != nil {
return err
}

err = jobFlowClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).Delete(ctx, jobFlow.Name, metav1.DeleteOptions{})
if err != nil {
return err
}

fmt.Printf("Deleted JobFlow: %s/%s\n", jobFlow.Namespace, jobFlow.Name)

return nil
}
111 changes: 111 additions & 0 deletions pkg/cli/jobflow/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package jobflow

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/spf13/cobra"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

"volcano.sh/apis/pkg/apis/flow/v1alpha1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/util"
)

type describeFlags struct {
util.CommonFlags

// Name is name of jobflow
Name string
// Namespace is namespace of jobflow
Namespace string
// Format print format: yaml or json format
Format string
}

var describeJobFlowFlags = &describeFlags{}

// InitDescribeFlags is used to init all flags.
func InitDescribeFlags(cmd *cobra.Command) {
util.InitFlags(cmd, &describeJobFlowFlags.CommonFlags)
cmd.Flags().StringVarP(&describeJobFlowFlags.Name, "name", "N", "", "the name of jobflow")
cmd.Flags().StringVarP(&describeJobFlowFlags.Namespace, "namespace", "n", "default", "the namespace of jobflow")
cmd.Flags().StringVarP(&describeJobFlowFlags.Format, "format", "o", "yaml", "the format of output")
}

// DescribeJobFlow is used to get the particular jobflow details.
func DescribeJobFlow(ctx context.Context) error {
config, err := util.BuildConfig(describeJobFlowFlags.Master, describeJobFlowFlags.Kubeconfig)
if err != nil {
return err
}
jobFlowClient := versioned.NewForConfigOrDie(config)

// Get jobflow list detail
if describeJobFlowFlags.Name == "" {
jobFlows, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for i, jobFlow := range jobFlows.Items {
// Remove managedFields
jobFlow.ManagedFields = nil
PrintJobFlowDetail(&jobFlow, describeJobFlowFlags.Format)
// Print a separator if it's not the last element
if len(jobFlows.Items) != 1 && i < len(jobFlows.Items)-1 {
fmt.Println("---------------------------------")
}
}
// Get jobflow detail
} else {
jobFlow, err := jobFlowClient.FlowV1alpha1().JobFlows(describeJobFlowFlags.Namespace).Get(ctx, describeJobFlowFlags.Name, metav1.GetOptions{})
if err != nil {
return err
}
// Remove managedFields
jobFlow.ManagedFields = nil
// Set APIVersion and Kind if not set
if jobFlow.APIVersion == "" || jobFlow.Kind == "" {
jobFlow.APIVersion = v1alpha1.SchemeGroupVersion.String()
jobFlow.Kind = "JobFlow"
}
PrintJobFlowDetail(jobFlow, describeJobFlowFlags.Format)
googs1025 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

// PrintJobFlowDetail print jobflow details
func PrintJobFlowDetail(jobFlow *v1alpha1.JobFlow, format string) {
switch format {
case "json":
printJSON(jobFlow)
case "yaml":
printYAML(jobFlow)
default:
fmt.Printf("Unsupported format: %s", format)
}
}

func printJSON(jobFlow *v1alpha1.JobFlow) {
b, err := json.MarshalIndent(jobFlow, "", " ")
if err != nil {
fmt.Printf("Error marshaling JSON: %v\n", err)
return
}
os.Stdout.Write(b)
fmt.Println("")
}

func printYAML(jobFlow *v1alpha1.JobFlow) {
b, err := yaml.Marshal(jobFlow)
if err != nil {
fmt.Printf("Error marshaling YAML: %v\n", err)
return
}
os.Stdout.Write(b)
}
Loading
Loading