Skip to content

Commit

Permalink
Adapt CLI to use Theia Manager (#115)
Browse files Browse the repository at this point in the history
1.Create a theia client with the ca.cert and token.
2.Use port-forwarder to connect to Theia Manager when running theia out of the cluster.
3.Add unit test.

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu authored Oct 19, 2022
1 parent dd818d5 commit e7bd73a
Show file tree
Hide file tree
Showing 19 changed files with 1,240 additions and 1,000 deletions.
2 changes: 2 additions & 0 deletions build/charts/theia/templates/theia-cli/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ rules:
verbs:
- get
- list
- create
- delete
{{- end }}
26 changes: 13 additions & 13 deletions docs/networkpolicy-recommendation.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ To see all options and usage examples of these commands, you may run
The `theia policy-recommendation run` command triggers a new policy
recommendation job.
If a new policy recommendation job is created successfully, the
`recommendation ID` of this job will be returned:
`name` of this job will be returned:

```bash
$ theia policy-recommendation run
Successfully created policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
Successfully created policy recommendation job with name pr-e998433e-accb-4888-9fc8-06563f073e86
```

`recommendation ID` is a universally unique identifier ([UUID](
The name of the policy recommendation job contains a universally unique identifier ([UUID](
https://en.wikipedia.org/wiki/Universally_unique_identifier)) that is
automatically generated when creating a new policy recommendation job. We use
`recommendation ID` to identify different policy recommendation jobs.
this UUID to identify different policy recommendation jobs.

A policy recommendation job may take a few minutes to more than an hour to
complete depending on the number of network flows. By default, this command
Expand All @@ -92,7 +92,7 @@ a previous policy recommendation job.
Given the job created above, we could check its status via:

```bash
$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation status pr-e998433e-accb-4888-9fc8-06563f073e86
Status of this policy recommendation job is COMPLETED
```

Expand All @@ -110,7 +110,7 @@ written into the Clickhouse database. To retrieve results of the policy
recommendation job created above, run:

```bash
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation retrieve pr-e998433e-accb-4888-9fc8-06563f073e86
apiVersion: crd.antrea.io/v1alpha1
kind: ClusterNetworkPolicy
metadata:
Expand Down Expand Up @@ -138,21 +138,21 @@ To apply recommended policies in the cluster, we can save the recommended
policies to a YAML file and apply it using `kubectl`:

```bash
theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 -f recommended_policies.yml
theia policy-recommendation retrieve pr-e998433e-accb-4888-9fc8-06563f073e86 -f recommended_policies.yml
kubectl apply -f recommended_policies.yml
```

### List all policy recommendation jobs

The `theia policy-recommendation list` command lists all undeleted policy
recommendation jobs. `CreationTime`, `CompletionTime`, `ID` and `Status` of each
recommendation jobs. `CreationTime`, `CompletionTime`, `Name` and `Status` of each
policy recommendation job will be displayed in table format. For example:

```bash
$ theia policy-recommendation list
CreationTime CompletionTime ID Status
2022-06-17 18:33:15 N/A 2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
CreationTime CompletionTime Name Status
2022-06-17 18:33:15 N/A pr-2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 pr-e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
```

### Delete a policy recommendation job
Expand All @@ -162,6 +162,6 @@ recommendation job. Please proceed with caution since deletion cannot be
undone. To delete the policy recommendation job created above, run:

```bash
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Successfully deleted policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete pr-e998433e-accb-4888-9fc8-06563f073e86
Successfully deleted policy recommendation job with name: pr-e998433e-accb-4888-9fc8-06563f073e86
```
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func (r *REST) copyNetworkPolicyRecommendation(intelli *intelligence.NetworkPoli
intelli.Status.SparkApplication = crd.Status.SparkApplication
intelli.Status.CompletedStages = crd.Status.CompletedStages
intelli.Status.TotalStages = crd.Status.TotalStages
intelli.Status.RecommendedNetworkPolicy = crd.Status.RecommendedNP.Spec.Yamls
if crd.Status.RecommendedNP != nil {
intelli.Status.RecommendedNetworkPolicy = crd.Status.RecommendedNP.Spec.Yamls
}
intelli.Status.ErrorMsg = crd.Status.ErrorMsg
intelli.Status.StartTime = crd.Status.StartTime
intelli.Status.EndTime = crd.Status.EndTime
Expand Down
12 changes: 7 additions & 5 deletions pkg/theia/commands/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import "time"
const (
FlowVisibilityNS = "flow-visibility"
K8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"
SparkImage = "projects.registry.vmware.com/antrea/theia-policy-recommendation:latest"
SparkImagePullPolicy = "IfNotPresent"
SparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py"
SparkServiceAccount = "policy-recommendation-spark"
SparkVersion = "3.1.1"
StatusCheckPollInterval = 5 * time.Second
StatusCheckPollTimeout = 60 * time.Minute
DeletionPollInterval = 5 * time.Second
DeletionPollTimeout = 5 * time.Minute
CAConfigMapName = "theia-ca"
CAConfigMapKey = "ca.crt"
TheiaCliAccountName = "theia-cli-account-token"
ServiceAccountTokenKey = "token"
TheiaManagerServiceName = "theia-manager"
)
9 changes: 2 additions & 7 deletions pkg/theia/commands/policy_recommendation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,10 @@ Must specify a subcommand like run, status or retrieve.`,

func init() {
rootCmd.AddCommand(policyRecommendationCmd)
policyRecommendationCmd.PersistentFlags().String(
"clickhouse-endpoint",
"",
"The ClickHouse Service endpoint.",
)
policyRecommendationCmd.PersistentFlags().Bool(
"use-cluster-ip",
false,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service
and Spark Monitoring Service. It can only be used when running in cluster.`,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the Theia
Manager Service. It can only be used when running in cluster.`,
)
}
130 changes: 31 additions & 99 deletions pkg/theia/commands/policy_recommendation_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,132 +19,64 @@ import (
"fmt"

"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"

"antrea.io/theia/pkg/theia/commands/config"
sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2"
)

// policyRecommendationDeleteCmd represents the policy-recommendation delete command
var policyRecommendationDeleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete a policy recommendation Spark job",
Long: `Delete a policy recommendation Spark job by ID.`,
Short: "Delete a policy recommendation job",
Long: `Delete a policy recommendation job by Name.`,
Aliases: []string{"del"},
Args: cobra.RangeArgs(0, 1),
Example: `
Delete the policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Delete the network policy recommendation job with Name pr-e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete pr-e998433e-accb-4888-9fc8-06563f073e86
`,
RunE: func(cmd *cobra.Command, args []string) error {
recoID, err := cmd.Flags().GetString("id")
if err != nil {
return err
}
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
err = ParseRecommendationID(recoID)
if err != nil {
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
return err
}
endpoint, err := cmd.Flags().GetString("clickhouse-endpoint")
if err != nil {
return err
}
if endpoint != "" {
err = ParseEndpoint(endpoint)
if err != nil {
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}

clientset, err := CreateK8sClient(kubeconfig)
if err != nil {
return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err)
}

idMap, err := getPolicyRecommendationIdMap(clientset, kubeconfig, endpoint, useClusterIP)
if err != nil {
return fmt.Errorf("err when getting policy recommendation ID map, %v", err)
}

if _, ok := idMap[recoID]; !ok {
return fmt.Errorf("could not find the policy recommendation job with given ID")
}

clientset.CoreV1().RESTClient().Delete().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(config.FlowVisibilityNS).
Resource("sparkapplications").
Name("pr-" + recoID).
Do(context.TODO())

err = deletePolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, recoID)
if err != nil {
return err
}

fmt.Printf("Successfully deleted policy recommendation job with ID %s\n", recoID)
return nil
},
RunE: policyRecommendationDelete,
}

func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (idMap map[string]bool, err error) {
idMap = make(map[string]bool)
sparkApplicationList := &sparkv1.SparkApplicationList{}
err = clientset.CoreV1().RESTClient().Get().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(config.FlowVisibilityNS).
Resource("sparkapplications").
Do(context.TODO()).Into(sparkApplicationList)
func policyRecommendationDelete(cmd *cobra.Command, args []string) error {
prName, err := cmd.Flags().GetString("name")
if err != nil {
return idMap, err
return err
}
for _, sparkApplication := range sparkApplicationList.Items {
id := sparkApplication.ObjectMeta.Name[3:]
idMap[id] = true
if prName == "" && len(args) == 1 {
prName = args[0]
}
completedPolicyRecommendationList, err := getCompletedPolicyRecommendationList(clientset, kubeconfig, endpoint, useClusterIP)
err = ParseRecommendationName(prName)
if err != nil {
return idMap, err
}
for _, completedPolicyRecommendation := range completedPolicyRecommendationList {
idMap[completedPolicyRecommendation.id] = true
}
return idMap, nil
}

func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) {
connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
return err
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}
query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);"
_, err = connect.Exec(query, recoID)
theiaClient, pf, err := SetupTheiaClientAndConnection(cmd, useClusterIP)
if err != nil {
return fmt.Errorf("failed to delete recommendation result with id %s: %v", recoID, err)
return fmt.Errorf("couldn't setup Theia manager client, %v", err)
}
if pf != nil {
defer pf.Stop()
}
err = theiaClient.Delete().
AbsPath("/apis/intelligence.theia.antrea.io/v1alpha1/").
Resource("networkpolicyrecommendations").
Name(prName).
Do(context.TODO()).
Error()
if err != nil {
return fmt.Errorf("error when deleting policy recommendation job: %v", err)
}
fmt.Printf("Successfully deleted policy recommendation job with name: %s\n", prName)
return nil
}

func init() {
policyRecommendationCmd.AddCommand(policyRecommendationDeleteCmd)
policyRecommendationDeleteCmd.Flags().StringP(
"id",
"i",
"name",
"",
"",
"ID of the policy recommendation Spark job.",
"Name of the policy recommendation job.",
)
}
89 changes: 89 additions & 0 deletions pkg/theia/commands/policy_recommendation_delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

"antrea.io/theia/pkg/theia/portforwarder"
)

func TestPolicyRecommendationDelete(t *testing.T) {
nprName := "pr-e292395c-3de1-11ed-b878-0242ac120002"
testCases := []struct {
name string
testServer *httptest.Server
expectedErrorMsg string
}{
{
name: "Valid case",
testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/networkpolicyrecommendations/%s", nprName):
if r.Method == "DELETE" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
} else {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
})),
expectedErrorMsg: "",
},
{
name: "SparkApplication not found",
testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/networkpolicyrecommendations/%s", nprName):
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
})),
expectedErrorMsg: "error when deleting policy recommendation job",
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
defer tt.testServer.Close()
oldFunc := SetupTheiaClientAndConnection
SetupTheiaClientAndConnection = func(cmd *cobra.Command, useClusterIP bool) (restclient.Interface, *portforwarder.PortForwarder, error) {
clientConfig := &restclient.Config{Host: tt.testServer.URL, TLSClientConfig: restclient.TLSClientConfig{Insecure: true}}
clientset, _ := kubernetes.NewForConfig(clientConfig)
return clientset.CoreV1().RESTClient(), nil, nil
}
defer func() {
SetupTheiaClientAndConnection = oldFunc
}()
cmd := new(cobra.Command)
cmd.Flags().String("name", nprName, "")
cmd.Flags().Bool("use-cluster-ip", true, "")
err := policyRecommendationDelete(cmd, []string{})
if tt.expectedErrorMsg == "" {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedErrorMsg)
}
})
}
}
Loading

0 comments on commit e7bd73a

Please sign in to comment.