Skip to content

Commit

Permalink
Address comments and improve unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <zhouya@vmware.com>
  • Loading branch information
yanjunz97 committed Oct 10, 2022
1 parent 43415aa commit 8ddd85e
Show file tree
Hide file tree
Showing 3 changed files with 490 additions and 186 deletions.
103 changes: 62 additions & 41 deletions pkg/controller/networkpolicyrecommendation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package networkpolicyrecommendation

import (
"context"
"database/sql"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -54,7 +56,7 @@ const (
inputTimeFormat = "2006-01-02 15:04:05"
// Time format for parsing time from ClickHouse
clickHouseTimeFormat = "2006-01-02T15:04:05Z"
// SparkApplication id idex name for RecommendedNetworkPolicy
// SparkApplication id index name for RecommendedNetworkPolicy
idIndex = "id"
k8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"
// Spark related parameters
Expand All @@ -63,15 +65,16 @@ const (
sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py"
sparkServiceAccount = "policy-recommendation-spark"
sparkVersion = "3.1.1"
sparkPortName = "spark-driver-ui-port"
sparkPort = 4040
)

var (
// Spark Application CRUD functions, for unit tests
CreateSparkApplication = createSparkApplication
DeleteSparkApplication = deleteSparkApplication
ListSparkApplication = listSparkApplication
GetSparkApplication = getSparkApplication
CreateSparkApplication = createSparkApplication
DeleteSparkApplication = deleteSparkApplication
ListSparkApplication = listSparkApplication
GetSparkApplication = getSparkApplication
GetSparkMonitoringSvcDNS = getSparkMonitoringSvcDNS
// For NPR in scheduled or running state, check its status periodically
npRecommendationResyncPeriod = 10 * time.Second
)
Expand All @@ -89,6 +92,7 @@ type NPRecommendationController struct {
deletionQueue workqueue.RateLimitingInterface
periodicResyncSetMutex sync.Mutex
periodicResyncSet map[apimachinerytypes.NamespacedName]struct{}
clickhouseConnect *sql.DB
}

type NamespacedId struct {
Expand Down Expand Up @@ -124,12 +128,12 @@ func NewNPRecommendationController(
)

// Add SparkApplication ID index for RecommendedNetworkPolicy
recommendedNPInformer.Informer().AddIndexers(cache.Indexers{idIndex: idIndexFunc})
c.recommendedNPInformer.AddIndexers(cache.Indexers{idIndex: rnpIdIndexFunc})

return c
}

func idIndexFunc(obj interface{}) ([]string, error) {
func rnpIdIndexFunc(obj interface{}) ([]string, error) {
recommendedNP, ok := obj.(*crdv1alpha1.RecommendedNetworkPolicy)
if !ok {
return nil, fmt.Errorf("obj is not RecommendedNetworkPolicy: %v", obj)
Expand All @@ -138,7 +142,11 @@ func idIndexFunc(obj interface{}) ([]string, error) {
}

func (c *NPRecommendationController) addNPRecommendation(obj interface{}) {
npReco, _ := obj.(*crdv1alpha1.NetworkPolicyRecommendation)
npReco, ok := obj.(*crdv1alpha1.NetworkPolicyRecommendation)
if !ok {
klog.ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", obj)
return
}
klog.V(2).InfoS("Processing NP Recommendation ADD event", "name", npReco.Name, "labels", npReco.Labels)
namespacedName := apimachinerytypes.NamespacedName{
Namespace: npReco.Namespace,
Expand All @@ -148,7 +156,11 @@ func (c *NPRecommendationController) addNPRecommendation(obj interface{}) {
}

func (c *NPRecommendationController) updateNPRecommendation(_, new interface{}) {
npReco, _ := new.(*crdv1alpha1.NetworkPolicyRecommendation)
npReco, ok := new.(*crdv1alpha1.NetworkPolicyRecommendation)
if !ok {
klog.ErrorS(nil, "fail to convert to NetworkPolicyRecommendation", "object", new)
return
}
klog.V(2).InfoS("Processing NP Recommendation UPDATE event", "name", npReco.Name, "labels", npReco.Labels)
namespacedName := apimachinerytypes.NamespacedName{
Namespace: npReco.Namespace,
Expand Down Expand Up @@ -320,12 +332,27 @@ func (c *NPRecommendationController) cleanupNPRecommendation(namespace string, s
if err != nil {
return fmt.Errorf("failed to get the RecommendedNetworkPolicy when deleting, error: %v", err)
}
var undeletedRnpNames []string
var errorList []error
for _, obj := range rnps {
recommendedNetworkPolicy := obj.(*crdv1alpha1.RecommendedNetworkPolicy)
c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Delete(context.TODO(), recommendedNetworkPolicy.Name, metav1.DeleteOptions{})
err = c.crdClient.CrdV1alpha1().RecommendedNetworkPolicies(namespace).Delete(context.TODO(), recommendedNetworkPolicy.Name, metav1.DeleteOptions{})
if err != nil {
undeletedRnpNames = append(undeletedRnpNames, recommendedNetworkPolicy.Name)
errorList = append(errorList, err)
}
}
if len(errorList) > 0 {
return fmt.Errorf("failed to deleted RecommendedNetworkPolicies: %v, error: %v", undeletedRnpNames, errorList)
}
// Delete the result from the ClickHouse
return deletePolicyRecommendationResult(c.kubeClient, namespace, sparkApplicationId)
if c.clickhouseConnect == nil {
c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, namespace)
if err != nil {
return err
}
}
return deletePolicyRecommendationResult(c.clickhouseConnect, sparkApplicationId)
}

func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPolicyRecommendation) error {
Expand Down Expand Up @@ -365,7 +392,13 @@ func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPol
klog.V(4).InfoS("Failed to find RecommendedNetworkPolicy", "id", npReco.Status.SparkApplication)
}
// Get result from database
recommendedNetworkPolicy, err = getPolicyRecommendationResult(c.kubeClient, npReco.Status.SparkApplication, npReco.Namespace)
if c.clickhouseConnect == nil {
c.clickhouseConnect, err = setupClickHouseConnection(c.kubeClient, npReco.Namespace)
if err != nil {
return err
}
}
recommendedNetworkPolicy, err = getPolicyRecommendationResult(c.clickhouseConnect, npReco.Status.SparkApplication)
if err != nil {
return err
}
Expand All @@ -374,15 +407,14 @@ func (c *NPRecommendationController) updateResult(npReco *crdv1alpha1.NetworkPol
if err != nil {
return err
}
klog.V(2).InfoS("Created RecommendedNetworkPolicy", "RecommendedNetworkPolicy", recommendedNetworkPolicy.Name, "NetworkRecommendationPolicy", npReco.Name)
}
klog.V(2).InfoS("Created RecommendedNetworkPolicy", "RecommendedNetworkPolicy", recommendedNetworkPolicy.Name, "NetworkRecommendationPolicy", npReco.Name)
return c.updateNPRecommendationStatus(npReco, crdv1alpha1.NetworkPolicyRecommendationStatus{
State: crdv1alpha1.NPRecommendationStateCompleted,
RecommendedNP: recommendedNetworkPolicy,
EndTime: metav1.NewTime(time.Now()),
})
}

return nil
}

Expand All @@ -395,12 +427,7 @@ func (c *NPRecommendationController) updateProgress(npReco *crdv1alpha1.NetworkP
if state != crdv1alpha1.NPRecommendationStateRunning {
return nil
}
service := fmt.Sprintf("pr-%s-ui-svc", npReco.Status.SparkApplication)
serviceIP, servicePort, err := getServiceAddr(c.kubeClient, service, npReco.Namespace, sparkPortName)
if err != nil {
return fmt.Errorf("error when getting the progress of the job, cannot get Spark Monitor Service address: %v", err)
}
endpoint := fmt.Sprintf("http://%s:%d", serviceIP, servicePort)
endpoint := GetSparkMonitoringSvcDNS(npReco.Status.SparkApplication, npReco.Namespace)
completedStages, totalStages, err := getPolicyRecommendationProgress(endpoint)
if err != nil {
// The Spark Monitoring Service may not start or closed at this point due to the async
Expand Down Expand Up @@ -470,8 +497,8 @@ func (c *NPRecommendationController) startJob(npReco *crdv1alpha1.NetworkPolicyR
return err
}
err := c.startSparkApplication(npReco)
// Mark the NetworkPolicyRecommendation as failed and not retry if it failed due to invalid request
if err != nil && strings.Contains(err.Error(), "invalid request") {
// Mark the NetworkPolicyRecommendation as failed and do not retry if it failed due to illegal arguments in the request
if err != nil && reflect.TypeOf(err) == reflect.TypeOf(IlleagelArguementError{}) {
return c.updateNPRecommendationStatus(
npReco,
crdv1alpha1.NetworkPolicyRecommendationStatus{
Expand All @@ -493,12 +520,12 @@ func (c *NPRecommendationController) startJob(npReco *crdv1alpha1.NetworkPolicyR
func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.NetworkPolicyRecommendation) error {
var recoJobArgs []string
if npReco.Spec.JobType != "initial" && npReco.Spec.JobType != "subsequent" {
return fmt.Errorf("invalid request: recommendation type should be 'initial' or 'subsequent'")
return IlleagelArguementError{fmt.Errorf("invalid request: recommendation type should be 'initial' or 'subsequent'")}
}
recoJobArgs = append(recoJobArgs, "--type", npReco.Spec.JobType)

if npReco.Spec.Limit < 0 {
return fmt.Errorf("invalid request: limit should be an integer >= 0")
return IlleagelArguementError{fmt.Errorf("invalid request: limit should be an integer >= 0")}
}
recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(npReco.Spec.Limit))

Expand All @@ -510,7 +537,7 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
} else if npReco.Spec.PolicyType == "k8s-np" {
policyTypeArg = 3
} else {
return fmt.Errorf("invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np")
return IlleagelArguementError{fmt.Errorf("invalid request: type of generated NetworkPolicy should be anp-deny-applied or anp-deny-all or k8s-np")}
}
recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg))

Expand All @@ -520,20 +547,14 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
if !npReco.Spec.EndInterval.IsZero() {
endAfterStart := npReco.Spec.EndInterval.After(npReco.Spec.StartInterval.Time)
if !endAfterStart {
return fmt.Errorf("invalid request: EndInterval should be after StartInterval")
return IlleagelArguementError{fmt.Errorf("invalid request: EndInterval should be after StartInterval")}
}
recoJobArgs = append(recoJobArgs, "--end_time", npReco.Spec.EndInterval.Format(inputTimeFormat))
}

if len(npReco.Spec.NSAllowList) > 0 {
nsAllowListStr := "["
for i, nsAllow := range npReco.Spec.NSAllowList {
nsAllowListStr += ("\"" + nsAllow + "\"")
if i != len(npReco.Spec.NSAllowList)-1 {
nsAllowListStr += ","
}
}
nsAllowListStr += "]"
nsAllowListStr := strings.Join(npReco.Spec.NSAllowList, "\",\"")
nsAllowListStr = "[\"" + nsAllowListStr + "\"]"
recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowListStr)
}

Expand All @@ -549,31 +570,31 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
}{}

if npReco.Spec.ExecutorInstances < 0 {
return fmt.Errorf("invalid request: ExecutorInstances should be an integer >= 0")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorInstances should be an integer >= 0")}
}
sparkResourceArgs.executorInstances = int32(npReco.Spec.ExecutorInstances)

matchResult, err := regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: DriverCoreRequest should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.driverCoreRequest = npReco.Spec.DriverCoreRequest

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.DriverMemory)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: DriverMemory should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: DriverMemory should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.driverMemory = npReco.Spec.DriverMemory

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorCoreRequest should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.executorCoreRequest = npReco.Spec.ExecutorCoreRequest

matchResult, err = regexp.MatchString(k8sQuantitiesReg, npReco.Spec.ExecutorMemory)
if err != nil || !matchResult {
return fmt.Errorf("invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention")
return IlleagelArguementError{fmt.Errorf("invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention")}
}
sparkResourceArgs.executorMemory = npReco.Spec.ExecutorMemory

Expand Down Expand Up @@ -638,7 +659,7 @@ func (c *NPRecommendationController) startSparkApplication(npReco *crdv1alpha1.N
},
},
}
CreateSparkApplication(c.kubeClient, npReco.Namespace, recommendationApplication)
err = CreateSparkApplication(c.kubeClient, npReco.Namespace, recommendationApplication)
if err != nil {
return fmt.Errorf("failed to create Spark Application: %v", err)
}
Expand Down
Loading

0 comments on commit 8ddd85e

Please sign in to comment.