Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt CLI to use Theia Manager #115

Merged
merged 1 commit into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/charts/theia/templates/theia-cli/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ rules:
verbs:
- get
- list
- create
- delete
{{- end }}
26 changes: 13 additions & 13 deletions docs/networkpolicy-recommendation.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ To see all options and usage examples of these commands, you may run
The `theia policy-recommendation run` command triggers a new policy
recommendation job.
If a new policy recommendation job is created successfully, the
`recommendation ID` of this job will be returned:
`name` of this job will be returned:

```bash
$ theia policy-recommendation run
Successfully created policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
Successfully created policy recommendation job with name pr-e998433e-accb-4888-9fc8-06563f073e86
```

`recommendation ID` is a universally unique identifier ([UUID](
The name of the policy recommendation job contains a universally unique identifier ([UUID](
https://en.wikipedia.org/wiki/Universally_unique_identifier)) that is
automatically generated when creating a new policy recommendation job. We use
`recommendation ID` to identify different policy recommendation jobs.
this UUID to identify different policy recommendation jobs.

A policy recommendation job may take a few minutes to more than an hour to
complete depending on the number of network flows. By default, this command
Expand All @@ -92,7 +92,7 @@ a previous policy recommendation job.
Given the job created above, we could check its status via:

```bash
$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation status pr-e998433e-accb-4888-9fc8-06563f073e86
Status of this policy recommendation job is COMPLETED
```

Expand All @@ -110,7 +110,7 @@ written into the Clickhouse database. To retrieve results of the policy
recommendation job created above, run:

```bash
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation retrieve pr-e998433e-accb-4888-9fc8-06563f073e86
apiVersion: crd.antrea.io/v1alpha1
kind: ClusterNetworkPolicy
metadata:
Expand Down Expand Up @@ -138,21 +138,21 @@ To apply recommended policies in the cluster, we can save the recommended
policies to a YAML file and apply it using `kubectl`:

```bash
theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 -f recommended_policies.yml
theia policy-recommendation retrieve pr-e998433e-accb-4888-9fc8-06563f073e86 -f recommended_policies.yml
kubectl apply -f recommended_policies.yml
```

### List all policy recommendation jobs

The `theia policy-recommendation list` command lists all undeleted policy
recommendation jobs. `CreationTime`, `CompletionTime`, `ID` and `Status` of each
recommendation jobs. `CreationTime`, `CompletionTime`, `Name` and `Status` of each
policy recommendation job will be displayed in table format. For example:

```bash
$ theia policy-recommendation list
CreationTime CompletionTime ID Status
2022-06-17 18:33:15 N/A 2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
CreationTime CompletionTime Name Status
2022-06-17 18:33:15 N/A pr-2cf13427-cbe5-454c-b9d3-e1124af7baa2 RUNNING
2022-06-17 18:06:56 2022-06-17 18:08:37 pr-e998433e-accb-4888-9fc8-06563f073e86 COMPLETED
```

### Delete a policy recommendation job
Expand All @@ -162,6 +162,6 @@ recommendation job. Please proceed with caution since deletion cannot be
undone. To delete the policy recommendation job created above, run:

```bash
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Successfully deleted policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete pr-e998433e-accb-4888-9fc8-06563f073e86
Successfully deleted policy recommendation job with name: pr-e998433e-accb-4888-9fc8-06563f073e86
```
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func (r *REST) copyNetworkPolicyRecommendation(intelli *intelligence.NetworkPoli
intelli.Status.SparkApplication = crd.Status.SparkApplication
intelli.Status.CompletedStages = crd.Status.CompletedStages
intelli.Status.TotalStages = crd.Status.TotalStages
intelli.Status.RecommendedNetworkPolicy = crd.Status.RecommendedNP.Spec.Yamls
if crd.Status.RecommendedNP != nil {
intelli.Status.RecommendedNetworkPolicy = crd.Status.RecommendedNP.Spec.Yamls
}
intelli.Status.ErrorMsg = crd.Status.ErrorMsg
intelli.Status.StartTime = crd.Status.StartTime
intelli.Status.EndTime = crd.Status.EndTime
Expand Down
12 changes: 7 additions & 5 deletions pkg/theia/commands/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import "time"
const (
FlowVisibilityNS = "flow-visibility"
K8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"
SparkImage = "projects.registry.vmware.com/antrea/theia-policy-recommendation:latest"
SparkImagePullPolicy = "IfNotPresent"
SparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py"
SparkServiceAccount = "policy-recommendation-spark"
SparkVersion = "3.1.1"
StatusCheckPollInterval = 5 * time.Second
StatusCheckPollTimeout = 60 * time.Minute
DeletionPollInterval = 5 * time.Second
DeletionPollTimeout = 5 * time.Minute
CAConfigMapName = "theia-ca"
CAConfigMapKey = "ca.crt"
TheiaCliAccountName = "theia-cli-account-token"
ServiceAccountTokenKey = "token"
TheiaManagerServiceName = "theia-manager"
)
9 changes: 2 additions & 7 deletions pkg/theia/commands/policy_recommendation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,10 @@ Must specify a subcommand like run, status or retrieve.`,

func init() {
rootCmd.AddCommand(policyRecommendationCmd)
policyRecommendationCmd.PersistentFlags().String(
"clickhouse-endpoint",
"",
"The ClickHouse Service endpoint.",
)
policyRecommendationCmd.PersistentFlags().Bool(
"use-cluster-ip",
false,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service
and Spark Monitoring Service. It can only be used when running in cluster.`,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the Theia
Manager Service. It can only be used when running in cluster.`,
)
}
130 changes: 31 additions & 99 deletions pkg/theia/commands/policy_recommendation_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,132 +19,64 @@ import (
"fmt"

"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"

"antrea.io/theia/pkg/theia/commands/config"
sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2"
)

// policyRecommendationDeleteCmd represents the policy-recommendation delete command
var policyRecommendationDeleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete a policy recommendation Spark job",
Long: `Delete a policy recommendation Spark job by ID.`,
Short: "Delete a policy recommendation job",
Long: `Delete a policy recommendation job by Name.`,
Aliases: []string{"del"},
Args: cobra.RangeArgs(0, 1),
Example: `
Delete the policy recommendation job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86
Delete the network policy recommendation job with Name pr-e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation delete pr-e998433e-accb-4888-9fc8-06563f073e86
`,
RunE: func(cmd *cobra.Command, args []string) error {
recoID, err := cmd.Flags().GetString("id")
if err != nil {
return err
}
if recoID == "" && len(args) == 1 {
recoID = args[0]
}
err = ParseRecommendationID(recoID)
if err != nil {
return err
}
kubeconfig, err := ResolveKubeConfig(cmd)
if err != nil {
return err
}
endpoint, err := cmd.Flags().GetString("clickhouse-endpoint")
if err != nil {
return err
}
if endpoint != "" {
err = ParseEndpoint(endpoint)
if err != nil {
return err
}
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}

clientset, err := CreateK8sClient(kubeconfig)
if err != nil {
return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err)
}

idMap, err := getPolicyRecommendationIdMap(clientset, kubeconfig, endpoint, useClusterIP)
if err != nil {
return fmt.Errorf("err when getting policy recommendation ID map, %v", err)
}

if _, ok := idMap[recoID]; !ok {
return fmt.Errorf("could not find the policy recommendation job with given ID")
}

clientset.CoreV1().RESTClient().Delete().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(config.FlowVisibilityNS).
Resource("sparkapplications").
Name("pr-" + recoID).
Do(context.TODO())

err = deletePolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, recoID)
if err != nil {
return err
}

fmt.Printf("Successfully deleted policy recommendation job with ID %s\n", recoID)
return nil
},
RunE: policyRecommendationDelete,
}

func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (idMap map[string]bool, err error) {
idMap = make(map[string]bool)
sparkApplicationList := &sparkv1.SparkApplicationList{}
err = clientset.CoreV1().RESTClient().Get().
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(config.FlowVisibilityNS).
Resource("sparkapplications").
Do(context.TODO()).Into(sparkApplicationList)
func policyRecommendationDelete(cmd *cobra.Command, args []string) error {
prName, err := cmd.Flags().GetString("name")
if err != nil {
return idMap, err
return err
}
for _, sparkApplication := range sparkApplicationList.Items {
id := sparkApplication.ObjectMeta.Name[3:]
idMap[id] = true
if prName == "" && len(args) == 1 {
prName = args[0]
}
completedPolicyRecommendationList, err := getCompletedPolicyRecommendationList(clientset, kubeconfig, endpoint, useClusterIP)
err = ParseRecommendationName(prName)
if err != nil {
return idMap, err
}
for _, completedPolicyRecommendation := range completedPolicyRecommendationList {
idMap[completedPolicyRecommendation.id] = true
}
return idMap, nil
}

func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) {
connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP)
if portForward != nil {
defer portForward.Stop()
return err
}
useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip")
if err != nil {
return err
}
query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);"
_, err = connect.Exec(query, recoID)
theiaClient, pf, err := SetupTheiaClientAndConnection(cmd, useClusterIP)
if err != nil {
return fmt.Errorf("failed to delete recommendation result with id %s: %v", recoID, err)
return fmt.Errorf("couldn't setup Theia manager client, %v", err)
}
if pf != nil {
defer pf.Stop()
}
err = theiaClient.Delete().
AbsPath("/apis/intelligence.theia.antrea.io/v1alpha1/").
Resource("networkpolicyrecommendations").
Name(prName).
Do(context.TODO()).
Error()
if err != nil {
return fmt.Errorf("error when deleting policy recommendation job: %v", err)
}
fmt.Printf("Successfully deleted policy recommendation job with name: %s\n", prName)
return nil
}

func init() {
policyRecommendationCmd.AddCommand(policyRecommendationDeleteCmd)
policyRecommendationDeleteCmd.Flags().StringP(
"id",
"i",
"name",
"",
"",
"ID of the policy recommendation Spark job.",
"Name of the policy recommendation job.",
)
}
89 changes: 89 additions & 0 deletions pkg/theia/commands/policy_recommendation_delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commands

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

"antrea.io/theia/pkg/theia/portforwarder"
)

func TestPolicyRecommendationDelete(t *testing.T) {
nprName := "pr-e292395c-3de1-11ed-b878-0242ac120002"
testCases := []struct {
name string
testServer *httptest.Server
expectedErrorMsg string
}{
{
name: "Valid case",
testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/networkpolicyrecommendations/%s", nprName):
if r.Method == "DELETE" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
} else {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
})),
expectedErrorMsg: "",
},
{
name: "SparkApplication not found",
testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/networkpolicyrecommendations/%s", nprName):
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
})),
expectedErrorMsg: "error when deleting policy recommendation job",
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
defer tt.testServer.Close()
oldFunc := SetupTheiaClientAndConnection
SetupTheiaClientAndConnection = func(cmd *cobra.Command, useClusterIP bool) (restclient.Interface, *portforwarder.PortForwarder, error) {
clientConfig := &restclient.Config{Host: tt.testServer.URL, TLSClientConfig: restclient.TLSClientConfig{Insecure: true}}
clientset, _ := kubernetes.NewForConfig(clientConfig)
return clientset.CoreV1().RESTClient(), nil, nil
}
defer func() {
SetupTheiaClientAndConnection = oldFunc
}()
cmd := new(cobra.Command)
cmd.Flags().String("name", nprName, "")
cmd.Flags().Bool("use-cluster-ip", true, "")
err := policyRecommendationDelete(cmd, []string{})
if tt.expectedErrorMsg == "" {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedErrorMsg)
}
})
}
}
Loading