refactor realcluster testing
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Feb 19, 2025
1 parent daa23f8 commit ec2de11
Showing 3 changed files with 145 additions and 64 deletions.
6 changes: 3 additions & 3 deletions tests/integrations/realcluster/Makefile
@@ -59,11 +59,11 @@ kill_cluster:

test:
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || (\
echo "follow is pd-0 log\n" ; \
echo "follow is pd-0 log" ; \
cat ~/.tiup/data/pd_real_cluster_test/pd-0/pd.log ; \
echo "follow is pd-1 log\n" ; \
echo "follow is pd-1 log" ; \
cat ~/.tiup/data/pd_real_cluster_test/pd-1/pd.log ; \
echo "follow is pd-2 log\n" ; \
echo "follow is pd-2 log" ; \
cat ~/.tiup/data/pd_real_cluster_test/pd-2/pd.log ; \
exit 1)
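(A likely motivation for this hunk: `echo` handles `\n` escapes inconsistently across shells; bash, for one, prints the two characters literally unless given `-e`. Dropping the escape sidesteps the portability issue and keeps the failure banners clean.)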

201 changes: 141 additions & 60 deletions tests/integrations/realcluster/cluster.go
@@ -19,7 +19,9 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"

@@ -43,6 +45,68 @@ var (
tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup"
)

const (
defaultTiKVCount = 3
defaultTiDBCount = 1
defaultPDCount = 3
defaultTiFlashCount = 1
maxRetries = 3
retryInterval = 5 * time.Second
deployTimeout = 5 * time.Minute
)

// ProcessManager is used to manage the processes of the cluster.
type ProcessManager struct {
tag string
pids []int
}

// NewProcessManager creates a new ProcessManager.
func NewProcessManager(tag string) *ProcessManager {
return &ProcessManager{tag: tag}
}

// CollectPids will collect the pids of the processes.
func (pm *ProcessManager) CollectPids() error {
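// pgrep -f matches against the full command line rather than just the
// process name, so the playground tag (passed via --tag at deploy time)
// picks up every process belonging to this cluster.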
cmd := exec.Command("pgrep", "-f", pm.tag)
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("failed to collect pids: %v", err)
}

for _, pidStr := range strings.Split(strings.TrimSpace(string(output)), "\n") {
if pid, err := strconv.Atoi(pidStr); err == nil {
pm.pids = append(pm.pids, pid)
}
}
return nil
}

// Cleanup will send SIGTERM to all the processes and wait for a while.
func (pm *ProcessManager) Cleanup() {
for _, pid := range pm.pids {
// First try SIGTERM
syscall.Kill(pid, syscall.SIGTERM)
}

// Wait and force kill if necessary
time.Sleep(3 * time.Second)
for _, pid := range pm.pids {
if isProcessRunning(pid) {
syscall.Kill(pid, syscall.SIGKILL)
}
}
}

func isProcessRunning(pid int) bool {
process, err := os.FindProcess(pid)
if err != nil {
return false
}
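// On Unix, os.FindProcess always succeeds whether or not the process
// exists, so the signal probe below is the actual liveness check:
// signal 0 delivers nothing and only reports whether the pid is signalable.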
err = process.Signal(syscall.Signal(0))
return err == nil
}
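Taken together, the intended call sequence is short. A minimal usage sketch, with a hypothetical tag value (stopCluster below follows exactly this pattern):

pm := NewProcessManager("pd_real_cluster_test_1_false") // hypothetical tag
if err := pm.CollectPids(); err != nil {
log.Warn("failed to collect pids", zap.Error(err))
return
}
pm.Cleanup() // SIGTERM everything matched, wait 3s, then SIGKILL survivors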

// SetupSuite will run before the tests in the suite are run.
func (s *clusterSuite) SetupSuite() {
t := s.T()
@@ -57,7 +121,7 @@ func (s *clusterSuite) SetupSuite() {
}
s.startCluster(t)
t.Cleanup(func() {
s.stopCluster(t)
s.stopCluster()
})
}

@@ -67,24 +131,28 @@ func (s *clusterSuite) TearDownSuite() {
// If the cluster deployed successfully, it has already been destroyed by
// the cleanup function registered in SetupSuite, so this code has no effect.
s.clusterCnt++
s.stopCluster(s.T())
s.stopCluster()
}

func (s *clusterSuite) startCluster(t *testing.T) {
log.Info("start to deploy a cluster", zap.Bool("ms", s.ms))

tag := s.tag()
deployTiupPlayground(t, tag, s.ms)
waitTiupReady(t, tag)
s.deployTiupPlayground(t)
require.NoError(t, s.waitTiupReady())
s.clusterCnt++
}

func (s *clusterSuite) stopCluster(t *testing.T) {
func (s *clusterSuite) stopCluster() {
s.clusterCnt--
tag := s.tag()

log.Info("start to destroy a cluster", zap.String("tag", s.tag()), zap.Bool("ms", s.ms))
destroy(t, s.tag())
time.Sleep(5 * time.Second)
pm := NewProcessManager(tag)
if err := pm.CollectPids(); err != nil {
log.Warn("failed to collect pids", zap.Error(err))
return
}

pm.Cleanup()
log.Info("cluster destroyed", zap.String("tag", tag))
}

func (s *clusterSuite) tag() string {
@@ -97,26 +165,12 @@ func (s *clusterSuite) tag() string {
func (s *clusterSuite) restart() {
tag := s.tag()
log.Info("start to restart", zap.String("tag", tag))
s.stopCluster(s.T())
s.stopCluster()
s.startCluster(s.T())
log.Info("TiUP restart success")
}

func destroy(t *testing.T, tag string) {
cmdStr := fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag)
cmd := exec.Command("sh", "-c", cmdStr)
bytes, err := cmd.Output()
require.NoError(t, err)
pids := string(bytes)
pidArr := strings.Split(pids, "\n")
for _, pid := range pidArr {
// nolint:errcheck
runCommand("sh", "-c", "kill -9 "+pid)
}
log.Info("destroy success", zap.String("tag", tag))
}

func deployTiupPlayground(t *testing.T, tag string, ms bool) {
func (s *clusterSuite) deployTiupPlayground(t *testing.T) {
curPath, err := os.Getwd()
require.NoError(t, err)
require.NoError(t, os.Chdir("../../.."))
@@ -138,52 +192,79 @@ func deployTiupPlayground(t *testing.T, tag string, ms bool) {

// nolint:errcheck
go func() {
if ms {
runCommand("sh", "-c",
tiupBin+` playground nightly --pd.mode ms --kv 3 --tiflash 1 --db 1 --pd 3 --tso 1 --scheduling 1 \
--without-monitor --tag `+tag+` \
--pd.binpath ./bin/pd-server \
--kv.binpath ./third_bin/tikv-server \
--db.binpath ./third_bin/tidb-server \
--tiflash.binpath ./third_bin/tiflash \
--tso.binpath ./bin/pd-server \
--scheduling.binpath ./bin/pd-server \
--pd.config ./tests/integrations/realcluster/pd.toml \
> `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `)
} else {
runCommand("sh", "-c",
tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \
--without-monitor --tag `+tag+` \
--pd.binpath ./bin/pd-server \
--kv.binpath ./third_bin/tikv-server \
--db.binpath ./third_bin/tidb-server \
--tiflash.binpath ./third_bin/tiflash \
--pd.config ./tests/integrations/realcluster/pd.toml \
> `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `)
playgroundOpts := []string{
fmt.Sprintf("--kv %d", defaultTiKVCount),
fmt.Sprintf("--tiflash %d", defaultTiFlashCount),
fmt.Sprintf("--db %d", defaultTiDBCount),
fmt.Sprintf("--pd %d", defaultPDCount),
"--without-monitor",
fmt.Sprintf("--tag %s", s.tag()),
}

if s.ms {
playgroundOpts = append(playgroundOpts,
"--pd.mode ms",
"--tso 1",
"--scheduling 1",
)
}

cmd := fmt.Sprintf(`%s playground nightly %s %s > %s 2>&1 & `,
tiupBin,
strings.Join(playgroundOpts, " "),
buildBinPathsOpts(s.ms),
filepath.Join(playgroundLogDir, s.tag()+".log"),
)

runCommand("sh", "-c", cmd)
}()

// Avoid changing the directory back before `tiup playground` has started.
time.Sleep(10 * time.Second)
require.NoError(t, os.Chdir(curPath))
}

func waitTiupReady(t *testing.T, tag string) {
func buildBinPathsOpts(ms bool) string {
opts := []string{
"--pd.binpath ./bin/pd-server",
"--kv.binpath ./third_bin/tikv-server",
"--db.binpath ./third_bin/tidb-server",
"--tiflash.binpath ./third_bin/tiflash",
}

if ms {
opts = append(opts,
"--tso.binpath ./bin/pd-server",
"--scheduling.binpath ./bin/pd-server",
)
}

return strings.Join(opts, " ")
}
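For illustration, with s.ms enabled the pieces above assemble into roughly the following command (tag and log path are placeholders; line breaks added for readability):

~/.tiup/bin/tiup playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \
  --without-monitor --tag <tag> --pd.mode ms --tso 1 --scheduling 1 \
  --pd.binpath ./bin/pd-server --kv.binpath ./third_bin/tikv-server \
  --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \
  --tso.binpath ./bin/pd-server --scheduling.binpath ./bin/pd-server \
  > <playgroundLogDir>/<tag>.log 2>&1 &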

func (s *clusterSuite) waitTiupReady() error {
const (
interval = 5
maxTimes = 20
)
log.Info("start to wait TiUP ready", zap.String("tag", tag))
for i := range maxTimes {
err := runCommand(tiupBin, "playground", "display", "--tag", tag)
if err == nil {
log.Info("TiUP is ready", zap.String("tag", tag))
return
}
log.Info("start to wait TiUP ready", zap.String("tag", s.tag()))
timeout := time.After(time.Duration(maxTimes*interval) * time.Second)
ticker := time.NewTicker(time.Duration(interval) * time.Second)
defer ticker.Stop()

log.Info("TiUP is not ready, will retry", zap.Int("retry times", i),
zap.String("tag", tag), zap.Error(err))
time.Sleep(time.Duration(interval) * time.Second)
for i := 0; i < maxTimes; i++ {
select {
case <-timeout:
return fmt.Errorf("TiUP is not ready after timeout, tag: %s", s.tag())
case <-ticker.C:
err := runCommand(tiupBin, "playground", "display", "--tag", s.tag())
if err == nil {
log.Info("TiUP is ready", zap.String("tag", s.tag()))
return nil
}
log.Info("TiUP is not ready, will retry", zap.Int("retry times", i),
zap.String("tag", s.tag()), zap.Error(err))
}
}
require.FailNowf(t, "TiUP is not ready after retry: %s", tag)
return fmt.Errorf("TiUP is not ready after max retries, tag: %s", s.tag())
}
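Note the shape of the rewrite: instead of a sleep loop that fails the test directly via require.FailNowf, waitTiupReady now polls on a ticker against an overall timeout and returns an error, leaving the failure decision to the caller (startCluster wraps the call in require.NoError).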
2 changes: 1 addition & 1 deletion tests/integrations/realcluster/cluster_id_test.go
@@ -44,7 +44,7 @@ func (s *clusterIDSuite) TestClientClusterID() {
ctx := context.Background()
// deploy second cluster
s.startCluster(s.T())
defer s.stopCluster(s.T())
defer s.stopCluster()

pdEndpoints := getPDEndpoints(s.T())
// Try to create a client with the mixed endpoints.
