Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
Yongming Ding committed Jun 4, 2022
1 parent 9a8b26e commit 0f04a83
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 74 deletions.
19 changes: 10 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,17 @@ policy-recommendation:
docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation
docker tag antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-policy-recommendation:$(DOCKER_IMG_VERSION)

.PHONY: theia
theia:
@mkdir -p $(BINDIR)
GOOS=linux $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
THEIA_BINARIES := theia-darwin theia-linux theia-windows
$(THEIA_BINARIES): theia-%:
@GOOS=$* $(GO) build -o $(BINDIR)/$@ $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
@if [[ $@ != *windows ]]; then \
chmod 0755 $(BINDIR)/$@; \
else \
mv $(BINDIR)/$@ $(BINDIR)/$@.exe; \
fi

# Add the darwin version binary to help dev&test on Mac for now
.PHONY: theia-darwin
theia-darwin:
@mkdir -p $(BINDIR)
GOOS=darwin $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/pkg/theia
.PHONY: theia
theia: $(THEIA_BINARIES)

.PHONY: theia-release
theia-release:
Expand Down
10 changes: 5 additions & 5 deletions pkg/theia/commands/policy_recommendation_retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var policyRecommendationRetrieveCmd = &cobra.Command{
Use: "result",
Short: "Get the recommendation result of a policy recommendation Spark job",
Long: `Get the recommendation result of a policy recommendation Spark job by ID.
It will return the recommended network policies described in yaml.`,
It will return the recommended NetworkPolicies described in yaml.`,
Args: cobra.RangeArgs(0, 1),
Example: `
Get the recommendation result with job ID e998433e-accb-4888-9fc8-06563f073e86
Expand All @@ -44,7 +44,7 @@ Or
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86
Use a customized ClickHouse endpoint when connecting to ClickHouse to getting the result
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --clickhouse-endpoint 10.10.1.1
Use Cluster IP when connecting to ClickHouse to getting the result
Use Service ClusterIP when connecting to ClickHouse to getting the result
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip
Save the recommendation result to file
$ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip --file output.yaml
Expand Down Expand Up @@ -229,13 +229,13 @@ func init() {
policyRecommendationRetrieveCmd.Flags().Bool(
"use-cluster-ip",
false,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse service.
(Only works when running in cluster)`,
`Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service.
It can only be used when running theia in cluster.`,
)
policyRecommendationRetrieveCmd.Flags().StringP(
"file",
"f",
"",
"The file path where you want to save the results.",
"The file path where you want to save the result.",
)
}
101 changes: 52 additions & 49 deletions pkg/theia/commands/policy_recommendation_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"regexp"
"strconv"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -37,7 +38,7 @@ const (
sparkImage = "antrea/theia-policy-recommendation:latest"
sparkImagePullPolicy = "IfNotPresent"
sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py"
sparkServiceAccount = "policy-reco-spark"
sparkServiceAccount = "policy-recommendation-spark"
sparkVersion = "3.1.1"
statusCheckPollInterval = 5 * time.Second
statusCheckPollTimeout = 60 * time.Minute
Expand All @@ -55,17 +56,15 @@ type SparkResourceArgs struct {
var policyRecommendationRunCmd = &cobra.Command{
Use: "run",
Short: "Run a new policy recommendation Spark job",
Long: `Run a new policy recommendation Spark job.
Network policies will be recommended based on the flow records sent by Flow Aggregator.
Must finish the deployment of Theia first, please follow the steps in
https://github.com/antrea-io/theia/blob/main/docs/network-policy-recommendation.md`,
Example: `Run a policy recommendation spark job with default configuration
Long: `Run a new policy recommendation Spark job.
Must finish the deployment of Theia first`,
Example: `Run a policy recommendation Spark job with default configuration
$ theia policy-recommendation run
Run an initial policy recommendation spark job with network isolation option anp-deny-applied and limit on last 10k flow records
$ theia policy-recommendation run --type initial --option anp-deny-applied --limit 10000
Run an initial policy recommendation spark job with network isolation option anp-deny-applied and limit on flow records from 2022-01-01 00:00:00 to 2022-01-31 23:59:59.
$ theia policy-recommendation run --type initial --option anp-deny-applied --start-time '2022-01-01 00:00:00' --end-time '2022-01-31 23:59:59'
Run a policy recommendation spark job with default configuration but doesn't recommend toServices ANPs
Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on last 10k flow records
$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --limit 10000
Run an initial policy recommendation Spark job with policy type anp-deny-applied and limit on flow records from 2022-01-01 00:00:00 to 2022-01-31 23:59:59.
$ theia policy-recommendation run --type initial --policy-type anp-deny-applied --start-time '2022-01-01 00:00:00' --end-time '2022-01-31 23:59:59'
Run a policy recommendation Spark job with default configuration but doesn't recommend toServices ANPs
$ theia policy-recommendation run --to-services=false
`,
RunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -90,22 +89,22 @@ $ theia policy-recommendation run --to-services=false
}
recoJobArgs = append(recoJobArgs, "--limit", strconv.Itoa(limit))

option, err := cmd.Flags().GetString("option")
policyType, err := cmd.Flags().GetString("policy-type")
if err != nil {
return err
}
var optionArg int
if option == "anp-deny-applied" {
optionArg = 1
} else if option == "anp-deny-all" {
optionArg = 2
} else if option == "k8s-np" {
optionArg = 3
var policyTypeArg int
if policyType == "anp-deny-applied" {
policyTypeArg = 1
} else if policyType == "anp-deny-all" {
policyTypeArg = 2
} else if policyType == "k8s-np" {
policyTypeArg = 3
} else {
return fmt.Errorf(`option of network isolation preference should be
return fmt.Errorf(`type of generated NetworkPolicy should be
anp-deny-applied or anp-deny-all or k8s-np`)
}
recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(optionArg))
recoJobArgs = append(recoJobArgs, "--option", strconv.Itoa(policyTypeArg))

startTime, err := cmd.Flags().GetString("start-time")
if err != nil {
Expand Down Expand Up @@ -152,11 +151,11 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
recoJobArgs = append(recoJobArgs, "--ns_allow_list", nsAllowList)
}

rmLabels, err := cmd.Flags().GetBool("rm-labels")
excludeLabels, err := cmd.Flags().GetBool("exclude-labels")
if err != nil {
return err
}
recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(rmLabels))
recoJobArgs = append(recoJobArgs, "--rm_labels", strconv.FormatBool(excludeLabels))

toServices, err := cmd.Flags().GetBool("to-services")
if err != nil {
Expand All @@ -179,7 +178,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
}
matchResult, err := regexp.MatchString(k8sQuantitiesReg, driverCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("driver-core-request should conform to the Kubernetes convention")
return fmt.Errorf("driver-core-request should conform to the Kubernetes resource quantity convention")
}
sparkResourceArgs.driverCoreRequest = driverCoreRequest

Expand All @@ -189,7 +188,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
}
matchResult, err = regexp.MatchString(k8sQuantitiesReg, driverMemory)
if err != nil || !matchResult {
return fmt.Errorf("driver-memory should conform to the Kubernetes convention")
return fmt.Errorf("driver-memory should conform to the Kubernetes resource quantity convention")
}
sparkResourceArgs.driverMemory = driverMemory

Expand All @@ -199,7 +198,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
}
matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorCoreRequest)
if err != nil || !matchResult {
return fmt.Errorf("executor-core-request should conform to the Kubernetes convention")
return fmt.Errorf("executor-core-request should conform to the Kubernetes resource quantity convention")
}
sparkResourceArgs.executorCoreRequest = executorCoreRequest

Expand All @@ -209,7 +208,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
}
matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorMemory)
if err != nil || !matchResult {
return fmt.Errorf("executor-memory should conform to the Kubernetes convention")
return fmt.Errorf("executor-memory should conform to the Kubernetes resource quantity convention")
}
sparkResourceArgs.executorMemory = executorMemory

Expand Down Expand Up @@ -240,7 +239,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
Kind: "SparkApplication",
},
ObjectMeta: metav1.ObjectMeta{
Name: "policy-reco-" + recommendationID,
Name: "policy-recommendation-" + recommendationID,
Namespace: flowVisibilityNS,
},
Spec: sparkv1.SparkApplicationSpec{
Expand Down Expand Up @@ -321,6 +320,10 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f
}
})
if err != nil {
if strings.Contains(err.Error(), "timed out") {
return fmt.Errorf(`Spark job with ID %s wait timeout of 60 minutes expired.
Job is still running. Please check completion status for job via CLI later.`, recommendationID)
}
return err
}

Expand Down Expand Up @@ -376,14 +379,14 @@ func init() {
"The limit on the number of flow records read from the database. 0 means no limit.",
)
policyRecommendationRunCmd.Flags().StringP(
"option",
"o",
"policy-type",
"p",
"anp-deny-applied",
`Option of network isolation preference in policy recommendation.
Currently we support 3 options:
anp-deny-applied: Recommending allow ANP/ACNP policies, with default deny rules only on applied to Pod labels which have allow rules recommended.
`Types of generated NetworkPolicy.
Currently we have 3 generated NetworkPolicy types:
anp-deny-applied: Recommending allow ANP/ACNP policies, with default deny rules only on Pods which have an allow rule applied.
anp-deny-all: Recommending allow ANP/ACNP policies, with default deny rules for whole cluster.
k8s-np: Recommending allow K8s network policies, with no deny rules at all`,
k8s-np: Recommending allow K8s NetworkPolicies.`,
)
policyRecommendationRunCmd.Flags().StringP(
"start-time",
Expand All @@ -403,71 +406,71 @@ Format is YYYY-MM-DD hh:mm:ss in UTC timezone. No limit of the end time of flow
"ns-allow-list",
"n",
"",
`List of default traffic allow namespaces.
If no namespaces provided, Traffic inside Antrea CNI related namespaces: ['kube-system', 'flow-aggregator',
`List of default allow Namespaces.
If no Namespaces provided, Traffic inside Antrea CNI related Namespaces: ['kube-system', 'flow-aggregator',
'flow-visibility'] will be allowed by default.`,
)
policyRecommendationRunCmd.Flags().Bool(
"rm-labels",
"exclude-labels",
true,
`Enable this option will remove automatically generated Pod labels including 'pod-template-hash',
'controller-revision-hash', 'pod-template-generation'.`,
`Enable this option will exclude automatically generated Pod labels including 'pod-template-hash',
'controller-revision-hash', 'pod-template-generation' during policy recommendation.`,
)
policyRecommendationRunCmd.Flags().Bool(
"to-services",
true,
`Use the toServices feature in ANP and recommendation toServices rules for Pod-to-Service flows,
only works when option is 1 or 2.`,
only works when option is anp-deny-applied or anp-deny-all.`,
)
policyRecommendationRunCmd.Flags().Int32(
"executor-instances",
1,
"Specify the number of executors for the spark application. Example values include 1, 2, 8, etc.",
"Specify the number of executors for the Spark application. Example values include 1, 2, 8, etc.",
)
policyRecommendationRunCmd.Flags().String(
"driver-core-request",
"200m",
`Specify the cpu request for the driver Pod. Values conform to the Kubernetes convention.
`Specify the CPU request for the driver Pod. Values conform to the Kubernetes resource quantity convention.
Example values include 0.1, 500m, 1.5, 5, etc.`,
)
policyRecommendationRunCmd.Flags().String(
"driver-memory",
"512M",
`Specify the memory request for the driver Pod. Values conform to the Kubernetes convention.
`Specify the memory request for the driver Pod. Values conform to the Kubernetes resource quantity convention.
Example values include 512M, 1G, 8G, etc.`,
)
policyRecommendationRunCmd.Flags().String(
"executor-core-request",
"200m",
`Specify the cpu request for the executor Pod. Values conform to the Kubernetes convention.
`Specify the CPU request for the executor Pod. Values conform to the Kubernetes resource quantity convention.
Example values include 0.1, 500m, 1.5, 5, etc.`,
)
policyRecommendationRunCmd.Flags().String(
"executor-memory",
"512M",
`Specify the memory request for the executor Pod. Values conform to the Kubernetes convention.
`Specify the memory request for the executor Pod. Values conform to the Kubernetes resource quantity convention.
Example values include 512M, 1G, 8G, etc.`,
)
policyRecommendationRunCmd.Flags().Bool(
"wait",
false,
"Enable this option will hold and wait the whole policy recommendation job finished.",
"Enable this option will hold and wait the whole policy recommendation job finishes.",
)
policyRecommendationRunCmd.Flags().String(
"clickhouse-endpoint",
"",
"The ClickHouse Service endpoint. (Only works when wait is enabled)",
"The ClickHouse Service endpoint. It can only be used when wait is enabled.",
)
policyRecommendationRunCmd.Flags().Bool(
"use-cluster-ip",
false,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service.
It can only be used when running in cluster. (Only works when wait is enabled)`,
It can only be used when running in cluster and when wait is enabled.`,
)
policyRecommendationRunCmd.Flags().StringP(
"file",
"f",
"",
"The file path where you want to save the results. (Only works when wait is enabled)",
"The file path where you want to save the result. It can only be used when wait is enabled.",
)
}
12 changes: 6 additions & 6 deletions pkg/theia/commands/policy_recommendation_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Check the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation status --id e998433e-accb-4888-9fc8-06563f073e86
Or
$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86
Use Cluster IP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
Use Service ClusterIP when checking the current status of job with ID e998433e-accb-4888-9fc8-06563f073e86
$ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-cluster-ip
`,
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -83,7 +83,7 @@ $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use-
}
if state == "RUNNING" {
var endpoint string
service := fmt.Sprintf("policy-reco-%s-ui-svc", recoID)
service := fmt.Sprintf("policy-recommendation-%s-ui-svc", recoID)
if useClusterIP {
serviceIP, servicePort, err := GetServiceAddr(clientset, service)
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func getPolicyRecommendationStatus(clientset kubernetes.Interface, recoID string
AbsPath("/apis/sparkoperator.k8s.io/v1beta2").
Namespace(flowVisibilityNS).
Resource("sparkapplications").
Name("policy-reco-" + recoID).
Name("policy-recommendation-" + recoID).
Do(context.TODO()).
Into(sparkApplication)
if err != nil {
Expand All @@ -135,7 +135,7 @@ func getPolicyRecommendationStatus(clientset kubernetes.Interface, recoID string
}

func getPolicyRecommendationProgress(baseUrl string) (string, error) {
// Get the id of current spark application
// Get the id of current Spark application
url := fmt.Sprintf("%s/api/v1/applications", baseUrl)
response, err := getResponseFromSparkMonitoringSvc(url)
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func init() {
policyRecommendationStatusCmd.Flags().Bool(
"use-cluster-ip",
false,
`Enable this option will use ClusterIP instead of port forwarding when connecting to the Spark Monitoring Service.
(Only works when running in cluster)`,
`Enable this option will use Service ClusterIP instead of port forwarding when connecting to the Spark Monitoring Service.
It can only be used when running theia in cluster.`,
)
}
6 changes: 3 additions & 3 deletions pkg/theia/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ func PolicyRecoPreCheck(clientset kubernetes.Interface) error {
func CheckSparkOperatorPod(clientset kubernetes.Interface) error {
// Check the deployment of Spark Operator in flow-visibility ns
pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{
LabelSelector: "app.kubernetes.io/instance=policy-reco,app.kubernetes.io/name=spark-operator",
LabelSelector: "app.kubernetes.io/instance=policy-recommendation,app.kubernetes.io/name=spark-operator",
})
if err != nil {
return fmt.Errorf("error %v when finding the policy-reco-spark-operator Pod, please check the deployment of the Spark Operator", err)
return fmt.Errorf("error %v when finding the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator", err)
}
if len(pods.Items) < 1 {
return fmt.Errorf("can't find the policy-reco-spark-operator Pod, please check the deployment of the Spark Operator")
return fmt.Errorf("can't find the policy-recommendation-spark-operator Pod, please check the deployment of the Spark Operator")
}
hasRunningPod := false
for _, pod := range pods.Items {
Expand Down
2 changes: 1 addition & 1 deletion pkg/theia/commands/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var fakeClientset = fake.NewSimpleClientset(
Name: "spark-operator",
Namespace: flowVisibilityNS,
Labels: map[string]string{
"app.kubernetes.io/instance": "policy-reco",
"app.kubernetes.io/instance": "policy-recommendation",
"app.kubernetes.io/name": "spark-operator",
},
},
Expand Down
2 changes: 1 addition & 1 deletion plugins/policy-recommendation/policy_recommendation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ def main(argv):
Options:
-h, --help: Show help message.
-t, --type=initial: {initial|subsequent} Indicates this recommendation is an initial recommendion or a subsequent recommendation job.
-d, --db_jdbc_url=None: The JDBC URL used by Spark jobs connect to the ClickHouse database for reading flow records and writing results.
-d, --db_jdbc_url=None: The JDBC URL used by Spark jobs connect to the ClickHouse database for reading flow records and writing result.
jdbc:clickhouse://clickhouse-clickhouse.flow-visibility.svc:8123 is the ClickHouse JDBC URL used by default.
-l, --limit=0: The limit on the number of flow records read from the database. 0 means no limit.
-o, --option=1: Option of network isolation preference in policy recommendation.
Expand Down

0 comments on commit 0f04a83

Please sign in to comment.