diff --git a/tests/integrations/realcluster/cluster.go b/tests/integrations/realcluster/cluster.go
index 2bcebcf9515..b62738f6bb1 100644
--- a/tests/integrations/realcluster/cluster.go
+++ b/tests/integrations/realcluster/cluster.go
@@ -23,6 +23,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
@@ -32,9 +33,47 @@ import (
 type clusterSuite struct {
 	suite.Suite
 
-	clusterCnt int
-	suiteName  string
-	ms         bool
+	suiteName string
+	mode      string
+	cluster   *cluster
+}
+
+// SetupSuite will run before the tests in the suite are run.
+func (s *clusterSuite) SetupSuite() {
+	re := s.Require()
+
+	dataDir := s.dataDir()
+	matches, err := filepath.Glob(dataDir)
+	re.NoError(err)
+
+	for _, match := range matches {
+		re.NoError(runCommand("rm", "-rf", match))
+	}
+
+	s.cluster = newCluster(re, s.tag(), dataDir, s.mode)
+	s.cluster.start()
+}
+
+// TearDownSuite will run after all the tests in the suite have been run.
+func (s *clusterSuite) TearDownSuite() {
+	// Even if the cluster deployment fails, we still need to destroy the cluster.
+	// If the deployment does not fail, the cluster is destroyed in the cleanup
+	// function, so this call has no effect in that case.
+	s.cluster.stop()
+}
+
+func (s *clusterSuite) tag() string {
+	if s.mode == "ms" {
+		return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix())
+	}
+	return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix())
+}
+
+func (s *clusterSuite) dataDir() string {
+	if s.mode == "ms" {
+		return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix()))
+	}
+	return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix()))
 }
 
 var (
@@ -52,48 +91,6 @@ const (
 	deployTimeout = 5 * time.Minute
 )
 
-// ProcessManager is used to manage the processes of the cluster.
-type ProcessManager struct {
-	tag  string
-	pids []int
-}
-
-// NewProcessManager creates a new ProcessManager.
-func NewProcessManager(tag string) *ProcessManager {
-	return &ProcessManager{tag: tag}
-}
-
-// CollectPids will collect the pids of the processes.
-func (pm *ProcessManager) CollectPids() error {
-	output, err := runCommandWithOutput(fmt.Sprintf("pgrep -f %s", pm.tag))
-	if err != nil {
-		return fmt.Errorf("failed to collect pids: %v", err)
-	}
-
-	for _, pidStr := range strings.Split(strings.TrimSpace(output), "\n") {
-		if pid, err := strconv.Atoi(pidStr); err == nil {
-			pm.pids = append(pm.pids, pid)
-		}
-	}
-	return nil
-}
-
-// Cleanup will send SIGTERM to all the processes and wait for a while.
-func (pm *ProcessManager) Cleanup() {
-	for _, pid := range pm.pids {
-		// First try SIGTERM
-		syscall.Kill(pid, syscall.SIGTERM)
-	}
-
-	// Wait and force kill if necessary
-	time.Sleep(3 * time.Second)
-	for _, pid := range pm.pids {
-		if isProcessRunning(pid) {
-			syscall.Kill(pid, syscall.SIGKILL)
-		}
-	}
-}
-
 func isProcessRunning(pid int) bool {
 	process, err := os.FindProcess(pid)
 	if err != nil {
@@ -103,68 +100,54 @@ func isProcessRunning(pid int) bool {
 	return err == nil
 }
 
-// SetupSuite will run before the tests in the suite are run.
-func (s *clusterSuite) SetupSuite() {
-	re := s.Require()
-
-	// Clean the data dir. It is the default data dir of TiUP.
-	dataDir := filepath.Join(os.Getenv("HOME"), ".tiup", "data", "pd_real_cluster_test_"+s.suiteName+"_*")
-	matches, err := filepath.Glob(dataDir)
-	re.NoError(err)
-
-	for _, match := range matches {
-		re.NoError(runCommand("rm", "-rf", match))
-	}
-	s.startCluster()
+type cluster struct {
+	re      *require.Assertions
+	tag     string
+	datadir string
+	mode    string
+	pids    []int
 }
 
-// TearDownSuite will run after all the tests in the suite have been run.
-func (s *clusterSuite) TearDownSuite() {
-	// Even if the cluster deployment fails, we still need to destroy the cluster.
-	// If the cluster does not fail to deploy, the cluster will be destroyed in
-	// the cleanup function. And these code will not work.
-	s.clusterCnt++
-	s.stopCluster()
+func newCluster(re *require.Assertions, tag, datadir, mode string) *cluster {
+	return &cluster{re: re, datadir: datadir, tag: tag, mode: mode}
 }
 
-func (s *clusterSuite) startCluster() {
-	log.Info("start to deploy a cluster", zap.Bool("ms", s.ms))
-	s.deploy()
-	s.waitReady()
-	s.clusterCnt++
+func (c *cluster) start() {
+	log.Info("start to deploy a cluster", zap.String("mode", c.mode))
+	c.deploy()
+	c.waitReady()
 }
 
-func (s *clusterSuite) stopCluster() {
-	s.clusterCnt--
-	tag := s.tag()
+func (c *cluster) restart() {
+	log.Info("start to restart", zap.String("tag", c.tag))
+	c.stop()
+	c.start()
+	log.Info("restart success")
+}
 
-	pm := NewProcessManager(tag)
-	if err := pm.CollectPids(); err != nil {
+func (c *cluster) stop() {
+	if err := c.collectPids(); err != nil {
 		log.Warn("failed to collect pids", zap.Error(err))
 		return
 	}
 
-	pm.Cleanup()
-	log.Info("cluster destroyed", zap.String("tag", tag))
-}
-
-func (s *clusterSuite) tag() string {
-	if s.ms {
-		return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, s.clusterCnt)
+	for _, pid := range c.pids {
+		// First try SIGTERM
+		_ = syscall.Kill(pid, syscall.SIGTERM)
 	}
-	return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt)
-}
 
-func (s *clusterSuite) restart() {
-	tag := s.tag()
-	log.Info("start to restart", zap.String("tag", tag))
-	s.stopCluster()
-	s.startCluster()
-	log.Info("TiUP restart success")
+	// Wait and force kill if necessary
+	time.Sleep(3 * time.Second)
+	for _, pid := range c.pids {
+		if isProcessRunning(pid) {
+			_ = syscall.Kill(pid, syscall.SIGKILL)
+		}
+	}
+	log.Info("cluster destroyed", zap.String("tag", c.tag))
 }
 
-func (s *clusterSuite) deploy() {
-	re := s.Require()
+func (c *cluster) deploy() {
+	re := c.re
 	curPath, err := os.Getwd()
 	re.NoError(err)
 	re.NoError(os.Chdir("../../.."))
@@ -192,10 +175,10 @@ func (s *clusterSuite) deploy() {
 		fmt.Sprintf("--db %d", defaultTiDBCount),
 		fmt.Sprintf("--pd %d", defaultPDCount),
 		"--without-monitor",
-		fmt.Sprintf("--tag %s", s.tag()),
+		fmt.Sprintf("--tag %s", c.tag),
 	}
 
-	if s.ms {
+	if c.mode == "ms" {
 		playgroundOpts = append(playgroundOpts,
 			"--pd.mode ms",
 			"--tso 1",
@@ -206,11 +189,11 @@
 		cmd := fmt.Sprintf(`%s playground nightly %s %s > %s 2>&1 & `,
 			tiupBin,
 			strings.Join(playgroundOpts, " "),
-			buildBinPathsOpts(s.ms),
-			filepath.Join(playgroundLogDir, s.tag()+".log"),
+			buildBinPathsOpts(c.mode == "ms"),
+			filepath.Join(playgroundLogDir, c.tag+".log"),
 		)
 
-		runCommand("sh", "-c", cmd)
+		re.NoError(runCommand("sh", "-c", cmd))
 	}()
 
 	// Avoid to change the dir before execute `tiup playground`.
@@ -218,51 +201,66 @@
 	re.NoError(os.Chdir(curPath))
 }
 
-func buildBinPathsOpts(ms bool) string {
-	opts := []string{
-		"--pd.binpath ./bin/pd-server",
-		"--kv.binpath ./third_bin/tikv-server",
-		"--db.binpath ./third_bin/tidb-server",
-		"--tiflash.binpath ./third_bin/tiflash",
+// collectPids will collect the pids of the processes.
+func (c *cluster) collectPids() error {
+	output, err := runCommandWithOutput(fmt.Sprintf("pgrep -f %s", c.tag))
+	if err != nil {
+		return fmt.Errorf("failed to collect pids: %v", err)
 	}
 
-	if ms {
-		opts = append(opts,
-			"--tso.binpath ./bin/pd-server",
-			"--scheduling.binpath ./bin/pd-server",
-		)
+	for _, pidStr := range strings.Split(strings.TrimSpace(output), "\n") {
+		if pid, err := strconv.Atoi(pidStr); err == nil {
+			c.pids = append(c.pids, pid)
+		}
 	}
-
-	return strings.Join(opts, " ")
+	return nil
 }
 
-func (s *clusterSuite) waitReady() {
-	re := s.Require()
+func (c *cluster) waitReady() {
+	re := c.re
 	const (
 		interval = 5
 		maxTimes = 20
 	)
-	log.Info("start to wait TiUP ready", zap.String("tag", s.tag()))
+	log.Info("start to wait TiUP ready", zap.String("tag", c.tag))
 	timeout := time.After(time.Duration(maxTimes*interval) * time.Second)
 	ticker := time.NewTicker(time.Duration(interval) * time.Second)
 	defer ticker.Stop()
 
-	for i := 0; i < maxTimes; i++ {
+	for i := range maxTimes {
 		select {
 		case <-timeout:
-			re.FailNowf("TiUP is not ready after timeout, tag: %s", s.tag())
+			re.FailNowf("TiUP is not ready after timeout, tag: %s", c.tag)
 		case <-ticker.C:
-			log.Info("check TiUP ready", zap.String("tag", s.tag()))
-			cmd := fmt.Sprintf(`%s playground display --tag %s`, tiupBin, s.tag())
+			log.Info("check TiUP ready", zap.String("tag", c.tag))
+			cmd := fmt.Sprintf(`%s playground display --tag %s`, tiupBin, c.tag)
 			output, err := runCommandWithOutput(cmd)
 			if err == nil {
-				log.Info("TiUP is ready", zap.String("tag", s.tag()))
+				log.Info("TiUP is ready", zap.String("tag", c.tag))
 				return
 			}
 			log.Info(output)
 			log.Info("TiUP is not ready, will retry", zap.Int("retry times", i),
-				zap.String("tag", s.tag()), zap.Error(err))
+				zap.String("tag", c.tag), zap.Error(err))
 		}
 	}
-	re.FailNowf("TiUP is not ready after max retries, tag: %s", s.tag())
+	re.FailNowf("TiUP is not ready after max retries, tag: %s", c.tag)
+}
+
+func buildBinPathsOpts(ms bool) string {
+	opts := []string{
+		"--pd.binpath ./bin/pd-server",
+		"--kv.binpath ./third_bin/tikv-server",
+		"--db.binpath ./third_bin/tidb-server",
+		"--tiflash.binpath ./third_bin/tiflash",
+	}
+
+	if ms {
+		opts = append(opts,
+			"--tso.binpath ./bin/pd-server",
+			"--scheduling.binpath ./bin/pd-server",
+		)
+	}
+
+	return strings.Join(opts, " ")
 }
diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go
index 33e892373cb..7044dd46fbe 100644
--- a/tests/integrations/realcluster/cluster_id_test.go
+++ b/tests/integrations/realcluster/cluster_id_test.go
@@ -40,13 +40,14 @@ func TestClusterID(t *testing.T) {
 }
 
 func (s *clusterIDSuite) TestClientClusterID() {
-	re := require.New(s.T())
+	re := s.Require()
 	ctx := context.Background()
 	// deploy second cluster
-	s.startCluster()
-	defer s.stopCluster()
+	cluster2 := newCluster(re, s.tag(), s.dataDir(), s.mode)
+	cluster2.start()
+	defer cluster2.stop()
 
-	pdEndpoints := getPDEndpoints(s.T())
+	pdEndpoints := getPDEndpoints(re)
 	// Try to create a client with the mixed endpoints.
 	_, err := pd.NewClientWithContext(
 		ctx, caller.TestComponent, pdEndpoints,
@@ -56,9 +57,9 @@ func (s *clusterIDSuite) TestClientClusterID() {
 	re.Contains(err.Error(), "unmatched cluster id")
 }
 
-func getPDEndpoints(t *testing.T) []string {
+func getPDEndpoints(re *require.Assertions) []string {
 	output, err := runCommandWithOutput("ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'")
-	require.NoError(t, err)
+	re.NoError(err)
 	var pdAddrs []string
 	for _, addr := range strings.Split(strings.TrimSpace(output), "\n") {
 		// length of addr is less than 5 means it must not be a valid address
diff --git a/tests/integrations/realcluster/etcd_key_test.go b/tests/integrations/realcluster/etcd_key_test.go
index 473c93d19c8..15478248bf7 100644
--- a/tests/integrations/realcluster/etcd_key_test.go
+++ b/tests/integrations/realcluster/etcd_key_test.go
@@ -20,7 +20,6 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/tikv/pd/client/pkg/utils/testutil"
@@ -42,7 +41,7 @@ func TestMSEtcdKey(t *testing.T) {
 	suite.Run(t, &etcdKeySuite{
 		clusterSuite: clusterSuite{
 			suiteName: "etcd_key",
-			ms:        true,
+			mode:      "ms",
 		},
 	})
 }
@@ -95,8 +94,9 @@ var (
 )
 
 func (s *etcdKeySuite) TestEtcdKey() {
+	re := s.Require()
 	var keysBackup []string
-	if !s.ms {
+	if s.mode != "ms" {
 		keysBackup = pdKeys
 		pdKeys = slices.DeleteFunc(pdKeys, func(s string) bool {
 			return slices.Contains(pdMSKeys, s)
@@ -106,9 +106,9 @@
 		}()
 	}
 	t := s.T()
-	endpoints := getPDEndpoints(t)
+	endpoints := getPDEndpoints(re)
 
-	testutil.Eventually(require.New(t), func() bool {
+	testutil.Eventually(re, func() bool {
 		keys, err := getEtcdKey(endpoints[0], "/pd")
 		if err != nil {
 			return false
@@ -116,8 +116,8 @@
 		return checkEtcdKey(t, keys, pdKeys)
 	})
 
-	if s.ms {
-		testutil.Eventually(require.New(t), func() bool {
+	if s.mode == "ms" {
+		testutil.Eventually(re, func() bool {
 			keys, err := getEtcdKey(endpoints[0], "/ms")
 			if err != nil {
 				return false
@@ -136,7 +136,6 @@ func getEtcdKey(endpoints, prefix string) ([]string, error) {
 		return nil, err
 	}
 	return strings.Split(strings.TrimSpace(output), "\n"), nil
-
 }
 
 func checkEtcdKey(t *testing.T, keys, expectedKeys []string) bool {
diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go
index ef85ea0bf99..018ee7f74a5 100644
--- a/tests/integrations/realcluster/reboot_pd_test.go
+++ b/tests/integrations/realcluster/reboot_pd_test.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"testing"
 
-	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/tikv/pd/client/http"
@@ -38,10 +37,10 @@ func TestRebootPD(t *testing.T) {
 
 // https://github.com/tikv/pd/issues/6467
 func (s *rebootPDSuite) TestReloadLabel() {
-	re := require.New(s.T())
+	re := s.Require()
 	ctx := context.Background()
 
-	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T()))
+	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(re))
 	resp, err := pdHTTPCli.GetStores(ctx)
 	re.NoError(err)
 	re.NotEmpty(resp.Stores)
@@ -76,7 +75,7 @@ func (s *rebootPDSuite) TestReloadLabel() {
 	// Check the label is set
 	checkLabelsAreEqual()
 	// Restart to reload the label
-	s.restart()
-	pdHTTPCli = http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T()))
+	s.cluster.restart()
+	pdHTTPCli = http.NewClient("pd-real-cluster-test", getPDEndpoints(re))
 	checkLabelsAreEqual()
 }
diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go
index 4daafdeca27..0ef51f45df6 100644
--- a/tests/integrations/realcluster/scheduler_test.go
+++ b/tests/integrations/realcluster/scheduler_test.go
@@ -21,7 +21,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
@@ -48,11 +47,11 @@ func TestScheduler(t *testing.T) {
 // https://github.com/tikv/pd/issues/6988#issuecomment-1694924611
 // https://github.com/tikv/pd/issues/6897
 func (s *schedulerSuite) TestTransferLeader() {
-	re := require.New(s.T())
+	re := s.Require()
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T()))
+	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(re))
 	resp, err := pdHTTPCli.GetLeader(ctx)
 	re.NoError(err)
 	oldLeader := resp.Name
@@ -98,11 +97,11 @@ func (s *schedulerSuite) TestTransferLeader() {
 }
 
 func (s *schedulerSuite) TestRegionLabelDenyScheduler() {
-	re := require.New(s.T())
+	re := s.Require()
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T()))
+	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(re))
 	regions, err := pdHTTPCli.GetRegions(ctx)
 	re.NoError(err)
 	re.NotEmpty(regions.Regions)
@@ -207,11 +206,11 @@ func (s *schedulerSuite) TestRegionLabelDenyScheduler() {
 }
 
 func (s *schedulerSuite) TestGrantOrEvictLeaderTwice() {
-	re := require.New(s.T())
+	re := s.Require()
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T()))
+	pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(re))
 	regions, err := pdHTTPCli.GetRegions(ctx)
 	re.NoError(err)
 	re.NotEmpty(regions.Regions)
diff --git a/tests/integrations/realcluster/ts_test.go b/tests/integrations/realcluster/ts_test.go
index 4df0b9181c4..24b4ebf48cc 100644
--- a/tests/integrations/realcluster/ts_test.go
+++ b/tests/integrations/realcluster/ts_test.go
@@ -61,7 +61,7 @@ func TestMSTS(t *testing.T) {
 	suite.Run(t, &tsSuite{
 		clusterSuite: clusterSuite{
 			suiteName: "ts",
-			ms:        true,
+			mode:      "ms",
 		},
 	})
 }
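
Note for reviewers: below is a minimal, hypothetical sketch of how a suite plugs into the refactored clusterSuite, following the pattern used by ts_test.go, etcd_key_test.go and reboot_pd_test.go in this change. The file, the "example" suite name and the TestRestart case are illustrations only and are not part of the patch: a suite now just declares suiteName and mode ("ms" selects the PD microservice deployment, an empty string the default one), the embedded SetupSuite/TearDownSuite deploy and destroy the TiUP playground, and tests reach the running deployment through s.cluster.

package realcluster

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

// exampleSuite is a hypothetical suite that only wires up the embedded clusterSuite.
type exampleSuite struct {
	clusterSuite
}

func TestExample(t *testing.T) {
	suite.Run(t, &exampleSuite{
		clusterSuite: clusterSuite{
			suiteName: "example", // becomes part of the playground tag and data dir
			mode:      "ms",      // "ms" deploys PD in microservice mode; leave empty for the default mode
		},
	})
}

func (s *exampleSuite) TestRestart() {
	re := s.Require()
	// The cluster deployed by SetupSuite is reachable through s.cluster,
	// so a test can restart it in place and then re-resolve the PD endpoints.
	s.cluster.restart()
	re.NotEmpty(getPDEndpoints(re))
}

Replacing the old ms bool with a mode string is what keeps this wiring a one-word change between the default and microservice deployments.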