From e4120cb8cd1ed4a2018747f9c840f68bdf852acd Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Tue, 14 Jun 2022 18:44:24 -0700 Subject: [PATCH] Add CLI command to retrieve metrics in clickhouse database 1. Add CLI commands to get diagnostic infos about Clickhouse DB 2. Add e2e test for theia CLI Signed-off-by: Yun-Tang Hsu --- ci/jenkins/test-vmc.sh | 2 + ci/kind/test-e2e-kind.sh | 4 +- docs/theia-cli.md | 81 +++- pkg/theia/commands/clickhouse.go | 44 +++ pkg/theia/commands/clickhouse_status.go | 271 +++++++++++++ pkg/theia/commands/config/config.go | 29 ++ .../commands/policy_recommendation_delete.go | 7 +- .../commands/policy_recommendation_list.go | 5 +- .../policy_recommendation_retrieve.go | 2 +- .../policy_recommendation_retrieve_test.go | 8 +- .../commands/policy_recommendation_run.go | 40 +- .../commands/policy_recommendation_status.go | 3 +- pkg/theia/commands/utils.go | 31 +- pkg/theia/commands/utils_test.go | 10 +- test/e2e/fixture.go | 10 +- test/e2e/flowvisibility_test.go | 11 +- test/e2e/framework.go | 81 ++-- test/e2e/policyrecommendation_test.go | 4 +- test/e2e/theia_clickhouse_test.go | 374 ++++++++++++++++++ 19 files changed, 922 insertions(+), 95 deletions(-) create mode 100644 pkg/theia/commands/clickhouse.go create mode 100644 pkg/theia/commands/clickhouse_status.go create mode 100644 pkg/theia/commands/config/config.go create mode 100644 test/e2e/theia_clickhouse_test.go diff --git a/ci/jenkins/test-vmc.sh b/ci/jenkins/test-vmc.sh index 3749b9f3a..a9cc493ed 100644 --- a/ci/jenkins/test-vmc.sh +++ b/ci/jenkins/test-vmc.sh @@ -337,6 +337,8 @@ function deliver_antrea { ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1 > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility.yml ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana --spark-operator > ${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-with-spark.yml + ${GIT_CHECKOUT_DIR}/hack/generate-manifest.sh --no-grafana > 
${GIT_CHECKOUT_DIR}/build/yamls/flow-visibility-ch-only.yml + ${SCP_WITH_ANTREA_CI_KEY} $GIT_CHECKOUT_DIR/build/charts/theia/crds/clickhouse-operator-install-bundle.yaml capv@${control_plane_ip}:~ ${SCP_WITH_ANTREA_CI_KEY} $GIT_CHECKOUT_DIR/build/yamls/*.yml capv@${control_plane_ip}:~ diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 764f0191e..2577d0b6e 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -40,6 +40,7 @@ TESTBED_CMD=$(dirname $0)"/kind-setup.sh" YML_DIR=$(dirname $0)"/../../build/yamls" FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1" FLOW_VISIBILITY_WITH_SPARK_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --spark-operator" +FLOW_VISIBILITY_CH_ONLY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana" CH_OPERATOR_YML=$(dirname $0)"/../../build/charts/theia/crds/clickhouse-operator-install-bundle.yaml" make theia-linux @@ -161,13 +162,14 @@ function run_test { docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yaml < $CH_OPERATOR_YML $FLOW_VISIBILITY_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml $FLOW_VISIBILITY_WITH_SPARK_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility-with-spark.yml + $FLOW_VISIBILITY_CH_ONLY_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility-ch-only.yml docker exec -i kind-control-plane dd of=/root/theia < $THEIACTL_BIN rm -rf $TMP_DIR sleep 1 - go test -v -timeout=20m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist + go test -v -timeout=30m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist } echo "======== Test encap mode ==========" diff --git a/docs/theia-cli.md b/docs/theia-cli.md index 9b8c4a76f..1761160ae 100644 --- a/docs/theia-cli.md +++ b/docs/theia-cli.md @@ -8,6 +8,12 @@ visibility capabilities. 
 - [Installation](#installation)
 - [Usage](#usage)
+  - [NetworkPolicy Recommendation feature](#networkpolicy-recommendation-feature)
+  - [ClickHouse](#clickhouse)
+    - [Disk usage information](#disk-usage-information)
+    - [Table Information](#table-information)
+    - [Insertion rate](#insertion-rate)
+    - [Stack trace](#stack-trace)
 
 ## Installation
 
@@ -36,8 +42,11 @@ theia help
 
 ## Usage
 
-To see the list of available commands and options, run `theia help`. Currently,
-we have 5 commands for the NetworkPolicy Recommendation feature:
+To see the list of available commands and options, run `theia help`.
+
+### NetworkPolicy Recommendation feature
+
+We currently have 5 commands for NetworkPolicy Recommendation:
 
 - `theia policy-recommendation run`
 - `theia policy-recommendation status`
@@ -47,3 +56,71 @@ we have 5 commands for the NetworkPolicy Recommendation feature:
 
 For details, please refer to [NetworkPolicy recommendation doc](
 networkpolicy-recommendation.md)
+
+### ClickHouse
+
+From Theia v0.2, we introduce one command for ClickHouse:
+
+- `theia clickhouse status [flags]`
+
+#### Disk usage information
+
+The `--diskInfo` flag will list disk usage information of each ClickHouse shard. `Shard`, `DatabaseName`, `Path`, `Free`
+, `Total` and `Used_Percentage` of each ClickHouse shard will be displayed in table format. For example:
+
+```bash
+> theia clickhouse status --diskInfo
+Shard DatabaseName Path Free Total Used_Percentage
+1 default /var/lib/clickhouse/ 1.84 GiB 1.84 GiB 0.04 %
+```
+
+#### Table Information
+
+The `--tableInfo` flag will list basic table information of each ClickHouse shard. `Shard`, `DatabaseName`, `TableName`,
+`TotalRows`, `TotalBytes` and `TotalCols` of tables in each ClickHouse shard will be displayed in table format.
For example: + +```bash +> theia clickhouse status --tableInfo +Shard DatabaseName TableName TotalRows TotalBytes TotalCols +1 default .inner.flows_node_view 7 2.84 KiB 16 +1 default .inner.flows_pod_view 131 5.00 KiB 20 +1 default .inner.flows_policy_view 131 6.28 KiB 27 +1 default flows 267 18.36 KiB 49 +``` + +#### Insertion rate + +The `--insertRate` flag will list the insertion rate of each ClickHouse shard. `Shard`, `RowsPerSecond`, and +`BytesPerSecond` of each ClickHouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --insertRate +Shard RowsPerSecond BytesPerSecond +1 230 6.31 KiB +``` + +#### Stack trace + +If ClickHouse is busy with something, and you don’t know what’s happening, you can check the stacktraces of all +the threads which are working. + +The `--stackTraces` flag will list the stacktraces of each ClickHouse shard. `Shard`, `trace_function`, and +`count()` of each ClickHouse shard will be displayed in table format. For example: + +```bash +> theia clickhouse status --stackTraces +Row 1: +------- +Shard: 1 +trace_functions: pthread_cond_timedwait@@GLIBC_2.3.2\nPoco::EventImpl::waitImpl(long)\nPoco::NotificationQueue:: +waitDequeueNotification(long)\nDB::BackgroundSchedulePool::threadFunction()\n\nThreadPoolImpl:: +worker(std::__1::__list_iterator)\n\nstart_thread\n__clone +count(): 128 + +Row 2: +------- +Shard: 1 +trace_functions: __poll\nPoco::Net::SocketImpl::pollImpl(Poco::Timespan&, int)\nPoco::Net::SocketImpl::poll(Poco:: +Timespan const&, int)\nPoco::Net::TCPServer::run()\nPoco::ThreadImpl::runnableEntry(void*)\nstart_thread\n__clone +count(): 5 +``` diff --git a/pkg/theia/commands/clickhouse.go b/pkg/theia/commands/clickhouse.go new file mode 100644 index 000000000..3dc8673e8 --- /dev/null +++ b/pkg/theia/commands/clickhouse.go @@ -0,0 +1,44 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in 
compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var clickHouseCmd = &cobra.Command{ + Use: "clickhouse", + Aliases: []string{"ch"}, + Short: "Commands of Theia ClickHouse feature", + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("Error: must also specify a subcommand to run like status") + }, +} + +func init() { + rootCmd.AddCommand(clickHouseCmd) + clickHouseCmd.PersistentFlags().String( + "clickhouse-endpoint", + "", + "The ClickHouse service endpoint.") + clickHouseCmd.PersistentFlags().Bool( + "use-cluster-ip", + false, + `Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service. +It can only be used when running in cluster.`, + ) +} diff --git a/pkg/theia/commands/clickhouse_status.go b/pkg/theia/commands/clickhouse_status.go new file mode 100644 index 000000000..42490d0d1 --- /dev/null +++ b/pkg/theia/commands/clickhouse_status.go @@ -0,0 +1,271 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package commands + +import ( + "database/sql" + "fmt" + "net/url" + "strings" + + "github.com/spf13/cobra" +) + +type chOptions struct { + diskInfo bool + tableInfo bool + insertRate bool + stackTraces bool +} + +type diskInfo struct { + shard string + name string + path string + freeSpace string + totalSpace string + usedPercentage string +} + +type tableInfo struct { + shard string + database string + tableName string + totalRows string + totalBytes string + totalCols string +} + +type insertRate struct { + shard string + rowsPerSec string + bytesPerSec string +} + +type stackTraces struct { + shard string + traceFunctions string + count string +} + +const ( + diskQuery int = iota + tableInfoQuery + // average writing rate for all tables per second + insertRateQuery + stackTracesQuery +) + +var queryMap = map[int]string{ + diskQuery: ` +SELECT + shardNum() as Shard, + name as DatabaseName, + path as Path, + formatReadableSize(free_space) as Free, + formatReadableSize(total_space) as Total, + TRUNCATE((1 - free_space/total_space) * 100, 2) as Used_Percentage +FROM cluster('{cluster}', system.disks);`, + tableInfoQuery: ` +SELECT + Shard, + DatabaseName, + TableName, + TotalRows, + TotalBytes, + TotalCols +FROM ( + SELECT + shardNum() as Shard, + database AS DatabaseName, + name AS TableName, + total_rows AS TotalRows, + formatReadableSize(total_bytes) AS TotalBytes + FROM cluster('{cluster}', system.tables) WHERE database = 'default' + ) as t1 + INNER JOIN ( + SELECT + shardNum() as Shard, + table_catalog as DatabaseName, + table_name as TableName, + COUNT(*) as TotalCols + FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS) + WHERE table_catalog == 'default' + GROUP BY table_name, table_catalog, Shard + ) as t2 + ON t1.DatabaseName = t2.DatabaseName and t1.TableName = t2.TableName and t1.Shard = t2.Shard`, + // average writing rate for all tables per 
second + insertRateQuery: ` +SELECT + sd.Shard, + sd.RowsPerSecond, + sd.BytesPerSecond +FROM ( + SELECT + shardNum() as Shard, + (intDiv(toUInt32(date_trunc('minute', toDateTime(event_time))), 2) * 2) * 1000 as t, + TRUNCATE(avg(ProfileEvent_InsertedRows),0) as RowsPerSecond, + formatReadableSize(avg(ProfileEvent_InsertedBytes)) as BytesPerSecond, + ROW_NUMBER() OVER(PARTITION BY shardNum() ORDER BY t DESC) rowNumber + FROM cluster('{cluster}', system.metric_log) + GROUP BY t, shardNum() + ORDER BY t DESC, shardNum() + ) sd +WHERE sd.rowNumber=1`, + stackTracesQuery: ` +SELECT + shardNum() as Shard, + arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\\n') AS trace_function, + count() +FROM cluster('{cluster}', system.stack_trace) +GROUP BY trace_function, Shard +ORDER BY count() +DESC SETTINGS allow_introspection_functions=1`, +} + +var options *chOptions + +var clickHouseStatusCmd = &cobra.Command{ + Use: "status", + Short: "Get diagnostic infos of ClickHouse database", + Example: example, + Args: cobra.NoArgs, + RunE: getClickHouseStatus, +} + +var example = strings.Trim(` +theia clickhouse status --diskInfo +theia clickhouse status --diskInfo --tableInfo +theia clickhouse status --diskInfo --tableInfo --insertRate +`, "\n") + +func init() { + clickHouseCmd.AddCommand(clickHouseStatusCmd) + options = &chOptions{} + clickHouseStatusCmd.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check disk usage information") + clickHouseStatusCmd.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check basic table information") + clickHouseStatusCmd.Flags().BoolVar(&options.insertRate, "insertRate", false, "check the insertion-rate of clickhouse") + clickHouseStatusCmd.Flags().BoolVar(&options.stackTraces, "stackTraces", false, "check stacktrace of clickhouse") +} + +func getClickHouseStatus(cmd *cobra.Command, args []string) error { + if !options.diskInfo && !options.tableInfo && !options.insertRate && !options.stackTraces { + return 
fmt.Errorf("no metric related flag is specified") + } + kubeconfig, err := ResolveKubeConfig(cmd) + if err != nil { + return err + } + clientset, err := CreateK8sClient(kubeconfig) + if err != nil { + return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) + } + + endpoint, err := cmd.Flags().GetString("clickhouse-endpoint") + if err != nil { + return err + } + if endpoint != "" { + _, err := url.ParseRequestURI(endpoint) + if err != nil { + return fmt.Errorf("failed to decode input endpoint %s into a url, err: %v", endpoint, err) + } + } + useClusterIP, err := cmd.Flags().GetBool("use-cluster-ip") + if err != nil { + return err + } + if err := CheckClickHousePod(clientset); err != nil { + return err + } + // Connect to ClickHouse and get the result + connect, pf, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + if err != nil { + return err + } + if pf != nil { + defer pf.Stop() + } + if options.diskInfo { + data, err := getDataFromClickHouse(connect, diskQuery) + if err != nil { + return fmt.Errorf("error when getting diskInfo from clickhouse: %v", err) + } + TableOutput(data) + } + if options.tableInfo { + data, err := getDataFromClickHouse(connect, tableInfoQuery) + if err != nil { + return fmt.Errorf("error when getting tableInfo from clickhouse: %v", err) + } + TableOutput(data) + } + if options.insertRate { + data, err := getDataFromClickHouse(connect, insertRateQuery) + if err != nil { + return fmt.Errorf("error when getting insertRate from clickhouse: %v", err) + } + TableOutput(data) + } + if options.stackTraces { + data, err := getDataFromClickHouse(connect, stackTracesQuery) + if err != nil { + return fmt.Errorf("error when getting stackTraces from clickhouse: %v", err) + } + TableOutputVertical(data) + } + return nil +} + +func getDataFromClickHouse(connect *sql.DB, query int) ([][]string, error) { + result, err := connect.Query(queryMap[query]) + if err != nil { + return nil, fmt.Errorf("failed 
to get data from clickhouse: %v", err) + } + defer result.Close() + columnName, err := result.Columns() + if err != nil { + return nil, fmt.Errorf("failed to get the name of columns: %v", err) + } + var data [][]string + data = append(data, columnName) + for result.Next() { + switch query { + case diskQuery: + var res diskInfo + result.Scan(&res.shard, &res.name, &res.path, &res.freeSpace, &res.totalSpace, &res.usedPercentage) + data = append(data, []string{res.shard, res.name, res.path, res.freeSpace, res.totalSpace, res.usedPercentage + " %"}) + case tableInfoQuery: + res := tableInfo{} + result.Scan(&res.shard, &res.database, &res.tableName, &res.totalRows, &res.totalBytes, &res.totalCols) + if !strings.Contains(res.tableName, "inner") && res.tableName != "flows" { + continue + } + data = append(data, []string{res.shard, res.database, res.tableName, res.totalRows, res.totalBytes, res.totalCols}) + case insertRateQuery: + res := insertRate{} + result.Scan(&res.shard, &res.rowsPerSec, &res.bytesPerSec) + data = append(data, []string{res.shard, res.rowsPerSec, res.bytesPerSec}) + case stackTracesQuery: + res := stackTraces{} + result.Scan(&res.shard, &res.traceFunctions, &res.count) + data = append(data, []string{res.shard, res.traceFunctions, res.count}) + } + } + if len(data) <= 1 { + return nil, fmt.Errorf("no data is returned by database") + } + return data, nil +} diff --git a/pkg/theia/commands/config/config.go b/pkg/theia/commands/config/config.go new file mode 100644 index 000000000..21dbb3980 --- /dev/null +++ b/pkg/theia/commands/config/config.go @@ -0,0 +1,29 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "time" + +const ( + FlowVisibilityNS = "flow-visibility" + K8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$" + SparkImage = "projects.registry.vmware.com/antrea/theia-policy-recommendation:latest" + SparkImagePullPolicy = "IfNotPresent" + SparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py" + SparkServiceAccount = "policy-recommendation-spark" + SparkVersion = "3.1.1" + StatusCheckPollInterval = 5 * time.Second + StatusCheckPollTimeout = 60 * time.Minute +) diff --git a/pkg/theia/commands/policy_recommendation_delete.go b/pkg/theia/commands/policy_recommendation_delete.go index 24cabc16b..a7cb8ec93 100644 --- a/pkg/theia/commands/policy_recommendation_delete.go +++ b/pkg/theia/commands/policy_recommendation_delete.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -82,7 +83,7 @@ $ theia policy-recommendation delete e998433e-accb-4888-9fc8-06563f073e86 clientset.CoreV1().RESTClient().Delete(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Name("pr-" + recoID). Do(context.TODO()) @@ -102,7 +103,7 @@ func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig str sparkApplicationList := &sparkv1.SparkApplicationList{} err = clientset.CoreV1().RESTClient().Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). 
- Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Do(context.TODO()).Into(sparkApplicationList) if err != nil { @@ -123,7 +124,7 @@ func getPolicyRecommendationIdMap(clientset kubernetes.Interface, kubeconfig str } func deletePolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, recoID string) (err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_list.go b/pkg/theia/commands/policy_recommendation_list.go index b28b1cc58..6a8d3d95e 100644 --- a/pkg/theia/commands/policy_recommendation_list.go +++ b/pkg/theia/commands/policy_recommendation_list.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -73,7 +74,7 @@ $ theia policy-recommendation list sparkApplicationList := &sparkv1.SparkApplicationList{} err = clientset.CoreV1().RESTClient().Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). 
Do(context.TODO()).Into(sparkApplicationList) if err != nil { @@ -121,7 +122,7 @@ $ theia policy-recommendation list } func getCompletedPolicyRecommendationList(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (completedPolicyRecommendationList []policyRecommendationRow, err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go index 873c29cc1..712f8b753 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve.go +++ b/pkg/theia/commands/policy_recommendation_retrieve.go @@ -100,7 +100,7 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us } func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, filePath string, recoID string) (recoResult string, err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_retrieve_test.go b/pkg/theia/commands/policy_recommendation_retrieve_test.go index f94823e6b..d1e1c3be9 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve_test.go +++ b/pkg/theia/commands/policy_recommendation_retrieve_test.go @@ -22,6 +22,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "antrea.io/theia/pkg/theia/commands/config" ) func TestGetClickHouseSecret(t *testing.T) { @@ -38,7 +40,7 @@ func TestGetClickHouseSecret(t *testing.T) { &v1.Secret{ 
ObjectMeta: metav1.ObjectMeta{ Name: "clickhouse-secret", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, }, Data: map[string][]byte{ "username": []byte("clickhouse_operator"), @@ -63,7 +65,7 @@ func TestGetClickHouseSecret(t *testing.T) { &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "clickhouse-secret", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, }, Data: map[string][]byte{ "password": []byte("clickhouse_operator_password"), @@ -80,7 +82,7 @@ func TestGetClickHouseSecret(t *testing.T) { &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "clickhouse-secret", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, }, Data: map[string][]byte{ "username": []byte("clickhouse_operator"), diff --git a/pkg/theia/commands/policy_recommendation_run.go b/pkg/theia/commands/policy_recommendation_run.go index b7cb347fd..d5a8d587c 100644 --- a/pkg/theia/commands/policy_recommendation_run.go +++ b/pkg/theia/commands/policy_recommendation_run.go @@ -29,18 +29,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" -) -const ( - flowVisibilityNS = "flow-visibility" - k8sQuantitiesReg = "^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$" - sparkImage = "projects.registry.vmware.com/antrea/theia-policy-recommendation:latest" - sparkImagePullPolicy = "IfNotPresent" - sparkAppFile = "local:///opt/spark/work-dir/policy_recommendation_job.py" - sparkServiceAccount = "policy-recommendation-spark" - sparkVersion = "3.1.1" - statusCheckPollInterval = 5 * time.Second - statusCheckPollTimeout = 60 * time.Minute + "antrea.io/theia/pkg/theia/commands/config" ) type SparkResourceArgs struct { @@ -175,7 +165,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f if err != nil { return err } - matchResult, err := regexp.MatchString(k8sQuantitiesReg, driverCoreRequest) + matchResult, err := regexp.MatchString(config.K8sQuantitiesReg, driverCoreRequest) if err 
!= nil || !matchResult { return fmt.Errorf("driver-core-request should conform to the Kubernetes resource quantity convention") } @@ -185,7 +175,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f if err != nil { return err } - matchResult, err = regexp.MatchString(k8sQuantitiesReg, driverMemory) + matchResult, err = regexp.MatchString(config.K8sQuantitiesReg, driverMemory) if err != nil || !matchResult { return fmt.Errorf("driver-memory should conform to the Kubernetes resource quantity convention") } @@ -195,7 +185,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f if err != nil { return err } - matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorCoreRequest) + matchResult, err = regexp.MatchString(config.K8sQuantitiesReg, executorCoreRequest) if err != nil || !matchResult { return fmt.Errorf("executor-core-request should conform to the Kubernetes resource quantity convention") } @@ -205,7 +195,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f if err != nil { return err } - matchResult, err = regexp.MatchString(k8sQuantitiesReg, executorMemory) + matchResult, err = regexp.MatchString(config.K8sQuantitiesReg, executorMemory) if err != nil || !matchResult { return fmt.Errorf("executor-memory should conform to the Kubernetes resource quantity convention") } @@ -239,22 +229,22 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f }, ObjectMeta: metav1.ObjectMeta{ Name: "pr-" + recommendationID, - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, }, Spec: sparkv1.SparkApplicationSpec{ Type: "Python", - SparkVersion: sparkVersion, + SparkVersion: config.SparkVersion, Mode: "cluster", - Image: ConstStrToPointer(sparkImage), - ImagePullPolicy: ConstStrToPointer(sparkImagePullPolicy), - MainApplicationFile: ConstStrToPointer(sparkAppFile), + Image: ConstStrToPointer(config.SparkImage), + ImagePullPolicy: 
ConstStrToPointer(config.SparkImagePullPolicy), + MainApplicationFile: ConstStrToPointer(config.SparkAppFile), Arguments: recoJobArgs, Driver: sparkv1.DriverSpec{ CoreRequest: &driverCoreRequest, SparkPodSpec: sparkv1.SparkPodSpec{ Memory: &driverMemory, Labels: map[string]string{ - "version": sparkVersion, + "version": config.SparkVersion, }, EnvSecretKeyRefs: map[string]sparkv1.NameKey{ "CH_USERNAME": { @@ -266,7 +256,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f Key: "password", }, }, - ServiceAccount: ConstStrToPointer(sparkServiceAccount), + ServiceAccount: ConstStrToPointer(config.SparkServiceAccount), }, }, Executor: sparkv1.ExecutorSpec{ @@ -274,7 +264,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f SparkPodSpec: sparkv1.SparkPodSpec{ Memory: &executorMemory, Labels: map[string]string{ - "version": sparkVersion, + "version": config.SparkVersion, }, EnvSecretKeyRefs: map[string]sparkv1.NameKey{ "CH_USERNAME": { @@ -295,7 +285,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f err = clientset.CoreV1().RESTClient(). Post(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Body(recommendationApplication). Do(context.TODO()). 
@@ -304,7 +294,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f return err } if waitFlag { - err = wait.Poll(statusCheckPollInterval, statusCheckPollTimeout, func() (bool, error) { + err = wait.Poll(config.StatusCheckPollInterval, config.StatusCheckPollTimeout, func() (bool, error) { state, err := getPolicyRecommendationStatus(clientset, recommendationID) if err != nil { return false, err diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go index d73b32d3c..e640d63bb 100644 --- a/pkg/theia/commands/policy_recommendation_status.go +++ b/pkg/theia/commands/policy_recommendation_status.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -146,7 +147,7 @@ func getSparkAppByRecommendationID(clientset kubernetes.Interface, id string) (s err = clientset.CoreV1().RESTClient(). Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Name("pr-" + id). Do(context.TODO()). 
diff --git a/pkg/theia/commands/utils.go b/pkg/theia/commands/utils.go index 77f017afd..af88d53eb 100644 --- a/pkg/theia/commands/utils.go +++ b/pkg/theia/commands/utils.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + "antrea.io/theia/pkg/theia/commands/config" "antrea.io/theia/pkg/theia/portforwarder" ) @@ -62,7 +63,7 @@ func PolicyRecoPreCheck(clientset kubernetes.Interface) error { func CheckSparkOperatorPod(clientset kubernetes.Interface) error { // Check the deployment of Spark Operator in flow-visibility ns - pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{ + pods, err := clientset.CoreV1().Pods(config.FlowVisibilityNS).List(context.TODO(), metav1.ListOptions{ LabelSelector: "app.kubernetes.io/name=spark-operator", }) if err != nil { @@ -79,14 +80,14 @@ func CheckSparkOperatorPod(clientset kubernetes.Interface) error { } } if !hasRunningPod { - return fmt.Errorf("can't find a running ClickHouse Pod, please check the deployment of ClickHouse") + return fmt.Errorf("can't find a running Spark Operator Pod, please check the deployment of Spark") } return nil } func CheckClickHousePod(clientset kubernetes.Interface) error { // Check the ClickHouse deployment in flow-visibility namespace - pods, err := clientset.CoreV1().Pods(flowVisibilityNS).List(context.TODO(), metav1.ListOptions{ + pods, err := clientset.CoreV1().Pods(config.FlowVisibilityNS).List(context.TODO(), metav1.ListOptions{ LabelSelector: "app=clickhouse", }) if err != nil { @@ -115,7 +116,7 @@ func ConstStrToPointer(constStr string) *string { func GetServiceAddr(clientset kubernetes.Interface, serviceName string) (string, int, error) { var serviceIP string var servicePort int - service, err := clientset.CoreV1().Services(flowVisibilityNS).Get(context.TODO(), serviceName, metav1.GetOptions{}) + service, err := clientset.CoreV1().Services(config.FlowVisibilityNS).Get(context.TODO(), serviceName, 
metav1.GetOptions{}) if err != nil { return serviceIP, servicePort, fmt.Errorf("error when finding the Service %s: %v", serviceName, err) } @@ -132,12 +133,12 @@ func GetServiceAddr(clientset kubernetes.Interface, serviceName string) (string, } func StartPortForward(kubeconfig string, service string, servicePort int, listenAddress string, listenPort int) (*portforwarder.PortForwarder, error) { - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + configuration, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, err } // Forward the policy recommendation service port - pf, err := portforwarder.NewServicePortForwarder(config, flowVisibilityNS, service, servicePort, listenAddress, listenPort) + pf, err := portforwarder.NewServicePortForwarder(configuration, config.FlowVisibilityNS, service, servicePort, listenAddress, listenPort) if err != nil { return nil, err } @@ -165,7 +166,7 @@ func ResolveKubeConfig(cmd *cobra.Command) (string, error) { } func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) { - secret, err := clientset.CoreV1().Secrets(flowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) + secret, err := clientset.CoreV1().Secrets(config.FlowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) if err != nil { return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err) } @@ -211,7 +212,7 @@ func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, err return connect, nil } -func setupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward *portforwarder.PortForwarder, err error) { +func SetupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward 
*portforwarder.PortForwarder, err error) { if endpoint == "" { service := "clickhouse-clickhouse" if useClusterIP { @@ -257,6 +258,20 @@ func TableOutput(table [][]string) { writer.Flush() } +func TableOutputVertical(table [][]string) { + header := table[0] + writer := tabwriter.NewWriter(os.Stdout, 15, 0, 1, ' ', 0) + for i := 1; i < len(table); i++ { + fmt.Fprintln(writer, fmt.Sprintf("Row %d:\t", i)) + fmt.Fprintln(writer, fmt.Sprint("-------")) + for j, val := range table[i] { + fmt.Fprintln(writer, fmt.Sprintf("%s:\t%s", header[j], val)) + } + fmt.Fprintln(writer) + writer.Flush() + } +} + func FormatTimestamp(timestamp time.Time) string { if timestamp.IsZero() { return "N/A" diff --git a/pkg/theia/commands/utils_test.go b/pkg/theia/commands/utils_test.go index feafec6d6..00a436491 100644 --- a/pkg/theia/commands/utils_test.go +++ b/pkg/theia/commands/utils_test.go @@ -21,6 +21,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "antrea.io/theia/pkg/theia/commands/config" ) func TestGetServiceAddr(t *testing.T) { @@ -38,7 +40,7 @@ func TestGetServiceAddr(t *testing.T) { &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "clickhouse-clickhouse", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{Name: "tcp", Port: 9000}}, @@ -84,7 +86,7 @@ func TestPolicyRecoPreCheck(t *testing.T) { &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "clickhouse", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, Labels: map[string]string{"app": "clickhouse"}, }, Status: v1.PodStatus{ @@ -94,7 +96,7 @@ func TestPolicyRecoPreCheck(t *testing.T) { &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "spark-operator", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, Labels: map[string]string{ "app.kubernetes.io/name": "spark-operator", }, @@ -117,7 +119,7 @@ func TestPolicyRecoPreCheck(t *testing.T) { &v1.Pod{ 
ObjectMeta: metav1.ObjectMeta{ Name: "spark-operator", - Namespace: flowVisibilityNS, + Namespace: config.FlowVisibilityNS, Labels: map[string]string{ "app.kubernetes.io/name": "spark-operator", }, diff --git a/test/e2e/fixture.go b/test/e2e/fixture.go index e49e78352..392b8cc73 100644 --- a/test/e2e/fixture.go +++ b/test/e2e/fixture.go @@ -281,7 +281,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestForFlowVisibility(tb testing.TB, withSparkOperator, withGrafana bool) (*TestData, bool, bool, error) { +func setupTestForFlowVisibility(tb testing.TB, withSparkOperator bool, withGrafana bool, withFlowAggregator bool) (*TestData, bool, bool, error) { v4Enabled := clusterInfo.podV4NetworkCIDR != "" v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) @@ -295,9 +295,11 @@ func setupTestForFlowVisibility(tb testing.TB, withSparkOperator, withGrafana bo return testData, v4Enabled, v6Enabled, err } tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) - tb.Logf("Applying flow aggregator YAML") - if err := testData.deployFlowAggregator(); err != nil { - return testData, v4Enabled, v6Enabled, err + if withFlowAggregator { + tb.Logf("Applying flow aggregator YAML") + if err := testData.deployFlowAggregator(); err != nil { + return testData, v4Enabled, v6Enabled, err + } } return testData, v4Enabled, v6Enabled, nil } diff --git a/test/e2e/flowvisibility_test.go b/test/e2e/flowvisibility_test.go index 82f93fe88..b9ffc848c 100644 --- a/test/e2e/flowvisibility_test.go +++ b/test/e2e/flowvisibility_test.go @@ -22,13 +22,12 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "os/exec" - "syscall" - - "net" "strconv" "strings" + "syscall" "testing" "time" @@ -139,7 +138,7 @@ type testFlow struct { } func TestFlowVisibility(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false, true) + data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false, true, true) 
if err != nil { t.Errorf("Error when setting up test: %v", err) failOnError(err, t, data) @@ -150,7 +149,7 @@ func TestFlowVisibility(t *testing.T) { failOnError(err, t, data) } defer portForwardCmd.Process.Kill() - defer flowVisibilityCleanup(t, data, false) + defer flowVisibilityCleanup(t, data, false, true) podAIPs, podBIPs, podCIPs, podDIPs, podEIPs, err := createPerftestPods(data) if err != nil { @@ -1254,6 +1253,6 @@ func failOnError(err error, t *testing.T, data *TestData) { if portForwardCmd.Process != nil { portForwardCmd.Process.Kill() } - flowVisibilityCleanup(t, data, false) + flowVisibilityCleanup(t, data, false, true) t.Fatalf("test failed: %v", err) } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 931f0c53a..2430b7697 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -62,27 +62,28 @@ const ( defaultInterval = 1 * time.Second realizeTimeout = 5 * time.Minute - antreaNamespace string = "kube-system" - kubeNamespace string = "kube-system" - flowAggregatorNamespace string = "flow-aggregator" - flowVisibilityNamespace string = "flow-visibility" - testNamespace string = "antrea-test" - iperfPort int32 = 5201 - clickHouseHTTPPort string = "8123" - busyboxContainerName string = "busybox" - defaultBridgeName string = "br-int" - antreaYML string = "antrea.yml" - antreaDaemonSet string = "antrea-agent" - antreaDeployment string = "antrea-controller" - flowAggregatorDeployment string = "flow-aggregator" - flowAggregatorYML string = "flow-aggregator.yml" - flowVisibilityYML string = "flow-visibility.yml" - flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml" - chOperatorYML string = "clickhouse-operator-install-bundle.yaml" - flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0" - policyOutputYML string = "output.yaml" - sparkOperatorPodLabel string = "app.kubernetes.io/name=spark-operator" - grafanaPodLabel string = "app=grafana" + antreaNamespace string = "kube-system" + kubeNamespace string = 
"kube-system" + flowAggregatorNamespace string = "flow-aggregator" + flowVisibilityNamespace string = "flow-visibility" + testNamespace string = "antrea-test" + iperfPort int32 = 5201 + clickHouseHTTPPort string = "8123" + busyboxContainerName string = "busybox" + defaultBridgeName string = "br-int" + antreaYML string = "antrea.yml" + antreaDaemonSet string = "antrea-agent" + antreaDeployment string = "antrea-controller" + flowAggregatorDeployment string = "flow-aggregator" + flowAggregatorYML string = "flow-aggregator.yml" + flowVisibilityYML string = "flow-visibility.yml" + flowVisibilityWithSparkYML string = "flow-visibility-with-spark.yml" + flowVisibilityChOnlyYML string = "flow-visibility-ch-only.yml" + chOperatorYML string = "clickhouse-operator-install-bundle.yaml" + flowVisibilityCHPodNamePrefix string = "chi-clickhouse-clickhouse" + policyOutputYML string = "output.yaml" + sparkOperatorPodLabel string = "app.kubernetes.io/name=spark-operator" + grafanaPodLabel string = "app=grafana" agnhostImage = "k8s.gcr.io/e2e-test-images/agnhost:2.29" busyboxImage = "projects.registry.vmware.com/antrea/busybox" @@ -91,6 +92,8 @@ const ( exporterActiveFlowExportTimeout = 2 * time.Second aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond aggregatorClickHouseCommitInterval = 1 * time.Second + + shardNum = 1 ) type ClusterNode struct { @@ -1089,10 +1092,16 @@ func (data *TestData) createTestNamespace() error { // deployFlowVisibility deploys ClickHouse Operator and DB. If withSparkOperator/ // withGrafana is set to true, it also deploys Spark Operator/Grafana. 
func (data *TestData) deployFlowVisibility(withSparkOperator, withGrafana bool) (string, error) { - flowVisibilityManifest := flowVisibilityYML - if withSparkOperator { + + var flowVisibilityManifest string + if !withGrafana && !withSparkOperator { + flowVisibilityManifest = flowVisibilityChOnlyYML + } else if withSparkOperator { flowVisibilityManifest = flowVisibilityWithSparkYML + } else { + flowVisibilityManifest = flowVisibilityYML } + rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", chOperatorYML)) if err != nil || rc != 0 { return "", fmt.Errorf("error when deploying the ClickHouse Operator YML; %s not available on the control-plane Node", chOperatorYML) @@ -1126,10 +1135,12 @@ func (data *TestData) deployFlowVisibility(withSparkOperator, withGrafana bool) } // check for ClickHouse Pod ready. Wait for 2x timeout as ch operator needs to be running first to handle chi - if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil { - return "", err + for i := 0; i < shardNum; i++ { + chPodName := fmt.Sprintf("%s-%v-0-0", flowVisibilityCHPodNamePrefix, i) + if err = data.podWaitForReady(2*defaultTimeout, chPodName, flowVisibilityNamespace); err != nil { + return "", err + } } - // check ClickHouse Service http port for Service connectivity chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse") if err != nil { @@ -1226,11 +1237,11 @@ func (data *TestData) deleteClickHouseOperator() error { return nil } -func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator bool) { +func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator bool, withGrafana bool) { if err := data.DeleteNamespace(flowAggregatorNamespace, defaultTimeout); err != nil { tb.Logf("Error when tearing down flow aggregator: %v", err) } - if err := data.deleteFlowVisibility(withSparkOperator); err != nil { + if err := 
data.deleteFlowVisibility(withSparkOperator, withGrafana); err != nil { tb.Logf("Error when deleting K8s resources created by flow visibility: %v", err) } if err := data.deleteClickHouseOperator(); err != nil { @@ -1238,10 +1249,14 @@ func teardownFlowVisibility(tb testing.TB, data *TestData, withSparkOperator boo } } -func (data *TestData) deleteFlowVisibility(withSparkOperator bool) error { - flowVisibilityManifest := flowVisibilityYML - if withSparkOperator { +func (data *TestData) deleteFlowVisibility(withSparkOperator bool, withGrafana bool) error { + var flowVisibilityManifest string + if !withGrafana && !withSparkOperator { + flowVisibilityManifest = flowVisibilityChOnlyYML + } else if withSparkOperator { flowVisibilityManifest = flowVisibilityWithSparkYML + } else { + flowVisibilityManifest = flowVisibilityYML } startTime := time.Now() defer func() { @@ -1352,7 +1367,7 @@ func (data *TestData) Cleanup(namespaces []string) { } } -func flowVisibilityCleanup(tb testing.TB, data *TestData, withSparkOperator bool) { +func flowVisibilityCleanup(tb testing.TB, data *TestData, withSparkOperator bool, withGrafana bool) { teardownTest(tb, data) - teardownFlowVisibility(tb, data, withSparkOperator) + teardownFlowVisibility(tb, data, withSparkOperator, withGrafana) } diff --git a/test/e2e/policyrecommendation_test.go b/test/e2e/policyrecommendation_test.go index 06f7fdb0e..9e5d03ad6 100644 --- a/test/e2e/policyrecommendation_test.go +++ b/test/e2e/policyrecommendation_test.go @@ -42,14 +42,14 @@ const ( ) func TestPolicyRecommendation(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, true, false) + data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, true, false, true) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer func() { teardownTest(t, data) deleteRecommendedPolicies(t, data) - teardownFlowVisibility(t, data, true) + teardownFlowVisibility(t, data, true, false) }() 
t.Run("testPolicyRecommendationRun", func(t *testing.T) { diff --git a/test/e2e/theia_clickhouse_test.go b/test/e2e/theia_clickhouse_test.go new file mode 100644 index 000000000..59d02b073 --- /dev/null +++ b/test/e2e/theia_clickhouse_test.go @@ -0,0 +1,374 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "crypto/rand" + "database/sql" + "fmt" + "math/big" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "antrea.io/theia/pkg/theia/commands" +) + +const ( + getDiskInfoCmd = "./theia clickhouse status --diskInfo" + getTableInfoCmd = "./theia clickhouse status --tableInfo" + getInsertRateCmd = "./theia clickhouse status --insertRate" + insertQuery = `INSERT INTO flows ( + flowStartSeconds, + flowEndSeconds, + flowEndSecondsFromSourceNode, + flowEndSecondsFromDestinationNode, + flowEndReason, + sourceIP, + destinationIP, + sourceTransportPort, + destinationTransportPort, + protocolIdentifier, + packetTotalCount, + octetTotalCount, + packetDeltaCount, + octetDeltaCount, + reversePacketTotalCount, + reverseOctetTotalCount, + reversePacketDeltaCount, + reverseOctetDeltaCount, + sourcePodName, + sourcePodNamespace, + sourceNodeName, + destinationPodName, + destinationPodNamespace, + destinationNodeName, + destinationClusterIP, + destinationServicePort, + destinationServicePortName, + ingressNetworkPolicyName, + 
ingressNetworkPolicyNamespace, + ingressNetworkPolicyRuleName, + ingressNetworkPolicyRuleAction, + ingressNetworkPolicyType, + egressNetworkPolicyName, + egressNetworkPolicyNamespace, + egressNetworkPolicyRuleName, + egressNetworkPolicyRuleAction, + egressNetworkPolicyType, + tcpState, + flowType, + sourcePodLabels, + destinationPodLabels, + throughput, + reverseThroughput, + throughputFromSourceNode, + throughputFromDestinationNode, + reverseThroughputFromSourceNode, + reverseThroughputFromDestinationNode) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?)` + recordPerCommit = 1000 + insertInterval = 1 + threshold = 25 + MaxInt32 = 1<<31 - 1 + numFieldsInDiskInfo = 9 + numFieldsInTableInfo = 7 + dateBaseName = "default" + defaultPath = "/var/lib/clickhouse/" +) + +var tableColumnNumberMap = map[string]string{ + ".inner.flows_node_view": "16", + ".inner.flows_node_view_local": "16", + ".inner.flows_pod_view": "20", + ".inner.flows_pod_view_local": "20", + ".inner.flows_policy_view": "27", + ".inner.flows_policy_view_local": "27", + "flows": "49", + "flows_local": "49", +} + +func TestTheiaClickHouseStatusCommand(t *testing.T) { + data, _, _, err := setupTestForFlowVisibility(t, false, false, false) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer func() { + teardownTest(t, data) + teardownFlowVisibility(t, data, false, false) + }() + + clientset := data.clientset + kubeconfig, err := data.provider.GetKubeconfigPath() + require.NoError(t, err) + connect, pf, err := commands.SetupClickHouseConnection(clientset, kubeconfig, "", false) + require.NoError(t, err) + if pf != nil { + defer pf.Stop() + } + + t.Run("testTheiaGetClickHouseDiskInfo", func(t *testing.T) { + testTheiaGetClickHouseDiskInfo(t, data) + }) + t.Run("testTheiaGetClickHouseTableInfo", func(t *testing.T) { + testTheiaGetClickHouseTableInfo(t, data, connect) + }) + 
t.Run("testTheiaGetClickHouseInsertRate", func(t *testing.T) { + testTheiaGetClickHouseInsertRate(t, data, connect) + }) + +} + +// Example output +// Shard DatabaseName Path Free Total Used_Percentage +// 1 default /var/lib/clickhouse/ 888.00 KiB 100.00 MiB 99.13 % +func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) { + // retrieve metrics + stdout, err := getClickHouseDBInfo(t, data, getDiskInfoCmd) + require.NoError(t, err) + resultArray := strings.Split(stdout, "\n") + assert := assert.New(t) + length := len(resultArray) + assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout) + // Check header component + assert.Containsf(stdout, "Shard", "stdout: %s", stdout) + assert.Containsf(stdout, "DatabaseName", "stdout: %s", stdout) + assert.Containsf(stdout, "Path", "stdout: %s", stdout) + assert.Containsf(stdout, "Free", "stdout: %s", stdout) + assert.Containsf(stdout, "Total", "stdout: %s", stdout) + assert.Containsf(stdout, "Used_Percentage", "stdout: %s", stdout) + for i := 1; i < length; i++ { + // check metrics' value + diskInfoArray := strings.Fields(resultArray[i]) + assert.Equal(numFieldsInDiskInfo, len(diskInfoArray), "number of columns is not correct") + assert.Equalf(dateBaseName, diskInfoArray[1], "diskInfoArray: %s", diskInfoArray) + assert.Equalf(defaultPath, diskInfoArray[2], "diskInfoArray: %s", diskInfoArray) + usedStorage, err := strconv.ParseFloat(diskInfoArray[7], 64) + assert.NoError(err) + assert.GreaterOrEqual(threshold, int(usedStorage), "diskInfoArray: %s", diskInfoArray) + } +} + +// Example output +// Shard DatabaseName TableName TotalRows TotalBytes TotalCols +// 1 default .inner.flows_node_view 50000 4.19 MiB 16 +// 1 default .inner.flows_pod_view 48000 4.72 MiB 20 +// 1 default .inner.flows_policy_view 48000 7.16 MiB 27 +// 1 default flows 50000 13.09 MiB 49 +func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.DB) { + // send 10000 records to clickhouse + commitNum := 10 + var wg sync.WaitGroup 
+ wg.Add(1) + go func() { + defer wg.Done() + sendTraffic(t, commitNum, connect) + }() + wg.Wait() + // retrieve metrics + stdout, err := getClickHouseDBInfo(t, data, getTableInfoCmd) + require.NoError(t, err) + resultArray := strings.Split(stdout, "\n") + assert := assert.New(t) + length := len(resultArray) + assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout) + // check header component + assert.Containsf(stdout, "Shard", "stdout: %s", stdout) + assert.Containsf(stdout, "DatabaseName", "stdout: %s", stdout) + assert.Containsf(stdout, "TableName", "stdout: %s", stdout) + assert.Containsf(stdout, "TotalRows", "stdout: %s", stdout) + assert.Containsf(stdout, "TotalBytes", "stdout: %s", stdout) + assert.Containsf(stdout, "TotalCols", "stdout: %s", stdout) + // check four tables are in db + assert.Containsf(stdout, ".inner.flows_node_view", "stdout: %s", stdout) + assert.Containsf(stdout, ".inner.flows_pod_view", "stdout: %s", stdout) + assert.Containsf(stdout, ".inner.flows_policy_view", "stdout: %s", stdout) + assert.Containsf(stdout, "flows", "stdout: %s", stdout) + + flowNum := 0 + for i := 1; i < length; i++ { + // check metrics' value + tableInfoArray := strings.Fields(resultArray[i]) + tableName := tableInfoArray[2] + expectedColNum, ok := tableColumnNumberMap[tableName] + if !ok { + continue + } + assert.Equal(numFieldsInTableInfo, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray) + assert.Equalf(dateBaseName, tableInfoArray[1], "tableInfoArray: %s", tableInfoArray) + assert.Equal(expectedColNum, tableInfoArray[6], "tableInfoArray: %s", tableInfoArray) + if tableName == "flows" || tableName == "flows_local" { + num, error := strconv.Atoi(tableInfoArray[3]) + assert.NoError(error) + flowNum += num + } + } + // sum of records in table flows in each shard should be the total number of records sent to db + assert.Equal(commitNum*recordPerCommit, flowNum) +} + +// Example output +// Shard RowsPerSecond BytesPerSecond +// 1 4763 1.48 MiB +func 
testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql.DB) {
+ commitNum := 70
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ sendTraffic(t, commitNum, connect)
+ }()
+ // need to wait at least 1 min to get the insertion rate.
+ // insertion rate is the average ProfileEvent_InsertedRows in system.metric_log in current minute
+ time.Sleep(1 * time.Minute)
+ // retrieve metrics
+ stdout, err := getClickHouseDBInfo(t, data, getInsertRateCmd)
+ require.NoError(t, err)
+ resultArray := strings.Split(stdout, "\n")
+ assert := assert.New(t)
+ length := len(resultArray)
+ assert.GreaterOrEqualf(length, 2, "stdout: %s", stdout)
+ // check header component
+ assert.Containsf(stdout, "Shard", "stdout: %s", stdout)
+ assert.Containsf(stdout, "RowsPerSecond", "stdout: %s", stdout)
+ assert.Containsf(stdout, "BytesPerSecond", "stdout: %s", stdout)
+
+ for i := 1; i < length; i++ {
+ // check metrics' value
+ tableInfoArray := strings.Fields(resultArray[i])
+ assert.Equal(4, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray)
+ actualInsertRate, convErr := strconv.Atoi(tableInfoArray[1])
+ assert.NoError(convErr)
+ tableNum := len(tableColumnNumberMap)
+ percent := (actualInsertRate/tableNum - recordPerCommit/insertInterval) * 100 / (recordPerCommit / insertInterval)
+ assert.LessOrEqualf(percent, threshold, "stdout: %s, expectedInsertRate: %d", stdout, recordPerCommit/insertInterval)
+ }
+ wg.Wait()
+}
+
+func getClickHouseDBInfo(t *testing.T, data *TestData, query string) (stdout string, err error) {
+ cmd := "chmod +x ./theia"
+ rc, stdout, stderr, err := data.RunCommandOnNode(controlPlaneNodeName(), cmd)
+ if err != nil || rc != 0 {
+ return "", fmt.Errorf("error when running %s from %s: %v\nstdout:%s\nstderr:%s", cmd, controlPlaneNodeName(), err, stdout, stderr)
+ }
+ rc, stdout, stderr, err = data.RunCommandOnNode(controlPlaneNodeName(), query)
+
+ if err != nil || rc != 0 {
+ return "", fmt.Errorf("error when running %s from 
%s: %v\nstdout:%s\nstderr:%s", query, controlPlaneNodeName(), err, stdout, stderr)
+ }
+ return strings.TrimSuffix(stdout, "\n"), nil
+}
+
+func getRandIP(t *testing.T) string {
+ return fmt.Sprintf("%d.%d.%d.%d", randInt(t, 256), randInt(t, 256), randInt(t, 256), randInt(t, 256))
+}
+
+func addFakeRecord(t *testing.T, stmt *sql.Stmt) {
+ _, err := stmt.Exec(
+ time.Now(),
+ time.Now(),
+ time.Now(),
+ time.Now(),
+ 0,
+ getRandIP(t),
+ getRandIP(t),
+ uint16(randInt(t, 65535)),
+ uint16(randInt(t, 65535)),
+ 6,
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)),
+ getRandIP(t),
+ uint16(randInt(t, 65535)),
+ fmt.Sprintf("ServicePortName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PolicyName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PolicyNameSpace-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PolicyRuleName-%d", randInt(t, MaxInt32)),
+ 1,
+ 1,
+ fmt.Sprintf("PolicyName-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PolicyNameSpace-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PolicyRuleName-%d", randInt(t, MaxInt32)),
+ 1,
+ 1,
+ "tcpState",
+ 0,
+ fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)),
+ fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ uint64(randInt(t, MaxInt32)),
+ )
+ require.NoError(t, err)
+}
+
+func writeRecords(t *testing.T, connect *sql.DB, wg *sync.WaitGroup) {
+ defer wg.Done()
+ 
// Test ping DB
+ var err error
+ err = connect.Ping()
+ require.NoError(t, err)
+ // Test open Transaction
+ tx, err := connect.Begin()
+ require.NoError(t, err)
+ stmt, err := tx.Prepare(insertQuery)
+ require.NoError(t, err)
+ for j := 0; j < recordPerCommit; j++ {
+ addFakeRecord(t, stmt)
+ }
+ err = tx.Commit()
+ assert.NoError(t, err)
+}
+
+func sendTraffic(t *testing.T, commitNum int, connect *sql.DB) {
+ var wg sync.WaitGroup
+ for i := 0; i < commitNum; i++ {
+ wg.Add(1)
+ go writeRecords(t, connect, &wg)
+ time.Sleep(time.Duration(insertInterval) * time.Second)
+ }
+ wg.Wait()
+}
+
+func randInt(t *testing.T, limit int64) int64 {
+ assert := assert.New(t)
+ randNum, randErr := rand.Int(rand.Reader, big.NewInt(limit))
+ assert.NoError(randErr)
+ return randNum.Int64()
+}