Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <zhouya@vmware.com>
  • Loading branch information
yanjunz97 committed Oct 7, 2022
1 parent a746f9d commit d142742
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 79 deletions.
87 changes: 49 additions & 38 deletions pkg/controller/networkpolicyrecommendation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package networkpolicyrecommendation

import (
"context"
"database/sql"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -63,15 +65,16 @@ const (
sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py"
sparkServiceAccount = "policy-recommendation-spark"
sparkVersion = "3.1.1"
sparkPortName = "spark-driver-ui-port"
sparkPort = 4040
)

var (
// Spark Application CRUD functions, for unit tests
CreateSparkApplication = createSparkApplication
DeleteSparkApplication = deleteSparkApplication
ListSparkApplication = listSparkApplication
GetSparkApplication = getSparkApplication
CreateSparkApplication = createSparkApplication
DeleteSparkApplication = deleteSparkApplication
ListSparkApplication = listSparkApplication
GetSparkApplication = getSparkApplication
GetSparkMonitoringSvcDNS = getSparkMonitoringSvcDNS
// For NPR in scheduled or running state, check its status periodically
npRecommendationResyncPeriod = 10 * time.Second
)
Expand All @@ -89,6 +92,7 @@ type NPRecommendationController struct {
deletionQueue workqueue.RateLimitingInterface
periodicResyncSetMutex sync.Mutex
periodicResyncSet map[apimachinerytypes.NamespacedName]struct{}
clickhouseConnect *sql.DB
}

type NamespacedId struct {
Expand Down Expand Up @@ -124,12 +128,12 @@ func NewNPRecommendationController(
)

// Add SparkApplication ID index for RecommendedNetworkPolicy
recommendedNPInformer.Informer().AddIndexers(cache.Indexers{idIndex: idIndexFunc})
c.recommendedNPInformer.AddIndexers(cache.Indexers{idIndex: rnpIdIndexFunc})

return c
}

func idIndexFunc(obj interface{}) ([]string, error) {
func rnpIdIndexFunc(obj interface{}) ([]string, error) {
recommendedNP, ok := obj.(*crdv1alpha1.RecommendedNetworkPolicy)
if !ok {
return nil, fmt.Errorf("obj is not RecommendedNetworkPolicy: %v", obj)
Expand All @@ -140,7 +144,8 @@ func idIndexFunc(obj interface{}) ([]string, error) {
func (c *NPRecommendationController) addNPRecommendation(obj interface{}) {
npReco, ok := obj.(*crdv1alpha1.NetworkPolicyRecommendation)
if !ok {
klog.V(2).InfoS("fail to convert to NetworkPolicyRecommendation", "object", obj)
klog.V(2).ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", obj)
return
}
klog.V(2).InfoS("Processing NP Recommendation ADD event", "name", npReco.Name, "labels", npReco.Labels)
namespacedName := apimachinerytypes.NamespacedName{
Expand All @@ -153,7 +158,8 @@ func (c *NPRecommendationController) addNPRecommendation(obj interface{}) {
func (c *NPRecommendationController) updateNPRecommendation(_, new interface{}) {
npReco, ok := new.(*crdv1alpha1.NetworkPolicyRecommendation)
if !ok {
klog.V(2).InfoS("fail to convert to NetworkPolicyRecommendation", "object", new)
klog.V(2).ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", new)
return
}
klog.V(2).InfoS("Processing NP Recommendation UPDATE event", "name", npReco.Name, "labels", npReco.Labels)
namespacedName := apimachinerytypes.NamespacedName{
Expand Down Expand Up @@ -326,15 +332,25 @@ func (c *NPRecommendationController) cleanupNPRecommendation(namespace string, s
if err != nil {
return fmt.Errorf("failed to get the RecommendedNetworkPolicy when deleting, error: %v", err)
}
var aggregatedErrorMsg string
for _, obj := range rnps {
recommendedNetworkPolicy := obj.(*crdv1alpha1.RecommendedNetworkPolicy)
err := c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Delete(context.TODO(), recommendedNetworkPolicy.Name, metav1.DeleteOptions{})
err = c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Delete(context.TODO(), recommendedNetworkPolicy.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete the RecommendedNetworkPolicy %s, error: %v", recommendedNetworkPolicy.Name, err)
aggregatedErrorMsg += fmt.Sprintf("failed to delete the RecommendedNetworkPolicy %s, error: %v\n", recommendedNetworkPolicy.Name, err)
}
}
if aggregatedErrorMsg != "" {
return fmt.Errorf(aggregatedErrorMsg)
}
// Delete the result from the ClickHouse
return deletePolicyRecommendationResult(c.kubeClient, namespace, sparkApplicationId)
if c.clickhouseConnect == nil {
c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, namespace)
if err != nil {
return err
}
}
return deletePolicyRecommendationResult(c.clickhouseConnect, sparkApplicationId)
}

func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPolicyRecommendation) error {
Expand Down Expand Up @@ -374,7 +390,13 @@ func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPol
klog.V(4).InfoS("Failed to find RecommendedNetworkPolicy", "id", npReco.Status.SparkApplication)
}
// Get result from database
recommendedNetworkPolicy, err = getPolicyRecommendationResult(c.kubeClient, npReco.Status.SparkApplication, npReco.Namespace)
if c.clickhouseConnect == nil {
c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, npReco.Namespace)
if err != nil {
return err
}
}
recommendedNetworkPolicy, err = getPolicyRecommendationResult(c.clickhouseConnect, npReco.Status.SparkApplication)
if err != nil {
return err
}
Expand Down Expand Up @@ -403,12 +425,7 @@ func (c *NPRecommendationController) updateProgress(npReco *crdv1alpha1.NetworkP
if state != crdv1alpha1.NPRecommendationStateRunning {
return nil
}
service := fmt.Sprintf("pr-%s-ui-svc", npReco.Status.SparkApplication)
serviceIP, servicePort, err := getServiceAddr(c.kubeClient, service, npReco.Namespace, sparkPortName)
if err != nil {
return fmt.Errorf("error when getting the progress of the job, cannot get Spark Monitor Service address: %v", err)
}
endpoint := fmt.Sprintf("http://%s:%d", serviceIP, servicePort)
endpoint := GetSparkMonitoringSvcDNS(npReco.Status.SparkApplication, npReco.Namespace)
completedStages, totalStages, err := getPolicyRecommendationProgress(endpoint)
if err != nil {
// The Spark Monitoring Service may not start or closed at this point due to the async
Expand Down Expand Up @@ -478,8 +495,8 @@ func (c *NPRecommendationController) startJob(npReco *crdv1alpha1.NetworkPolicyR
return err
}
err := c.startSparkApplication(npReco)
// Mark the NetworkPolicyRecommendation as failed and not retry if it failed due to invalid request
if err != nil && strings.Contains(err.Error(), "invalid request") {
// Mark the NetworkPolicyRecommendation as failed and do not retry if it failed due to illegal arguments in the request
if err != nil && reflect.TypeOf(err) == reflect.TypeOf(IlleagelArguementError{}) {
return c.updateNPRecommendationStatus(
npReco,
crdv1alpha1.NetworkPolicyRecommendationStatus{
Expand All @@ -501,12 +518,12 @@ func (c *NPRecommendationController) startJob(npReco *crdv1alpha1.NetworkPolicyR
func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.NetworkPolicyRecommendation) error {
var recoJobArgs []string
if npReco.Spec.JobType != "initial" && npReco.Spec.JobType != "subsequent" {
return fmt.Errorf("invalid request: recommendation type should be 'initial' or 'subsequent'")
return IlleagelArguementError{fmt.Errorf("invalid request: recommendation type should be 'initial' or 'subsequent'")}
}
recoJobArgs = append(recoJobArgs, "--type", npReco.Spec.JobType)

if npReco.Spec.Limit < 0 {
return fmt.Errorf("invalid request: limit should be an integer >= 0")
return IlleagelArguementError{fmt.Errorf("invalid request: limit should be an integer >= 0")}
}
recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(npReco.Spec.Limit))

Expand All @@ -518,7 +535,7 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
} else if npReco.Spec.PolicyType == "k8s-np" {
policyTypeArg = 3
} else {
return fmt.Errorf("invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np")
return IlleagelArguementError{fmt.Errorf("invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np")}
}
recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg))

Expand All @@ -528,20 +545,14 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
if !npReco.Spec.EndInterval.IsZero() {
endAfterStart := npReco.Spec.EndInterval.After(npReco.Spec.StartInterval.Time)
if !endAfterStart {
return fmt.Errorf("invalid request: EndInterval should be after StartInterval")
return IlleagelArguementError{fmt.Errorf("invalid request: EndInterval should be after StartInterval")}
}
recoJobArgs = append(recoJobArgs, "--end_time", npReco.Spec.EndInterval.Format(inputTimeFormat))
}

if len(npReco.Spec.NSAllowList) > 0 {
nsAllowListStr := "["
for i, nsAllow := range npReco.Spec.NSAllowList {
nsAllowListStr += ("\"" + nsAllow + "\"")
if i != len(npReco.Spec.NSAllowList)-1 {
nsAllowListStr += ","
}
}
nsAllowListStr += "]"
nsAllowListStr := strings.Join(npReco.Spec.NSAllowList, "\",\"")
nsAllowListStr = "[\"" + nsAllowListStr + "\"]"
recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowListStr)
}

Expand All @@ -557,31 +568,31 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
}{}

if npReco.Spec.ExecutorInstances < 0 {
return fmt.Errorf("invalid request: ExecutorInstances should be an integer >= 0")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorInstances should be an integer >= 0")}
}
sparkResourceArgs.executorInstances = int32(npReco.Spec.ExecutorInstances)

matchResult, err := regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.driverCoreRequest = npReco.Spec.DriverCoreRequest

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverMemory)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: DriverMemory should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: DriverMemory should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.driverMemory = npReco.Spec.DriverMemory

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.executorCoreRequest = npReco.Spec.ExecutorCoreRequest

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorMemory)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.executorMemory = npReco.Spec.ExecutorMemory

Expand Down
31 changes: 9 additions & 22 deletions pkg/controller/networkpolicyrecommendation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -170,26 +169,9 @@ func createFakeSparkApplicationService(kubeClient kubernetes.Interface, id strin
}
}))

// the url format http://ip:port
urlArray := strings.Split(strings.Split(testServer.URL, "//")[1], ":")
ip := urlArray[0]
port, err := strconv.ParseInt(urlArray[1], 10, 32)
if err != nil {
return err
GetSparkMonitoringSvcDNS = func(id, namespace string) string {
return testServer.URL
}

serviceName := fmt.Sprintf("pr-%s-ui-svc", id)
sparkApplicationService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: testNamespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{Name: sparkPortName, Port: int32(port)}},
ClusterIP: ip,
},
}
kubeClient.CoreV1().Services(testNamespace).Create(context.TODO(), sparkApplicationService, metav1.CreateOptions{})
return nil
}

Expand Down Expand Up @@ -671,8 +653,13 @@ func TestGetPolicyRecommendationResult(t *testing.T) {
if db != nil {
defer db.Close()
}
_, err := getPolicyRecommendationResult(kubeClient, sparkAppID, testNamespace)
assert.Contains(t, err.Error(), tc.expectedErrorMsg)
connect, err := setupClickHouseConnection(kubeClient, testNamespace)
if err != nil {
assert.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
_, err := getPolicyRecommendationResult(connect, sparkAppID)
assert.Contains(t, err.Error(), tc.expectedErrorMsg)
}
})
}
}
30 changes: 11 additions & 19 deletions pkg/controller/networkpolicyrecommendation/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func getPolicyRecommendationProgress(baseUrl string) (completedStages int, total
return completedStages, totalStages, nil
}

func getResultFromClickHouse(connect *sql.DB, id string) (*crdv1alpha1.RecommendedNetworkPolicy, error) {
func getPolicyRecommendationResult(connect *sql.DB, id string) (*crdv1alpha1.RecommendedNetworkPolicy, error) {
var resultType, resultTimeCreatedStr, resultYamls string
query := "SELECT type, timeCreated, yamls FROM recommendations WHERE id = (?);"
err := connect.QueryRow(query, id).Scan(&resultType, &resultTimeCreatedStr, &resultYamls)
Expand Down Expand Up @@ -237,18 +237,6 @@ func setupClickHouseConnection(client kubernetes.Interface, namespace string) (c
return connect, nil
}

func getPolicyRecommendationResult(client kubernetes.Interface, id string, namespace string) (recommendedNetworkPolicy *crdv1alpha1.RecommendedNetworkPolicy, err error) {
connect, err := setupClickHouseConnection(client, namespace)
if err != nil {
return nil, err
}
recommendedNetworkPolicy, err = getResultFromClickHouse(connect, id)
if err != nil {
return nil, fmt.Errorf("error when getting the result from ClickHouse, %v", err)
}
return recommendedNetworkPolicy, nil
}

func deleteSparkApplicationIfExists(client kubernetes.Interface, namespace string, id string) error {
sparkApplicationList, err := ListSparkApplication(client, namespace)
if err != nil {
Expand All @@ -262,16 +250,12 @@ func deleteSparkApplicationIfExists(client kubernetes.Interface, namespace strin
}
}
if existed {
DeleteSparkApplication(client, namespace, "pr-"+id)
DeleteSparkApplication(client, "pr-"+id, namespace)
}
return nil
}

func deletePolicyRecommendationResult(client kubernetes.Interface, namespace string, id string) (err error) {
connect, err := setupClickHouseConnection(client, namespace)
if err != nil {
return err
}
func deletePolicyRecommendationResult(connect *sql.DB, id string) (err error) {
query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);"
_, err = connect.Exec(query, id)
if err != nil {
Expand Down Expand Up @@ -324,3 +308,11 @@ func createSparkApplication(client kubernetes.Interface, namespace string, recom
Do(context.TODO()).
Into(response)
}

// IlleagelArguementError wraps an error caused by invalid arguments in a
// NetworkPolicyRecommendation spec (e.g. bad JobType, negative Limit, or a
// resource quantity that does not match k8sQuantitiesReg). Callers detect it
// by type to mark the recommendation as failed without retrying.
// NOTE(review): the name is misspelled — it should be IllegalArgumentError;
// consider renaming in a follow-up change (all in-file usages must be updated
// together).
type IlleagelArguementError struct {
	error
}

// getSparkMonitoringSvcDNS returns the cluster-internal URL of the Spark
// driver monitoring UI Service ("pr-<id>-ui-svc") for the given Spark
// application id in the given namespace, using the fixed Spark UI port.
func getSparkMonitoringSvcDNS(id string, namespace string) string {
	serviceName := "pr-" + id + "-ui-svc"
	return fmt.Sprintf("http://%s.%s.svc:%d", serviceName, namespace, sparkPort)
}

0 comments on commit d142742

Please sign in to comment.