diff --git a/go.mod b/go.mod index ca2239352..fd6be0263 100644 --- a/go.mod +++ b/go.mod @@ -53,10 +53,6 @@ require ( github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/json-iterator/go v1.1.11 // indirect - github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0 // indirect - github.com/mailru/easyjson v0.7.0 // indirect - github.com/mattn/go-colorable v0.1.8 // indirect - github.com/mattn/go-isatty v0.0.12 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/moby/spdystream v0.2.0 // indirect @@ -68,6 +64,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 // indirect diff --git a/go.sum b/go.sum index 2727569b0..764d82a26 100644 --- a/go.sum +++ b/go.sum @@ -330,7 +330,6 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -613,6 +612,7 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= @@ -643,7 +643,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= -github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= diff --git a/pkg/theia/commands/clickhouse/command.go b/pkg/theia/commands/clickhouse.go similarity index 65% rename from pkg/theia/commands/clickhouse/command.go rename to pkg/theia/commands/clickhouse.go index fd96c0878..5b7a96d30 100644 --- a/pkg/theia/commands/clickhouse/command.go +++ b/pkg/theia/commands/clickhouse.go @@ -12,17 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package clickhouse +package commands import ( "fmt" "github.com/spf13/cobra" - - "antrea.io/theia/pkg/theia/commands/clickhouse/status" ) -var ClickHouseCmd = &cobra.Command{ +var clickHouseCmd = &cobra.Command{ Use: "clickhouse", Aliases: []string{"ch"}, Short: "Commands of Theia ClickHouse feature", @@ -32,5 +30,15 @@ var ClickHouseCmd = &cobra.Command{ } func init() { - ClickHouseCmd.AddCommand(status.Command) + rootCmd.AddCommand(clickHouseCmd) + clickHouseCmd.PersistentFlags().String( + "clickhouse-endpoint", + "", + "The ClickHouse service endpoint.") + clickHouseCmd.PersistentFlags().Bool( + "use-cluster-ip", + false, + `Enable this option will use ClusterIP instead of port forwarding when connecting to the ClickHouse Service. +It can only be used when running in cluster and when wait is enabled.`, + ) } diff --git a/pkg/theia/commands/clickhouse/status/command.go b/pkg/theia/commands/clickhouse_status.go similarity index 69% rename from pkg/theia/commands/clickhouse/status/command.go rename to pkg/theia/commands/clickhouse_status.go index 7d7b83d05..12118373c 100644 --- a/pkg/theia/commands/clickhouse/status/command.go +++ b/pkg/theia/commands/clickhouse_status.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package status +package commands import ( "database/sql" @@ -23,15 +23,14 @@ import ( "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" - - "antrea.io/theia/pkg/theia/util" ) type chOptions struct { diskInfo bool tableInfo bool insertRate bool - formatTable bool + stackTraces bool + printTable bool } type diskInfo struct { @@ -58,29 +57,44 @@ type writeRowsPerSec struct { bytesPerSec string } +type stackTraces struct { + shard string + traceFunctions string + count string +} + const ( diskQuery = "SELECT shardNum() as shard, name as Name, path as Path, formatReadableSize(free_space) as Free," + - " formatReadableSize(total_space) as Total, TRUNCATE((1 - free_space/total_space) * 100, 2) as Used_Percentage FROM " + - "cluster('{cluster}', system.disks) ;" + " formatReadableSize(total_space) as Total, TRUNCATE((1 - free_space/total_space) * 100, 2) as " + + "Used_Percentage FROM cluster('{cluster}', system.disks) ;" tableInfoBasicQuery = "SELECT shard, DatabaseName, TableName, TotalRows, TotalBytes, TotalCols FROM (SELECT " + "shardNum() as shard, database AS DatabaseName, name AS TableName, total_rows AS TotalRows, " + "formatReadableSize(total_bytes) AS TotalBytes FROM cluster('{cluster}', system.tables) WHERE database = " + "'default') as t1 INNER JOIN(SELECT shardNum() as shard, table_catalog as DatabaseName, table_name as " + - "TableName, COUNT(*) as TotalCols FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS) WHERE table_catalog == " + - "'default' GROUP BY table_name, table_catalog, shard) as t2 ON t1.DatabaseName = t2.DatabaseName and " + + "TableName, COUNT(*) as TotalCols FROM cluster('{cluster}', INFORMATION_SCHEMA.COLUMNS) WHERE table_catalog " + + "== 'default' GROUP BY table_name, table_catalog, shard) as t2 ON t1.DatabaseName = t2.DatabaseName and " + "t1.TableName = t2.TableName and t1.shard = t2.shard" // average writing rate for all tables per second writePerSecQuery = "SELECT sd.shard, sd.Rows_per_second, sd.Bytes_per_second FROM (SELECT shardNum() as " + "shard, (intDiv(toUInt32(date_trunc('minute', toDateTime(event_time))), 2) * 2) * 1000 as t, " + - "TRUNCATE(avg(ProfileEvent_InsertedRows),0) as Rows_per_second, formatReadableSize(avg(ProfileEvent_InsertedBytes)) as Bytes_per_second, " + + "TRUNCATE(avg(ProfileEvent_InsertedRows),0) as Rows_per_second, " + + "formatReadableSize(avg(ProfileEvent_InsertedBytes)) as Bytes_per_second, " + "ROW_NUMBER() OVER(PARTITION BY shardNum() ORDER BY t DESC) rowNumber FROM cluster('{cluster}', " + "system.metric_log) GROUP BY t, shardNum() ORDER BY t DESC, shardNum()) sd WHERE sd.rowNumber=1" + stackTracesQuery = "SELECT shardNum() as shard, arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), " + + "trace), '\\n') AS trace_functions, count() FROM cluster('{cluster}', system.stack_trace) GROUP BY " + + "trace_functions, shard ORDER BY count() DESC SETTINGS allow_introspection_functions=1\n" ) var options *chOptions -// Command is the support bundle command implementation. -var Command *cobra.Command +var clickHouseStatusCmd = &cobra.Command{ + Use: "status", + Short: "Get diagnostic infos of ClickHouse database", + Example: example, + Args: cobra.NoArgs, + RunE: getClickHouseStatus, +} var example = strings.Trim(` theia clickhouse status --storage @@ -89,36 +103,24 @@ theia clickhouse status --storage --record-number --insertion-rate --print-table `, "\n") func init() { - Command = &cobra.Command{ - Use: "status", - Short: "Get diagnostic infos of ClickHouse database", - Example: example, - Args: cobra.NoArgs, - RunE: getClickHouseStatus, - } + clickHouseCmd.AddCommand(clickHouseStatusCmd) options = &chOptions{} - Command.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check storage") - Command.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check number of records") - Command.Flags().BoolVar(&options.insertRate, "insertion-rate", false, "check insertion-rate") - Command.Flags().BoolVar(&options.formatTable, "print-table", false, "output data in table format") - Command.Flags().String("clickhouse-endpoint", "", "The ClickHouse service endpoint.") - Command.Flags().Bool( - "use-cluster-ip", - false, - `Enable this option will use Service ClusterIP instead of port forwarding when connecting to the ClickHouse service. -It can only be used when running theia in cluster.`, - ) + clickHouseStatusCmd.Flags().BoolVar(&options.diskInfo, "diskInfo", false, "check storage") + clickHouseStatusCmd.Flags().BoolVar(&options.tableInfo, "tableInfo", false, "check number of records") + clickHouseStatusCmd.Flags().BoolVar(&options.insertRate, "insertionRate", false, "check insertion-rate") + clickHouseStatusCmd.Flags().BoolVar(&options.stackTraces, "stackTraces", false, "check stacktrace") + clickHouseStatusCmd.Flags().BoolVar(&options.printTable, "printTable", false, "output data in table format") } func getClickHouseStatus(cmd *cobra.Command, args []string) error { - if !options.diskInfo && !options.tableInfo && !options.insertRate { + if !options.diskInfo && !options.tableInfo && !options.insertRate && !options.stackTraces { return fmt.Errorf("no metric related flag is specified") } - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) } @@ -137,11 +139,11 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } // Connect to ClickHouse and get the result - connect, pf, err := util.SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, pf, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if err != nil { return err } @@ -153,7 +155,7 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if options.formatTable { + if options.printTable { printTable(data) } else { for _, arr := range data { @@ -166,7 +168,7 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if options.formatTable { + if options.printTable { printTable(data) } else { for _, arr := range data { @@ -179,7 +181,20 @@ func getClickHouseStatus(cmd *cobra.Command, args []string) error { if err != nil { return err } - if options.formatTable { + if options.printTable { + printTable(data) + } else { + for _, arr := range data { + fmt.Println(arr) + } + } + } + if options.stackTraces { + data, err := getStackTracesFromClickHouse(connect) + if err != nil { + return err + } + if options.printTable { printTable(data) } else { for _, arr := range data { @@ -259,10 +274,32 @@ func getWritingRateFromClickHouse(connect *sql.DB) ([][]string, error) { return data, nil } +func getStackTracesFromClickHouse(connect *sql.DB) ([][]string, error) { + result, err := connect.Query(stackTracesQuery) + if err != nil { + return nil, fmt.Errorf("failed to get clickhouse stack trace: %v", err) + } + defer result.Close() + columnName, err := result.Columns() + if err != nil { + return nil, fmt.Errorf("failed to get the head of stack trace: %v", err) + } + var data [][]string + data = append(data, columnName) + for result.Next() { + res := stackTraces{} + result.Scan(&res.shard, &res.traceFunctions, &res.count) + data = append(data, []string{res.shard, res.traceFunctions, res.count}) + } + if len(data) <= 1 { + return nil, fmt.Errorf("no data is returned by database") + } + return data, nil +} + func printTable(data [][]string) { table := tablewriter.NewWriter(os.Stdout) - //table, _ := tablewriter.NewCSV(os.Stdout, "./test_info.csv", true) - //table.SetAlignment(tablewriter.ALIGN_LEFT) + table.SetRowLine(true) table.SetHeader(data[0]) for i := 1; i < len(data); i++ { table.Append(data[i]) diff --git a/pkg/theia/commands/policy_recommendation_retrieve.go b/pkg/theia/commands/policy_recommendation_retrieve.go index c231c5d3d..97970d513 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve.go +++ b/pkg/theia/commands/policy_recommendation_retrieve.go @@ -23,8 +23,6 @@ import ( "github.com/google/uuid" "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" - - "antrea.io/theia/pkg/theia/util" ) // policyRecommendationRetrieveCmd represents the policy-recommendation retrieve command @@ -59,7 +57,7 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us if err != nil { return fmt.Errorf("failed to decode input id %s into a UUID, err: %v", recoID, err) } - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } @@ -83,11 +81,11 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us } // Verify Clickhouse is running - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig: %v", err) } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } @@ -104,7 +102,7 @@ $ theia policy-recommendation retrieve e998433e-accb-4888-9fc8-06563f073e86 --us } func getPolicyRecommendationResult(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool, filePath string, recoID string) (recoResult string, err error) { - connect, portForward, err := setupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) + connect, portForward, err := SetupClickHouseConnection(clientset, kubeconfig, endpoint, useClusterIP) if portForward != nil { defer portForward.Stop() } diff --git a/pkg/theia/commands/policy_recommendation_retrieve_test.go b/pkg/theia/commands/policy_recommendation_retrieve_test.go index ee9c4026c..d1e1c3be9 100644 --- a/pkg/theia/commands/policy_recommendation_retrieve_test.go +++ b/pkg/theia/commands/policy_recommendation_retrieve_test.go @@ -24,7 +24,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "antrea.io/theia/pkg/theia/commands/config" - "antrea.io/theia/pkg/theia/util" ) func TestGetClickHouseSecret(t *testing.T) { @@ -97,7 +96,7 @@ func TestGetClickHouseSecret(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - username, password, err := util.GetClickHouseSecret(tt.fakeClientset) + username, password, err := getClickHouseSecret(tt.fakeClientset) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } diff --git a/pkg/theia/commands/policy_recommendation_run.go b/pkg/theia/commands/policy_recommendation_run.go index fd17bcf71..e845fc2ed 100644 --- a/pkg/theia/commands/policy_recommendation_run.go +++ b/pkg/theia/commands/policy_recommendation_run.go @@ -29,7 +29,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "antrea.io/theia/pkg/theia/util" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" "antrea.io/theia/pkg/theia/commands/config" @@ -203,11 +202,11 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f } sparkResourceArgs.executorMemory = executorMemory - kubeconfig, err := util.ResolveKubeConfig(cmd) + kubeconfig, err := ResolveKubeConfig(cmd) if err != nil { return err } - clientset, err := util.CreateK8sClient(kubeconfig) + clientset, err := CreateK8sClient(kubeconfig) if err != nil { return fmt.Errorf("couldn't create k8s client using given kubeconfig, %v", err) } @@ -217,7 +216,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f return err } - err = util.PolicyRecoPreCheck(clientset) + err = policyRecoPreCheck(clientset) if err != nil { return err } @@ -237,9 +236,9 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f Type: "Python", SparkVersion: config.SparkVersion, Mode: "cluster", - Image: util.ConstStrToPointer(config.SparkImage), - ImagePullPolicy: util.ConstStrToPointer(config.SparkImagePullPolicy), - MainApplicationFile: util.ConstStrToPointer(config.SparkAppFile), + Image: ConstStrToPointer(config.SparkImage), + ImagePullPolicy: ConstStrToPointer(config.SparkImagePullPolicy), + MainApplicationFile: ConstStrToPointer(config.SparkAppFile), Arguments: recoJobArgs, Driver: sparkv1.DriverSpec{ CoreRequest: &driverCoreRequest, @@ -258,7 +257,7 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f Key: "password", }, }, - ServiceAccount: util.ConstStrToPointer(config.SparkServiceAccount), + ServiceAccount: ConstStrToPointer(config.SparkServiceAccount), }, }, Executor: sparkv1.ExecutorSpec{ @@ -336,7 +335,7 @@ Job is still running. Please check completion status for job via CLI later.`, re if err != nil { return err } - if err := util.CheckClickHousePod(clientset); err != nil { + if err := CheckClickHousePod(clientset); err != nil { return err } recoResult, err := getPolicyRecommendationResult(clientset, kubeconfig, endpoint, useClusterIP, filePath, recommendationID) diff --git a/pkg/theia/commands/policy_recommendation_status.go b/pkg/theia/commands/policy_recommendation_status.go index 37ce8e1f3..61b729e57 100644 --- a/pkg/theia/commands/policy_recommendation_status.go +++ b/pkg/theia/commands/policy_recommendation_status.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "antrea.io/theia/pkg/theia/commands/config" sparkv1 "antrea.io/theia/third_party/sparkoperator/v1beta2" ) @@ -83,7 +84,7 @@ $ theia policy-recommendation status e998433e-accb-4888-9fc8-06563f073e86 --use- return err } - err = PolicyRecoPreCheck(clientset) + err = policyRecoPreCheck(clientset) if err != nil { return err } @@ -148,7 +149,7 @@ func getSparkAppByRecommendationID(clientset kubernetes.Interface, id string) (s err = clientset.CoreV1().RESTClient(). Get(). AbsPath("/apis/sparkoperator.k8s.io/v1beta2"). - Namespace(flowVisibilityNS). + Namespace(config.FlowVisibilityNS). Resource("sparkapplications"). Name("pr-" + id). Do(context.TODO()). diff --git a/pkg/theia/commands/root.go b/pkg/theia/commands/root.go index 41d5ec31f..0a89d9b4c 100644 --- a/pkg/theia/commands/root.go +++ b/pkg/theia/commands/root.go @@ -20,8 +20,6 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" - - "antrea.io/theia/pkg/theia/commands/clickhouse" ) // rootCmd represents the base command when called without any subcommands @@ -61,5 +59,4 @@ func init() { "", "absolute path to the k8s config file, will use $KUBECONFIG if not specified", ) - rootCmd.AddCommand(clickhouse.ClickHouseCmd) } diff --git a/pkg/theia/util/utils.go b/pkg/theia/commands/utils.go similarity index 96% rename from pkg/theia/util/utils.go rename to pkg/theia/commands/utils.go index b670460a0..fb5711939 100644 --- a/pkg/theia/util/utils.go +++ b/pkg/theia/commands/utils.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package util +package commands import ( "context" @@ -46,7 +46,7 @@ func CreateK8sClient(kubeconfig string) (kubernetes.Interface, error) { return clientset, nil } -func PolicyRecoPreCheck(clientset kubernetes.Interface) error { +func policyRecoPreCheck(clientset kubernetes.Interface) error { err := CheckSparkOperatorPod(clientset) if err != nil { return err @@ -163,7 +163,7 @@ func ResolveKubeConfig(cmd *cobra.Command) (string, error) { } func getClickHouseSecret(clientset kubernetes.Interface) (username []byte, password []byte, err error) { - secret, err := clientset.CoreV1().Secrets(flowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) + secret, err := clientset.CoreV1().Secrets(config.FlowVisibilityNS).Get(context.TODO(), "clickhouse-secret", metav1.GetOptions{}) if err != nil { return username, password, fmt.Errorf("error %v when finding the ClickHouse secret, please check the deployment of ClickHouse", err) } @@ -209,7 +209,7 @@ func connectClickHouse(clientset kubernetes.Interface, url string) (*sql.DB, err return connect, nil } -func setupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward *portforwarder.PortForwarder, err error) { +func SetupClickHouseConnection(clientset kubernetes.Interface, kubeconfig string, endpoint string, useClusterIP bool) (connect *sql.DB, portForward *portforwarder.PortForwarder, err error) { if endpoint == "" { service := "clickhouse-clickhouse" if useClusterIP { diff --git a/pkg/theia/commands/utils_test.go b/pkg/theia/commands/utils_test.go index fca944607..849546352 100644 --- a/pkg/theia/commands/utils_test.go +++ b/pkg/theia/commands/utils_test.go @@ -23,7 +23,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "antrea.io/theia/pkg/theia/commands/config" - "antrea.io/theia/pkg/theia/util" ) func TestGetServiceAddr(t *testing.T) { @@ -65,7 +64,7 @@ func TestGetServiceAddr(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - ip, port, err := util.GetServiceAddr(tt.fakeClientset, tt.serviceName) + ip, port, err := GetServiceAddr(tt.fakeClientset, tt.serviceName) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } @@ -135,7 +134,7 @@ func TestPolicyRecoPreCheck(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - err := util.PolicyRecoPreCheck(tt.fakeClientset) + err := policyRecoPreCheck(tt.fakeClientset) if tt.expectedErrorMsg != "" { assert.EqualErrorf(t, err, tt.expectedErrorMsg, "Error should be: %v, got: %v", tt.expectedErrorMsg, err) } else { diff --git a/test/e2e/flowvisibility_test.go b/test/e2e/flowvisibility_test.go index b8f71a615..976959db7 100644 --- a/test/e2e/flowvisibility_test.go +++ b/test/e2e/flowvisibility_test.go @@ -131,7 +131,7 @@ type testFlow struct { } func TestFlowVisibility(t *testing.T) { - data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false) + data, v4Enabled, v6Enabled, err := setupTestForFlowVisibility(t, false, true) if err != nil { t.Fatalf("Error when setting up test: %v", err) } diff --git a/test/e2e/theia_get_test.go b/test/e2e/theia_clickhouse_test.go similarity index 91% rename from test/e2e/theia_get_test.go rename to test/e2e/theia_clickhouse_test.go index be187f64b..1a9adcf78 100644 --- a/test/e2e/theia_get_test.go +++ b/test/e2e/theia_clickhouse_test.go @@ -28,13 +28,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "antrea.io/theia/pkg/theia/util" + "antrea.io/theia/pkg/theia/commands" ) const ( getDiskInfoCmd = "./theia clickhouse status --diskInfo" getTableInfoCmd = "./theia clickhouse status --tableInfo" - getInsertRateCmd = "./theia clickhouse status --insertion-rate" + getInsertRateCmd = "./theia clickhouse status --insertionRate" insertQuery = `INSERT INTO flows ( flowStartSeconds, flowEndSeconds, @@ -96,42 +96,29 @@ const ( var targetTable = map[string]string{ ".inner.flows_node_view": "16", - ".inner.flows_pod_view": "17", + ".inner.flows_pod_view": "20", ".inner.flows_policy_view": "27", "flows": "49", } -var wg sync.WaitGroup - func TestTheiaGetCommand(t *testing.T) { - data, _, _, err := setupTestForTheia(t, false, false) + data, _, _, err := setupTestForFlowVisibility(t, false, false) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer func() { teardownTest(t, data) - teardownFlowAggregator(t, data, false) + teardownFlowVisibility(t, data, false) }() clientset := data.clientset - service := "clickhouse-clickhouse" - listenAddress := "localhost" - listenPort := 9000 - _, servicePort, err := util.GetServiceAddr(clientset, service) - require.NoError(t, err) - // Forward the ClickHouse service port kubeconfig, err := data.provider.GetKubeconfigPath() require.NoError(t, err) - pf, err := util.StartPortForward(kubeconfig, service, servicePort, listenAddress, listenPort) - require.NoError(t, err) - defer pf.Stop() - endpoint := fmt.Sprintf("tcp://%s:%d", listenAddress, listenPort) - username, password, err := util.GetClickHouseSecret(clientset) - require.NoError(t, err) - url := fmt.Sprintf("%s?debug=false&username=%s&password=%s", endpoint, username, password) - // Check connection - connect, err := util.ConnectClickHouse(url) + connect, pf, err := commands.SetupClickHouseConnection(clientset, kubeconfig, "", false) require.NoError(t, err) + if pf != nil { + defer pf.Stop() + } t.Run("testTheiaGetClickHouseDiskInfo", func(t *testing.T) { testTheiaGetClickHouseDiskInfo(t, data) @@ -178,8 +165,12 @@ func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) { func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.DB) { // send 10000 records to clickhouse commitNum := 10 + var wg sync.WaitGroup wg.Add(1) - sendTraffic(t, commitNum, connect) + go func() { + defer wg.Done() + sendTraffic(t, commitNum, connect) + }() wg.Wait() // retrieve metrics stdout, err := getClickHouseDBInfo(t, data, getTableInfoCmd) @@ -226,8 +217,12 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql. func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql.DB) { commitNum := 70 + var wg sync.WaitGroup wg.Add(1) - go sendTraffic(t, commitNum, connect) + go func() { + defer wg.Done() + sendTraffic(t, commitNum, connect) + }() // need to wait at least 1 min to get the insertion rate. // insertion rate is the average ProfileEvent_InsertedRows in system.metric_log in current minute time.Sleep(1 * time.Minute) @@ -327,7 +322,7 @@ func addFakeRecord(t *testing.T, stmt *sql.Stmt) { require.NoError(t, err) } -func writeRecords(t *testing.T, connect *sql.DB) { +func writeRecords(t *testing.T, connect *sql.DB, wg *sync.WaitGroup) { defer wg.Done() // Test ping DB var err error @@ -346,12 +341,13 @@ func writeRecords(t *testing.T, connect *sql.DB) { } func sendTraffic(t *testing.T, commitNum int, connect *sql.DB) { - defer wg.Done() + var wg sync.WaitGroup for i := 0; i < commitNum; i++ { wg.Add(1) - go writeRecords(t, connect) + go writeRecords(t, connect, &wg) time.Sleep(time.Duration(insertInterval) * time.Second) } + wg.Wait() } func randInt(t *testing.T, limit int64) int64 {