Skip to content

Commit

Permalink
Add list and delete commands in policy recommendation CLI (#56)
Browse files Browse the repository at this point in the history
Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
Yongming Ding authored Jul 28, 2022
1 parent 741f566 commit 0e0c7c6
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 30 deletions.
33 changes: 32 additions & 1 deletion docs/networkpolicy-recommendation.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- [Run a policy recommendation job](#run-a-policy-recommendation-job)
- [Check the status of a policy recommendation job](#check-the-status-of-a-policy-recommendation-job)
- [Retrieve the result of a policy recommendation job](#retrieve-the-result-of-a-policy-recommendation-job)
- [List all policy recommendation jobs](#list-all-policy-recommendation-jobs)
- [Delete a policy recommendation job](#delete-a-policy-recommendation-job)
<!-- /toc -->

## Introduction
Expand Down Expand Up @@ -36,17 +38,22 @@ CLI. `theia` is the command-line tool which provides access to Theia network
flow visibility capabilities. To get more information about `theia`, please
refer to its [user guide](theia-cli.md).

There are 3 `theia` commands for the NetworkPolicy Recommendation feature:
The following `theia` commands for the NetworkPolicy Recommendation feature are
available:

- `theia policy-recommendation run`
- `theia policy-recommendation status`
- `theia policy-recommendation retrieve`
- `theia policy-recommendation list`
- `theia policy-recommendation delete`

Or you could use `pr` as a short alias of `policy-recommendation`:

- `theia pr run`
- `theia pr status`
- `theia pr retrieve`
- `theia pr list`
- `theia pr delete`

To see all options and usage examples of these commands, you may run
`theia policy-recommendation [subcommand] --help`.
Expand Down Expand Up @@ -134,3 +141,27 @@ policies to a YAML file and apply it using `kubectl`:
theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 -f recommended_policies.yml
kubectl apply -f recommended_policies.yml
```

### List all policy recommendation jobs

The `theia policy-recommendation list` command lists all undeleted policy
recommendation jobs. `CreationTime`, `CompletionTime`, `ID` and `Status` of each
policy recommendation job will be displayed in table format. For example:

```bash
> theia policy-recommendation list
CreationTime CompletionTime ID Status
2022-06-17 18:33:15 N/A 2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
```

### Delete a policy recommendation job

The `theia policy-recommendation delete` command is used to delete a policy
recommendation job. Please proceed with caution since deletion cannot be
undone. To delete the policy recommendation job created above, run:

```bash
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Successfully deleted policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
```
4 changes: 3 additions & 1 deletion docs/theia-cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ theia help
## Usage

To see the list of available commands and options, run `theia help`. Currently,
we have 3 commands for the NetworkPolicy Recommendation feature:
we have 5 commands for the NetworkPolicy Recommendation feature:

- `theia policy-recommendation run`
- `theia policy-recommendation status`
- `theia policy-recommendation retrieve`
- `theia policy-recommendation list`
- `theia policy-recommendation delete`

For details, please refer to [NetworkPolicy recommendation doc](
networkpolicy-recommendation.md)
149 changes: 149 additions & 0 deletions pkg/theia/commands/policy_recommendation_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"context"
"fmt"

"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"

sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2"
)

// policyRecommendationDeleteCmd represents the policy-recommendation delete command
var policyRecommendationDeleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete a policy recommendation Spark job",
Long: `Delete a policy recommendation Spark job by ID.`,
Aliases: []string{"del"},
Args: cobra.RangeArgs(0, 1),
Example: `
Delete the policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
`,
RunE: func(cmd *cobra.Command, args []string) error {
recoID, err := cmd.Flags().GetString("id")
if err != nil {
return err
}
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
err = ParseRecommendationID(recoID)
if err != nil {
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
return err
}
endpoint, err := cmd.Flags().GetString("clickhouse-endpoint")
if err != nil {
return err
}
if endpoint != "" {
err = ParseEndpoint(endpoint)
if err != nil {
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}

clientset, err := CreateK8sClient(kubeconfig)
if err != nil {
return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err)
}

idMap, err := getPolicyRecommendationIdMap(clientset, kubeconfig, endpoint, useClusterIP)
if err != nil {
return fmt.Errorf("err when getting policy recommendation ID map, %v", err)
}

if _, ok := idMap[recoID]; !ok {
return fmt.Errorf("could not find the policy recommendation job with given ID")
}

clientset.CoreV1().RESTClient().Delete().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(flowVisibilityNS).
Resource("sparkapplications").
Name("pr-" + recoID).
Do(context.TODO())

err = deletePolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, recoID)
if err != nil {
return err
}

fmt.Printf("Successfully deleted policy recommendation job with ID %s\n", recoID)
return nil
},
}

func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (idMap map[string]bool, err error) {
idMap = make(map[string]bool)
sparkApplicationList := &sparkv1.SparkApplicationList{}
err = clientset.CoreV1().RESTClient().Get().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(flowVisibilityNS).
Resource("sparkapplications").
Do(context.TODO()).Into(sparkApplicationList)
if err != nil {
return idMap, err
}
for _, sparkApplication := range sparkApplicationList.Items {
id := sparkApplication.ObjectMeta.Name[3:]
idMap[id] = true
}
completedPolicyRecommendationList, err := getCompletedPolicyRecommendationList(clientset, kubeconfig, endpoint, useClusterIP)
if err != nil {
return idMap, err
}
for _, completedPolicyRecommendation := range completedPolicyRecommendationList {
idMap[completedPolicyRecommendation.id] = true
}
return idMap, nil
}

func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) {
connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
}
if err != nil {
return err
}
query := "ALTER TABLE recommendations DELETE WHERE id = (?);"
_, err = connect.Exec(query, recoID)
if err != nil {
return fmt.Errorf("failed to delete recommendation result with id %s: %v", recoID, err)
}
return nil
}

func init() {
policyRecommendationCmd.AddCommand(policyRecommendationDeleteCmd)
policyRecommendationDeleteCmd.Flags().StringP(
"id",
"i",
"",
"ID of the policy recommendation Spark job.",
)
}
150 changes: 150 additions & 0 deletions pkg/theia/commands/policy_recommendation_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"context"
"fmt"
"strings"
"time"

"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"

sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2"
)

type policyRecommendationRow struct {
timeComplete time.Time
id string
}

// policyRecommendationListCmd represents the policy-recommendation list command
var policyRecommendationListCmd = &cobra.Command{
Use: "list",
Short: "List all policy recommendation Spark jobs",
Long: `List all policy recommendation Spark jobs with name, creation time and status.`,
Aliases: []string{"ls"},
Example: `
List all policy recommendation Spark jobs
$ theia policy-recommendation list
`,
RunE: func(cmd *cobra.Command, args []string) error {
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
return err
}
clientset, err := CreateK8sClient(kubeconfig)
if err != nil {
return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err)
}
endpoint, err := cmd.Flags().GetString("clickhouse-endpoint")
if err != nil {
return err
}
if endpoint != "" {
err = ParseEndpoint(endpoint)
if err != nil {
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}

err = PolicyRecoPreCheck(clientset)
if err != nil {
return err
}

sparkApplicationList := &sparkv1.SparkApplicationList{}
err = clientset.CoreV1().RESTClient().Get().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(flowVisibilityNS).
Resource("sparkapplications").
Do(context.TODO()).Into(sparkApplicationList)
if err != nil {
return err
}

completedPolicyRecommendationList, err := getCompletedPolicyRecommendationList(clientset, kubeconfig, endpoint, useClusterIP)

if err != nil {
return err
}

sparkApplicationTable := [][]string{
{"CreationTime", "CompletionTime", "ID", "Status"},
}
idMap := make(map[string]bool)
for _, sparkApplication := range sparkApplicationList.Items {
id := sparkApplication.ObjectMeta.Name[3:]
idMap[id] = true
sparkApplicationTable = append(sparkApplicationTable,
[]string{
FormatTimestamp(sparkApplication.ObjectMeta.CreationTimestamp.Time),
FormatTimestamp(sparkApplication.Status.TerminationTime.Time),
id,
strings.TrimSpace(string(sparkApplication.Status.AppState.State)),
})
}

for _, completedPolicyRecommendation := range completedPolicyRecommendationList {
if _, ok := idMap[completedPolicyRecommendation.id]; !ok {
idMap[completedPolicyRecommendation.id] = true
sparkApplicationTable = append(sparkApplicationTable,
[]string{
"N/A",
FormatTimestamp(completedPolicyRecommendation.timeComplete),
completedPolicyRecommendation.id,
"COMPLETED",
})
}
}

TableOutput(sparkApplicationTable)
return nil
},
}

func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (completedPolicyRecommendationList []policyRecommendationRow, err error) {
connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
}
if err != nil {
return completedPolicyRecommendationList, err
}
query := "SELECT timeCreated, id FROM recommendations;"
rows, err := connect.Query(query)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("failed to get recommendation jobs: %v", err)
}
defer rows.Close()
for rows.Next() {
var row policyRecommendationRow
err := rows.Scan(&row.timeComplete, &row.id)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("err when scanning recommendations row %v", err)
}
completedPolicyRecommendationList = append(completedPolicyRecommendationList, row)
}
return completedPolicyRecommendationList, nil
}

func init() {
policyRecommendationCmd.AddCommand(policyRecommendationListCmd)
}
Loading

0 comments on commit 0e0c7c6

Please sign in to comment.