From 0f04a832b87bff9fc8eb293fdcf63127067f7452 Mon Sep 17 00:00:00 2001
From: Yongming Ding
Date: Fri, 3 Jun 2022 16:28:10 -0700
Subject: [PATCH] Address comments

Signed-off-by: Yongming Ding
---
 Makefile                                      |  19 ++--
 .../policy_recommendation_retrieve.go         |  10 +-
 .../commands/policy_recommendation_run.go     | 101 +++++++++---------
 .../commands/policy_recommendation_status.go  |  12 +--
 pkg/theia/commands/utils.go                   |   6 +-
 pkg/theia/commands/utils_test.go              |   2 +-
 .../policy_recommendation_job.py              |   2 +-
 7 files changed, 78 insertions(+), 74 deletions(-)

diff --git a/Makefile b/Makefile
index 4d76207d1..384d02e1f 100644
--- a/Makefile
+++ b/Makefile
@@ -167,16 +167,17 @@ policy-recommendation:
 	docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation
 	docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION)
 
-.PHONY: theia
-theia:
-	@mkdir -p $(BINDIR)
-	GOOS=linux $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
+THEIA_BINARIES := theia-darwin theia-linux theia-windows
+$(THEIA_BINARIES): theia-%:
+	@GOOS=$* $(GO) build -o $(BINDIR)/$@ $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
+	@if [[ $@ != *windows ]]; then \
+	  chmod 0755 $(BINDIR)/$@; \
+	else \
+	  mv $(BINDIR)/$@ $(BINDIR)/$@.exe; \
+	fi
 
-# Add the darwin version binary to help dev&test on Mac for now
-.PHONY: theia-darwin
-theia-darwin:
-	@mkdir -p $(BINDIR)
-	GOOS=darwin $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
+.PHONY: theia
+theia: $(THEIA_BINARIES)
 
 .PHONY: theia-release
 theia-release:
diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go
index 0f01f78eb..7e416fc27 100644
--- a/pkg/theia/commands/policy_recommendation_retrieve.go
+++ b/pkg/theia/commands/policy_recommendation_retrieve.go
@@ -35,7 +35,7 @@ var policyRecommendationRetrieveCmd = &cobra.Command{
 	Use:   "result",
 	Short: "Get the recommendation result of a policy recommendation Spark job",
 	Long: `Get the recommendation result of a policy recommendation Spark job by ID.
-It will return the recommended network policies described in yaml.`,
+It will return the recommended NetworkPolicies described in YAML.`,
 	Args: cobra.RangeArgs(0, 1),
 	Example: `
 Get the recommendation result with job ID e998433e-accb-4888-9fc8-06563f073e86
@@ -44,7 +44,7 @@ Or
 $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86
 Use a customized ClickHouse endpoint when connecting to ClickHouse to getting the result
 $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --clickhouse-endpoint 10.10.1.1
-Use Cluster IP when connecting to ClickHouse to getting the result
+Use Service ClusterIP when connecting to ClickHouse to get the result
 $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip
 Save the recommendation result to file
 $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip --file output.yaml
@@ -229,13 +229,13 @@ func init() {
 	policyRecommendationRetrieveCmd.Flags().Bool(
 		"use-cluster-ip",
 		false,
-		`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse service.
-(Only works when running in cluster)`,
+		`Enabling this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service.
+It can only be used when running theia in-cluster.`,
 	)
 	policyRecommendationRetrieveCmd.Flags().StringP(
 		"file",
 		"f",
 		"",
-		"The file path where you want to save the results.",
+		"The file path where you want to save the result.",
 	)
 }
diff --git a/pkg/theia/commands/policy_recommendation_run.go b/pkg/theia/commands/policy_recommendation_run.go
index e8040f6d9..16c97ac30 100644
--- a/pkg/theia/commands/policy_recommendation_run.go
+++ b/pkg/theia/commands/policy_recommendation_run.go
@@ -21,6 +21,7 @@ import (
 	"net/url"
 	"regexp"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/google/uuid"
@@ -37,7 +38,7 @@ const (
 	sparkImage           = "antrea/theia-policy-recommendation:latest"
 	sparkImagePullPolicy = "IfNotPresent"
 	sparkAppFile         = "local:///opt/spark/work-dir/policy_recommendation_job.py"
-	sparkServiceAccount  = "policy-reco-spark"
+	sparkServiceAccount  = "policy-recommendation-spark"
 	sparkVersion         = "3.1.1"
 	statusCheckPollInterval = 5 * time.Second
 	statusCheckPollTimeout  = 60 * time.Minute
@@ -55,17 +56,15 @@ type SparkResourceArgs struct {
 var policyRecommendationRunCmd = &cobra.Command{
 	Use:   "run",
 	Short: "Run a new policy recommendation Spark job",
-	Long: `Run a new policy recommendation Spark job.
-Network policies will be recommended based on the flow records sent by Flow Aggregator.
-Must finish the deployment of Theia first, please follow the steps in
-https://github.com/antrea-io/theia/blob/main/docs/network-policy-recommendation.md`,
-	Example: `Run a policy recommendation spark job with default configuration
+	Long: `Run a new policy recommendation Spark job.
+You must finish the deployment of Theia first.`,
+	Example: `Run a policy recommendation Spark job with default configuration
 $ theia policy-recommendation run
-Run an initial policy recommendation spark job with network isolation option anp-deny-applied and limit on last 10k flow records
-$ theia policy-recommendation run --type initial --option anp-deny-applied --limit 10000
-Run an initial policy recommendation spark job with network isolation option anp-deny-applied and limit on flow records from 2022-01-01 00:00:00 to 2022-01-31 23:59:59.
-$ theia policy-recommendation run --type initial --option anp-deny-applied --start-time '2022-01-01 00:00:00' --end-time '2022-01-31 23:59:59'
-Run a policy recommendation spark job with default configuration but doesn't recommend toServices ANPs
+Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on last 10k flow records
+$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --limit 10000
+Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on flow records from 2022-01-01 00:00:00 to 2022-01-31 23:59:59.
+$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --start-time '2022-01-01 00:00:00' --end-time '2022-01-31 23:59:59'
+Run a policy recommendation Spark job with default configuration but without recommending toServices ANPs
 $ theia policy-recommendation run --to-services=false
 `,
 	RunE: func(cmd *cobra.Command, args []string) error {
@@ -90,22 +89,22 @@
 		}
 		recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(limit))
 
-		option, err := cmd.Flags().GetString("option")
+		policyType, err := cmd.Flags().GetString("policy-type")
 		if err != nil {
 			return err
 		}
-		var optionArg int
-		if option == "anp-deny-applied" {
-			optionArg = 1
-		} else if option == "anp-deny-all" {
-			optionArg = 2
-		} else if option == "k8s-np" {
-			optionArg = 3
+		var policyTypeArg int
+		if policyType == "anp-deny-applied" {
+			policyTypeArg = 1
+		} else if policyType == "anp-deny-all" {
+			policyTypeArg = 2
+		} else if policyType == "k8s-np" {
+			policyTypeArg = 3
 		} else {
-			return fmt.Errorf(`option of network isolation preference should be
+			return fmt.Errorf(`type of generated NetworkPolicy should be
 anp-deny-applied or anp-deny-all or k8s-np`)
 		}
-		recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(optionArg))
+		recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg))
 
 		startTime, err := cmd.Flags().GetString("start-time")
 		if err != nil {
@@ -152,11 +151,11 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
 			recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowList)
 		}
 
-		rmLabels, err := cmd.Flags().GetBool("rm-labels")
+		excludeLabels, err := cmd.Flags().GetBool("exclude-labels")
 		if err != nil {
 			return err
 		}
-		recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(rmLabels))
+		recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(excludeLabels))
 
 		toServices, err := cmd.Flags().GetBool("to-services")
 		if err != nil {
@@ -179,7 +178,7 @@
 		}
 		matchResult, err := regexp.MatchString(k8sQuantitiesReg, driverCoreRequest)
 		if err != nil || !matchResult {
-			return fmt.Errorf("driver-core-request should conform to the Kubernetes convention")
+			return fmt.Errorf("driver-core-request should conform to the Kubernetes resource quantity convention")
 		}
 		sparkResourceArgs.driverCoreRequest = driverCoreRequest
 
@@ -189,7 +188,7 @@
 		}
 		matchResult, err = regexp.MatchString(k8sQuantitiesReg, driverMemory)
 		if err != nil || !matchResult {
-			return fmt.Errorf("driver-memory should conform to the Kubernetes convention")
+			return fmt.Errorf("driver-memory should conform to the Kubernetes resource quantity convention")
 		}
 		sparkResourceArgs.driverMemory = driverMemory
 
@@ -199,7 +198,7 @@
 		}
 		matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorCoreRequest)
 		if err != nil || !matchResult {
-			return fmt.Errorf("executor-core-request should conform to the Kubernetes convention")
+			return fmt.Errorf("executor-core-request should conform to the Kubernetes resource quantity convention")
 		}
 		sparkResourceArgs.executorCoreRequest = executorCoreRequest
 
@@ -209,7 +208,7 @@
 		}
 		matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorMemory)
 		if err != nil || !matchResult {
-			return fmt.Errorf("executor-memory should conform to the Kubernetes convention")
+			return fmt.Errorf("executor-memory should conform to the Kubernetes resource quantity convention")
 		}
 		sparkResourceArgs.executorMemory = executorMemory
 
@@ -240,7 +239,7 @@
 			Kind:       "SparkApplication",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      "policy-reco-" + recommendationID,
+			Name:      "policy-recommendation-" + recommendationID,
 			Namespace: flowVisibilityNS,
 		},
 		Spec: sparkv1.SparkApplicationSpec{
@@ -321,6 +320,10 @@
 			}
 		})
 		if err != nil {
+			if strings.Contains(err.Error(), "timed out") {
+				return fmt.Errorf(`wait timeout of 60 minutes expired for Spark job with ID %s.
+The job is still running. Please check its completion status via the CLI later.`, recommendationID)
+			}
 			return err
 		}
 
@@ -376,14 +379,14 @@ func init() {
 		"The limit on the number of flow records read from the database. 0 means no limit.",
 	)
 	policyRecommendationRunCmd.Flags().StringP(
-		"option",
-		"o",
+		"policy-type",
+		"p",
 		"anp-deny-applied",
-		`Option of network isolation preference in policy recommendation.
-Currently we support 3 options:
-anp-deny-applied: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended.
+		`Type of generated NetworkPolicy.
+Currently we have 3 generated NetworkPolicy types:
+anp-deny-applied: Recommending allow ANP/ACNP policies, with default deny rules only on Pods which have an allow rule applied.
 anp-deny-all: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster.
-k8s-np: Recommending allow K8s network policies, with no deny rules at all`,
+k8s-np: Recommending allow K8s NetworkPolicies.`,
 	)
 	policyRecommendationRunCmd.Flags().StringP(
 		"start-time",
@@ -403,71 +406,71 @@ Format is YYYY-MM-DD hh:mm:ss in UTC timezone. No limit of the end time of flow
 		"ns-allow-list",
 		"n",
 		"",
-		`List of default traffic allow namespaces.
-If no namespaces provided, Traffic inside Antrea CNI related namespaces: ['kube-system', 'flow-aggregator',
+		`List of default allow Namespaces.
+If no Namespaces are provided, traffic inside Antrea CNI related Namespaces: ['kube-system', 'flow-aggregator',
 'flow-visibility'] will be allowed by default.`,
 	)
 	policyRecommendationRunCmd.Flags().Bool(
-		"rm-labels",
+		"exclude-labels",
 		true,
-		`Enable this option will remove automatically generated Pod labels including 'pod-template-hash',
-'controller-revision-hash', 'pod-template-generation'.`,
+		`Enabling this option will exclude automatically generated Pod labels, including 'pod-template-hash',
+'controller-revision-hash', 'pod-template-generation', during policy recommendation.`,
 	)
 	policyRecommendationRunCmd.Flags().Bool(
 		"to-services",
 		true,
 		`Use the toServices feature in ANP and recommendation toServices rules for Pod-to-Service flows,
-only works when option is 1 or 2.`,
+only works when policy-type is anp-deny-applied or anp-deny-all.`,
 	)
 	policyRecommendationRunCmd.Flags().Int32(
 		"executor-instances",
 		1,
-		"Specify the number of executors for the spark application. Example values include 1, 2, 8, etc.",
+		"Specify the number of executors for the Spark application. Example values include 1, 2, 8, etc.",
 	)
 	policyRecommendationRunCmd.Flags().String(
 		"driver-core-request",
 		"200m",
-		`Specify the cpu request for the driver Pod. Values conform to the Kubernetes convention.
+		`Specify the CPU request for the driver Pod. Values conform to the Kubernetes resource quantity convention.
 Example values include 0.1, 500m, 1.5, 5, etc.`,
 	)
 	policyRecommendationRunCmd.Flags().String(
 		"driver-memory",
 		"512M",
-		`Specify the memory request for the driver Pod. Values conform to the Kubernetes convention.
+		`Specify the memory request for the driver Pod. Values conform to the Kubernetes resource quantity convention.
 Example values include 512M, 1G, 8G, etc.`,
 	)
 	policyRecommendationRunCmd.Flags().String(
 		"executor-core-request",
 		"200m",
-		`Specify the cpu request for the executor Pod. Values conform to the Kubernetes convention.
+		`Specify the CPU request for the executor Pod. Values conform to the Kubernetes resource quantity convention.
 Example values include 0.1, 500m, 1.5, 5, etc.`,
 	)
 	policyRecommendationRunCmd.Flags().String(
 		"executor-memory",
 		"512M",
-		`Specify the memory request for the executor Pod. Values conform to the Kubernetes convention.
+		`Specify the memory request for the executor Pod. Values conform to the Kubernetes resource quantity convention.
 Example values include 512M, 1G, 8G, etc.`,
 	)
 	policyRecommendationRunCmd.Flags().Bool(
 		"wait",
 		false,
-		"Enable this option will hold and wait the whole policy recommendation job finished.",
+		"Enabling this option will hold and wait until the whole policy recommendation job finishes.",
 	)
 	policyRecommendationRunCmd.Flags().String(
 		"clickhouse-endpoint",
 		"",
-		"The ClickHouse Service endpoint. (Only works when wait is enabled)",
+		"The ClickHouse Service endpoint. It can only be used when wait is enabled.",
 	)
 	policyRecommendationRunCmd.Flags().Bool(
 		"use-cluster-ip",
 		false,
 		`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service.
-It can only be used when running in cluster. (Only works when wait is enabled)`,
+It can only be used when running in-cluster and when wait is enabled.`,
 	)
 	policyRecommendationRunCmd.Flags().StringP(
 		"file",
 		"f",
 		"",
-		"The file path where you want to save the results. (Only works when wait is enabled)",
+		"The file path where you want to save the result. It can only be used when wait is enabled.",
 	)
 }
diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go
index e13bc4e6c..d09cc54f3 100644
--- a/pkg/theia/commands/policy_recommendation_status.go
+++ b/pkg/theia/commands/policy_recommendation_status.go
@@ -44,7 +44,7 @@ Check the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
 $ theia policy-recommendation status --id e998433e-accb-4888-9fc8-06563f073e86
 Or
 $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86
-Use Cluster IP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
+Use Service ClusterIP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
 $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip
 `,
 	RunE: func(cmd *cobra.Command, args []string) error {
@@ -83,7 +83,7 @@
 		}
 		if state == "RUNNING" {
 			var endpoint string
-			service := fmt.Sprintf("policy-reco-%s-ui-svc", recoID)
+			service := fmt.Sprintf("policy-recommendation-%s-ui-svc", recoID)
 			if useClusterIP {
 				serviceIP, servicePort, err := GetServiceAddr(clientset, service)
 				if err != nil {
@@ -124,7 +124,7 @@ func getPolicyRecommendationStatus(clientset kubernetes.Interface, recoID string
 		AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
 		Namespace(flowVisibilityNS).
 		Resource("sparkapplications").
-		Name("policy-reco-" + recoID).
+		Name("policy-recommendation-" + recoID).
 		Do(context.TODO()).
 		Into(sparkApplication)
 	if err != nil {
@@ -135,7 +135,7 @@ func getPolicyRecommendationStatus(clientset kubernetes.Interface, recoID string
 }
 
 func getPolicyRecommendationProgress(baseUrl string) (string, error) {
-	// Get the id of current spark application
+	// Get the ID of the current Spark application
 	url := fmt.Sprintf("%s/api/v1/applications", baseUrl)
 	response, err := getResponseFromSparkMonitoringSvc(url)
 	if err != nil {
@@ -212,7 +212,7 @@ func init() {
 	policyRecommendationStatusCmd.Flags().Bool(
 		"use-cluster-ip",
 		false,
-		`Enable this option will use ClusterIP instead of port forwarding when connecting to the Spark Monitoring Service.
-(Only works when running in cluster)`,
+		`Enabling this option will use Service ClusterIP instead of port forwarding when connecting to the Spark Monitoring Service.
+It can only be used when running theia in-cluster.`,
 	)
 }
diff --git a/pkg/theia/commands/utils.go b/pkg/theia/commands/utils.go
index e2ada6f5a..dc8dc5c6b 100644
--- a/pkg/theia/commands/utils.go
+++ b/pkg/theia/commands/utils.go
@@ -56,13 +56,13 @@ func PolicyRecoPreCheck(clientset kubernetes.Interface) error {
 func CheckSparkOperatorPod(clientset kubernetes.Interface) error {
 	// Check the deployment of Spark Operator in flow-visibility ns
 	pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{
-		LabelSelector: "app.kubernetes.io/instance=policy-reco,app.kubernetes.io/name=spark-operator",
+		LabelSelector: "app.kubernetes.io/instance=policy-recommendation,app.kubernetes.io/name=spark-operator",
 	})
 	if err != nil {
-		return fmt.Errorf("error %v when finding the policy-reco-spark-operator Pod, please check the deployment of the Spark Operator", err)
+		return fmt.Errorf("error %v when finding the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator", err)
 	}
 	if len(pods.Items) < 1 {
-		return fmt.Errorf("can't find the policy-reco-spark-operator Pod, please check the deployment of the Spark Operator")
+		return fmt.Errorf("can't find the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator")
 	}
 	hasRunningPod := false
 	for _, pod := range pods.Items {
diff --git a/pkg/theia/commands/utils_test.go b/pkg/theia/commands/utils_test.go
index 82dc5da3d..497943fe4 100644
--- a/pkg/theia/commands/utils_test.go
+++ b/pkg/theia/commands/utils_test.go
@@ -39,7 +39,7 @@ var fakeClientset = fake.NewSimpleClientset(
 			Name:      "spark-operator",
 			Namespace: flowVisibilityNS,
 			Labels: map[string]string{
-				"app.kubernetes.io/instance": "policy-reco",
+				"app.kubernetes.io/instance": "policy-recommendation",
 				"app.kubernetes.io/name":     "spark-operator",
 			},
 		},
diff --git a/plugins/policy-recommendation/policy_recommendation_job.py b/plugins/policy-recommendation/policy_recommendation_job.py
index 9f6858f39..d611862a8 100644
--- a/plugins/policy-recommendation/policy_recommendation_job.py
+++ b/plugins/policy-recommendation/policy_recommendation_job.py
@@ -747,7 +747,7 @@ def main(argv):
     Options:
     -h, --help: Show help message.
    
 -t, --type=initial: {initial|subsequent} Indicates this recommendation is an initial recommendation or a subsequent recommendation job.
-    -d, --db_jdbc_url=None: The JDBC URL used by Spark jobs connect to the ClickHouse database for reading flow records and writing results.
+    -d, --db_jdbc_url=None: The JDBC URL used by Spark jobs to connect to the ClickHouse database for reading flow records and writing the result.
     jdbc:clickhouse://clickhouse-clickhouse.flow-visibility.svc:8123 is the ClickHouse JDBC URL used by default.
     -l, --limit=0: The limit on the number of flow records read from the database. 0 means no limit.
    -o, --option=1: Option of network isolation preference in policy recommendation.
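
A minimal smoke test of the targets and flags renamed by this patch — a sketch, not part of the patch itself. It assumes the patch is applied, binaries land in bin/ (the repository's $(BINDIR)), Theia is already deployed in the cluster, and the job ID shown is the same placeholder used by the in-tree examples rather than real output:

$ make theia-linux
(builds bin/theia-linux via the new per-OS pattern rule; theia-darwin and theia-windows work the same way, and the windows binary is renamed to theia-windows.exe)
$ make theia
(builds theia-darwin, theia-linux, and theia-windows.exe in one shot, since theia now depends on $(THEIA_BINARIES))
$ ./bin/theia-linux policy-recommendation run --type initial --policy-type anp-deny-all --exclude-labels=true --wait
$ ./bin/theia-linux policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86
$ ./bin/theia-linux policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --file output.yaml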