Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
Yongming Ding committed Jul 14, 2022
1 parent f55ec63 commit 4d2f21e
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 125 deletions.
18 changes: 10 additions & 8 deletions docs/networkpolicy-recommendation.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ CLI. `theia` is the command-line tool which provides access to Theia network
flow visibility capabilities. To get more information about `theia`, please
refer to its [user guide](theia-cli.md).

There are 5 `theia` commands for the NetworkPolicy Recommendation feature:
The following `theia` commands for the NetworkPolicy Recommendation feature are
available:

- `theia policy-recommendation run`
- `theia policy-recommendation status`
Expand Down Expand Up @@ -143,21 +144,22 @@ kubectl apply -f recommended_policies.yml

### List all policy recommendation jobs

The `theia policy-recommendation list` command lists all policy recommendation
jobs. `CreateTime`, `CompleteTime`, `ID` and `Status` of each policy
recommendation job will be displayed in the form of a table. For example:
The `theia policy-recommendation list` command lists all undeleted policy
recommendation jobs. `CreationTime`, `CompletionTime`, `ID` and `Status` of each
policy recommendation job will be displayed in table format. For example:

```bash
> theia policy-recommendation list
CreateTime CompleteTime ID Status
2022-06-17 18:33:15 N/A 2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
CreationTime CompletionTime ID Status
2022-06-17 18:33:15 N/A 2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
```

### Delete a policy recommendation job

The `theia policy-recommendation delete` command is used to delete a policy
recommendation job. To delete the policy recommendation job created above, run:
recommendation job. Please proceed with caution since deletion cannot be
undone. To delete the policy recommendation job created above, run:

```bash
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Expand Down
57 changes: 8 additions & 49 deletions pkg/theia/commands/policy_recommendation_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ package commands
import (
"context"
"fmt"
"net/url"

"github.com/google/uuid"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"

Expand All @@ -45,9 +43,9 @@ $ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
_, err = uuid.Parse(recoID)
err = ParseRecommendationID(recoID)
if err != nil {
return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err)
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
Expand All @@ -58,9 +56,9 @@ $ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
err = ParseEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
Expand All @@ -75,7 +73,7 @@ $ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86

idMap, err := getPolicyRecommendationIdMap(clientset, kubeconfig, endpoint, useClusterIP)
if err != nil {
return fmt.Errorf("err when get policy recommendation ID map, %v", err)
return fmt.Errorf("err when getting policy recommendation ID map, %v", err)
}

if _, ok := idMap[recoID]; !ok {
Expand Down Expand Up @@ -125,41 +123,13 @@ func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig str
}

func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) {
if endpoint == "" {
service := "clickhouse-clickhouse"
if useClusterIP {
serviceIP, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return fmt.Errorf("error when getting the ClickHouse Service address: %v", err)
}
endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort)
} else {
listenAddress := "localhost"
listenPort := 9000
_, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return fmt.Errorf("error when getting the ClickHouse Service port: %v", err)
}
// Forward the ClickHouse service port
pf, err := StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort)
if err != nil {
return fmt.Errorf("error when forwarding port: %v", err)
}
defer pf.Stop()
endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort)
}
connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
}

// Connect to ClickHouse and get the result
username, password, err := getClickHouseSecret(clientset)
if err != nil {
return err
}
url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password)
connect, err := connectClickHouse(clientset, url)
if err != nil {
return fmt.Errorf("error when connecting to ClickHouse, %v", err)
}
query := "ALTER TABLE recommendations DELETE WHERE id = (?);"
_, err = connect.Exec(query, recoID)
if err != nil {
Expand All @@ -176,15 +146,4 @@ func init() {
"",
"ID of the policy recommendation Spark job.",
)
policyRecommendationDeleteCmd.Flags().String(
"clickhouse-endpoint",
"",
"The ClickHouse service endpoint.",
)
policyRecommendationDeleteCmd.Flags().Bool(
"use-cluster-ip",
false,
`Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service.
It can only be used when running theia in cluster.`,
)
}
54 changes: 7 additions & 47 deletions pkg/theia/commands/policy_recommendation_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package commands
import (
"context"
"fmt"
"net/url"
"strings"
"time"

Expand Down Expand Up @@ -56,9 +55,9 @@ $ theia policy-recommendation list
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
err = ParseEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
Expand Down Expand Up @@ -88,7 +87,7 @@ $ theia policy-recommendation list
}

sparkApplicationTable := [][]string{
{"CreateTime", "CompleteTime", "ID", "Status"},
{"CreationTime", "CompletionTime", "ID", "Status"},
}
idMap := make(map[string]bool)
for _, sparkApplication := range sparkApplicationList.Items {
Expand Down Expand Up @@ -122,41 +121,13 @@ $ theia policy-recommendation list
}

func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (completedPolicyRecommendationList []policyRecommendationRow, err error) {
if endpoint == "" {
service := "clickhouse-clickhouse"
if useClusterIP {
serviceIP, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("error when getting the ClickHouse Service address: %v", err)
}
endpoint = fmt.Sprintf("tcp://%s:%d", serviceIP, servicePort)
} else {
listenAddress := "localhost"
listenPort := 9000
_, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("error when getting the ClickHouse Service port: %v", err)
}
// Forward the ClickHouse service port
pf, err := StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("error when forwarding port: %v", err)
}
defer pf.Stop()
endpoint = fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort)
}
connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
}

// Connect to ClickHouse and get the result
username, password, err := getClickHouseSecret(clientset)
if err != nil {
return completedPolicyRecommendationList, err
}
url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password)
connect, err := connectClickHouse(clientset, url)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("error when connecting to ClickHouse, %v", err)
}
query := "SELECT timeCreated, id FROM recommendations;"
rows, err := connect.Query(query)
if err != nil {
Expand All @@ -167,7 +138,7 @@ func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeco
var row policyRecommendationRow
err := rows.Scan(&row.timeComplete, &row.id)
if err != nil {
return completedPolicyRecommendationList, fmt.Errorf("err when scaning recommendations row %v", err)
return completedPolicyRecommendationList, fmt.Errorf("err when scanning recommendations row %v", err)
}
completedPolicyRecommendationList = append(completedPolicyRecommendationList, row)
}
Expand All @@ -176,15 +147,4 @@ func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeco

func init() {
policyRecommendationCmd.AddCommand(policyRecommendationListCmd)
policyRecommendationListCmd.Flags().String(
"clickhouse-endpoint",
"",
"The ClickHouse service endpoint.",
)
policyRecommendationListCmd.Flags().Bool(
"use-cluster-ip",
false,
`Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service.
It can only be used when running theia in cluster.`,
)
}
10 changes: 4 additions & 6 deletions pkg/theia/commands/policy_recommendation_retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"database/sql"
"fmt"
"io/ioutil"
"net/url"

"github.com/google/uuid"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
)
Expand Down Expand Up @@ -53,9 +51,9 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
_, err = uuid.Parse(recoID)
err = ParseRecommendationID(recoID)
if err != nil {
return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err)
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
Expand All @@ -66,9 +64,9 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
err = ParseEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
Expand Down
5 changes: 2 additions & 3 deletions pkg/theia/commands/policy_recommendation_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -332,9 +331,9 @@ Job is still running. Please check completion status for job via CLI later.`, re
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
err = ParseEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
Expand Down
10 changes: 4 additions & 6 deletions pkg/theia/commands/policy_recommendation_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/google/uuid"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -56,9 +54,9 @@ $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
_, err = uuid.Parse(recoID)
err = ParseRecommendationID(recoID)
if err != nil {
return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err)
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
Expand All @@ -73,9 +71,9 @@ $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-
return err
}
if endpoint != "" {
_, err := url.ParseRequestURI(endpoint)
err = ParseEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err)
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
Expand Down
18 changes: 18 additions & 0 deletions pkg/theia/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"database/sql"
"fmt"
"net/url"
"os"
"strings"
"text/tabwriter"
"time"

"github.com/ClickHouse/clickhouse-go"
"github.com/google/uuid"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -261,3 +263,19 @@ func FormatTimestamp(timestamp time.Time) string {
}
return timestamp.UTC().Format("2006-01-02 15:04:05")
}

func ParseEndpoint(endpoint string) error {
_, err := url.ParseRequestURI(endpoint)
if err != nil {
return fmt.Errorf("input endpoint %s does not seem a valid URL, parsing error: %v", endpoint, err)
}
return nil
}

func ParseRecommendationID(recommendationID string) error {
_, err := uuid.Parse(recommendationID)
if err != nil {
return fmt.Errorf("input id %s does not seem a valid UUID, parsing error:: %v", recommendationID, err)
}
return nil
}
13 changes: 7 additions & 6 deletions test/e2e/policyrecommendation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,19 @@ func testPolicyRecommendationStatus(t *testing.T, data *TestData) {
}

// Example output:
// CreateTime ID Status
// 2022-06-10 15:03:24 615026a0-1856-4107-87d9-08f7d69819ae RUNNING
// 2022-06-10 15:03:22 7bebe4f9-408b-4dd8-9d63-9dc538073089 COMPLETED
// 2022-06-10 15:03:39 c7a9e768-559a-4bfb-b0c8-a0291b4c208c SUBMITTED

// CreationTime CompletionTime ID Status
// 2022-06-17 15:03:24 N/A 615026a0-1856-4107-87d9-08f7d69819ae RUNNING
// 2022-06-17 15:03:22 2022-06-17 18:08:37 7bebe4f9-408b-4dd8-9d63-9dc538073089 COMPLETED
// 2022-06-17 15:03:39 N/A c7a9e768-559a-4bfb-b0c8-a0291b4c208c SUBMITTED
func testPolicyRecommendationList(t *testing.T, data *TestData) {
_, jobId, err := runJob(t, data)
require.NoError(t, err)
stdout, err := listJobs(t, data)
require.NoError(t, err)
assert := assert.New(t)
assert.Containsf(stdout, "CreateTime", "stdout: %s", stdout)
assert.Containsf(stdout, "CompleteTime", "stdout: %s", stdout)
assert.Containsf(stdout, "CreationTime", "stdout: %s", stdout)
assert.Containsf(stdout, "CompletionTime", "stdout: %s", stdout)
assert.Containsf(stdout, "ID", "stdout: %s", stdout)
assert.Containsf(stdout, "Status", "stdout: %s", stdout)
assert.Containsf(stdout, jobId, "stdout: %s", stdout)
Expand Down

0 comments on commit 4d2f21e

Please sign in to comment.