diff --git a/tests/cluster.go b/tests/cluster.go index 35d82c4933b..bfdf7acf80d 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" - tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" @@ -86,38 +85,6 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err return createTestServer(ctx, cfg, []string{utils.APIServiceName}) } -// StartSingleTSOTestServer creates and starts a tso server with default config for testing. -func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) { - cfg := tsoserver.NewConfig() - cfg.BackendEndpoints = backendEndpoints - cfg.ListenAddr = listenAddrs - cfg, err := tsoserver.GenerateConfig(cfg) - re.NoError(err) - // Setup the logger. - err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) - if err != nil { - return nil, nil, err - } - zapLogOnce.Do(func() { - log.ReplaceGlobals(cfg.Logger, cfg.LogProps) - }) - re.NoError(err) - return NewTSOTestServer(ctx, cfg) -} - -// NewTSOTestServer creates a tso server with given config for testing. -func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) { - s := tsoserver.CreateServer(ctx, cfg) - if err := s.Run(); err != nil { - return nil, nil, err - } - cleanup := func() { - s.Close() - os.RemoveAll(cfg.DataDir) - } - return s, cleanup, nil -} - func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err != nil { diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index 27a5a937e4c..2cdcccedddd 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -27,7 +27,6 @@ import ( "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" "go.uber.org/goleak" ) @@ -97,7 +96,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) { re.Equal(addr, returnedEntry.ServiceAddr) // test primary when only one server - expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s}) + expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s}) primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName) re.True(exist) re.Equal(primary, expectedPrimary) @@ -131,7 +130,7 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin serverMap[s.GetAddr()] = s } - expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), serverMap) + expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), serverMap) primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName) re.True(exist) re.Equal(expectedPrimary, primary) @@ -139,7 +138,7 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin serverMap[primary].Close() delete(serverMap, primary) - expectedPrimary = mcs.WaitForPrimaryServing(suite.Require(), serverMap) + expectedPrimary = tests.WaitForPrimaryServing(suite.Require(), serverMap) // test API server discovery client := suite.pdLeader.GetEtcdClient() endpoints, err := discovery.Discover(client, suite.clusterID, serviceName) @@ -156,9 +155,9 @@ func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server, re := suite.Require() switch serviceName { case utils.TSOServiceName: - return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + return tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) case utils.ResourceManagerServiceName: - return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + return tests.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) default: return nil, nil } diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index dc33016eafb..59aabb260ae 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" ) const ( @@ -86,11 +85,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { // add three nodes. nodes := make(map[string]bs.Server) for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ { - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s } - mcs.WaitForPrimaryServing(suite.Require(), nodes) + tests.WaitForPrimaryServing(suite.Require(), nodes) // create a keyspace group. kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ @@ -135,11 +134,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { func (suite *keyspaceGroupTestSuite) TestAllocReplica() { nodes := make(map[string]bs.Server) for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s } - mcs.WaitForPrimaryServing(suite.Require(), nodes) + tests.WaitForPrimaryServing(suite.Require(), nodes) // miss replica. id := 1 @@ -188,10 +187,10 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { suite.Equal(http.StatusBadRequest, code) // the keyspace group is exist, the new replica is more than the old replica. - s2, cleanup2 := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + s2, cleanup2 := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup2() nodes[s2.GetAddr()] = s2 - mcs.WaitForPrimaryServing(suite.Require(), nodes) + tests.WaitForPrimaryServing(suite.Require(), nodes) params = &handlers.AllocNodesForKeyspaceGroupParams{ Replica: utils.DefaultKeyspaceGroupReplicaCount + 1, } @@ -228,12 +227,12 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() { nodes := make(map[string]bs.Server) nodesList := []string{} for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s nodesList = append(nodesList, s.GetAddr()) } - mcs.WaitForPrimaryServing(suite.Require(), nodes) + tests.WaitForPrimaryServing(suite.Require(), nodes) // the keyspace group is not exist. id := 1 @@ -288,11 +287,11 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() { func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { nodes := make(map[string]bs.Server) for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s } - mcs.WaitForPrimaryServing(suite.Require(), nodes) + tests.WaitForPrimaryServing(suite.Require(), nodes) // the default keyspace group is exist. var kg *endpoint.KeyspaceGroup diff --git a/tests/integrations/mcs/resourcemanager/server_test.go b/tests/integrations/mcs/resourcemanager/server_test.go index 191c88b70d8..309c511ee65 100644 --- a/tests/integrations/mcs/resourcemanager/server_test.go +++ b/tests/integrations/mcs/resourcemanager/server_test.go @@ -27,7 +27,6 @@ import ( "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" ) func TestResourceManagerServer(t *testing.T) { @@ -45,7 +44,7 @@ func TestResourceManagerServer(t *testing.T) { leaderName := cluster.WaitLeader() leader := cluster.GetServer(leaderName) - s, cleanup := mcs.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc()) + s, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc()) addr := s.GetAddr() defer cleanup() diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index dee5cba29dc..bbedd65209d 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -16,38 +16,14 @@ package mcs import ( "context" - "os" "sync" - "time" - "github.com/pingcap/log" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" - bs "github.com/tikv/pd/pkg/basicserver" - rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server" - tso "github.com/tikv/pd/pkg/mcs/tso/server" - "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" ) -var once sync.Once - -// InitLogger initializes the logger for test. -func InitLogger(cfg *tso.Config) (err error) { - once.Do(func() { - // Setup the logger. - err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) - if err != nil { - return - } - log.ReplaceGlobals(cfg.Logger, cfg.LogProps) - // Flushing any buffered log entries. - log.Sync() - }) - return err -} - // SetupClientWithAPIContext creates a TSO client with api context name for test. func SetupClientWithAPIContext( ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...pd.ClientOption, @@ -67,76 +43,6 @@ func SetupClientWithKeyspaceID( return cli } -// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing. -func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) { - cfg := rm.NewConfig() - cfg.BackendEndpoints = backendEndpoints - cfg.ListenAddr = listenAddrs - cfg, err := rm.GenerateConfig(cfg) - re.NoError(err) - - s, cleanup, err := rm.NewTestServer(ctx, re, cfg) - re.NoError(err) - testutil.Eventually(re, func() bool { - return !s.IsClosed() - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - - return s, cleanup -} - -// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing. -func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) { - cfg := tso.NewConfig() - cfg.BackendEndpoints = backendEndpoints - cfg.ListenAddr = listenAddrs - cfg, err := tso.GenerateConfig(cfg) - re.NoError(err) - // Setup the logger. - err = InitLogger(cfg) - re.NoError(err) - return NewTSOTestServer(ctx, cfg) -} - -// StartSingleTSOTestServer creates and starts a tso server with default config for testing. -func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) { - s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs) - re.NoError(err) - testutil.Eventually(re, func() bool { - return !s.IsClosed() - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - - return s, cleanup -} - -// NewTSOTestServer creates a tso server with given config for testing. -func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) { - s := tso.CreateServer(ctx, cfg) - if err := s.Run(); err != nil { - return nil, nil, err - } - cleanup := func() { - s.Close() - os.RemoveAll(cfg.DataDir) - } - return s, cleanup, nil -} - -// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader -func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string { - var primary string - testutil.Eventually(re, func() bool { - for name, s := range serverMap { - if s.IsServing() { - primary = name - return true - } - } - return false - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - - return primary -} - // WaitForTSOServiceAvailable waits for the pd client being served by the tso server side func WaitForTSOServiceAvailable( ctx context.Context, re *require.Assertions, client pd.Client, diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 67ef3bfadce..fde6bcb8da0 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" ) const ( @@ -51,7 +50,7 @@ type tsoAPITestSuite struct { ctx context.Context cancel context.CancelFunc pdCluster *tests.TestCluster - tsoCluster *mcs.TestTSOCluster + tsoCluster *tests.TestTSOCluster } func TestTSOAPI(t *testing.T) { @@ -70,7 +69,7 @@ func (suite *tsoAPITestSuite) SetupTest() { leaderName := suite.pdCluster.WaitLeader() pdLeaderServer := suite.pdCluster.GetServer(leaderName) re.NoError(pdLeaderServer.BootstrapCluster()) - suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr()) + suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr()) re.NoError(err) } @@ -124,10 +123,10 @@ func TestTSOServerStartFirst(t *testing.T) { addr := apiCluster.GetConfig().GetClientURL() ch := make(chan struct{}) defer close(ch) - clusterCh := make(chan *mcs.TestTSOCluster) + clusterCh := make(chan *tests.TestTSOCluster) defer close(clusterCh) go func() { - tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, addr) + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, addr) re.NoError(err) primary := tsoCluster.WaitForDefaultPrimaryServing(re) re.NotNil(primary) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 5a20211c7e9..b7c3873527e 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -54,7 +54,7 @@ type tsoKeyspaceGroupManagerTestSuite struct { // pdLeaderServer is the leader server of the PD cluster. pdLeaderServer *tests.TestServer // tsoCluster is the TSO service cluster. - tsoCluster *mcs.TestTSOCluster + tsoCluster *tests.TestTSOCluster } func TestTSOKeyspaceGroupManager(t *testing.T) { @@ -74,7 +74,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) re.NoError(suite.pdLeaderServer.BootstrapCluster()) - suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) + suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) re.NoError(err) } @@ -515,7 +515,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { leaderServer := tc.GetServer(tc.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) - tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr) + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) defer tsoCluster.Destroy() tsoCluster.WaitForDefaultPrimaryServing(re) @@ -707,7 +707,7 @@ func TestGetTSOImmediately(t *testing.T) { leaderServer := tc.GetServer(tc.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) - tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr) + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) defer tsoCluster.Destroy() tsoCluster.WaitForDefaultPrimaryServing(re) diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index d873665c258..45646dfa48e 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/client/tsoutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -44,7 +43,7 @@ type tsoProxyTestSuite struct { apiCluster *tests.TestCluster apiLeader *tests.TestServer backendEndpoints string - tsoCluster *mcs.TestTSOCluster + tsoCluster *tests.TestTSOCluster defaultReq *pdpb.TsoRequest streams []pdpb.PD_TsoClient cleanupFuncs []testutil.CleanupFunc @@ -70,7 +69,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { s.NoError(s.apiLeader.BootstrapCluster()) // Create a TSO cluster with 2 servers - s.tsoCluster, err = mcs.NewTestTSOCluster(s.ctx, 2, s.backendEndpoints) + s.tsoCluster, err = tests.NewTestTSOCluster(s.ctx, 2, s.backendEndpoints) re.NoError(err) s.tsoCluster.WaitForDefaultPrimaryServing(re) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 6de703741ad..49eab333066 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -95,7 +95,7 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() { }() re := suite.Require() - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) defer cleanup() testutil.Eventually(re, func() bool { @@ -136,10 +136,10 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { re.NoError(err) // Setup the logger. - err = mcs.InitLogger(cfg) + err = tests.InitLogger(cfg) re.NoError(err) - s, cleanup, err := mcs.NewTSOTestServer(suite.ctx, cfg) + s, cleanup, err := tests.NewTSOTestServer(suite.ctx, cfg) re.NoError(err) defer cleanup() testutil.Eventually(re, func() bool { @@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { re.Equal(1, getEtcdTimestampKeyNum(re, client)) } - _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + _, cleanup := tests.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) defer cleanup() cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV2(""), []string{backendEndpoints}) @@ -236,7 +236,7 @@ func TestWaitAPIServiceReady(t *testing.T) { cluster, backendEndpoints := startCluster(false /*isAPIServiceMode*/) sctx, scancel := context.WithTimeout(ctx, time.Second*10) defer scancel() - s, _, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) + s, _, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) re.Error(err) re.Nil(s) cluster.Destroy() @@ -245,7 +245,7 @@ func TestWaitAPIServiceReady(t *testing.T) { cluster, backendEndpoints = startCluster(true /*isAPIServiceMode*/) sctx, scancel = context.WithTimeout(ctx, time.Second*10) defer scancel() - s, cleanup, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) + s, cleanup, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc()) re.NoError(err) defer cluster.Destroy() defer cleanup() @@ -318,7 +318,7 @@ func (suite *APIServerForwardTestSuite) TearDownTest() { func (suite *APIServerForwardTestSuite) TestForwardTSORelated() { // Unable to use the tso-related interface without tso server suite.checkUnavailableTSO() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) + tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) suite.NoError(err) defer tc.Destroy() tc.WaitForDefaultPrimaryServing(suite.Require()) @@ -328,7 +328,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSORelated() { func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { re := suite.Require() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForDefaultPrimaryServing(re) @@ -367,7 +367,7 @@ func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() { // TODO: test random kill primary with 3 nodes re := suite.Require() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForDefaultPrimaryServing(re) @@ -391,7 +391,7 @@ func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() { func (suite *APIServerForwardTestSuite) TestResignAPIPrimaryForward() { re := suite.Require() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForDefaultPrimaryServing(re) @@ -435,7 +435,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower3() { func (suite *APIServerForwardTestSuite) checkForwardTSOUnexpectedToFollower(checkTSO func()) { re := suite.Require() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) tc.WaitForDefaultPrimaryServing(re) @@ -516,7 +516,7 @@ type CommonTestSuite struct { ctx context.Context cancel context.CancelFunc cluster *tests.TestCluster - tsoCluster *mcs.TestTSOCluster + tsoCluster *tests.TestTSOCluster pdLeader *tests.TestServer // tsoDefaultPrimaryServer is the primary server of the default keyspace group tsoDefaultPrimaryServer *tso.Server @@ -542,7 +542,7 @@ func (suite *CommonTestSuite) SetupSuite() { suite.backendEndpoints = suite.pdLeader.GetAddr() suite.NoError(suite.pdLeader.BootstrapCluster()) - suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) + suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) suite.NoError(err) suite.tsoCluster.WaitForDefaultPrimaryServing(re) suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 6727877a1c7..d7f1bd3b5d2 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -54,7 +54,7 @@ type tsoClientTestSuite struct { // pdLeaderServer is the leader server of the PD cluster. pdLeaderServer *tests.TestServer // The TSO service in microservice mode. - tsoCluster *mcs.TestTSOCluster + tsoCluster *tests.TestTSOCluster keyspaceGroups []struct { keyspaceGroupID uint32 @@ -108,7 +108,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.clients = make([]pd.Client, 0) suite.clients = append(suite.clients, client) } else { - suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints) + suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints) re.NoError(err) suite.keyspaceGroups = []struct { @@ -430,9 +430,9 @@ func TestMixedTSODeployment(t *testing.T) { err = apiSvr.Run() re.NoError(err) - s, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + s, cleanup := tests.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) defer cleanup() - mcs.WaitForPrimaryServing(re, map[string]bs.Server{s.GetAddr(): s}) + tests.WaitForPrimaryServing(re, map[string]bs.Server{s.GetAddr(): s}) ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup @@ -473,7 +473,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { re.NoError(err) // Create a TSO cluster which has 2 servers - tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints) + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints) re.NoError(err) tsoCluster.WaitForDefaultPrimaryServing(re) // The TSO service should be eventually healthy @@ -486,7 +486,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { mcs.WaitForTSOServiceAvailable(ctx, re, pdClient) // Restart the TSO cluster - tsoCluster, err = mcs.RestartTestTSOCluster(ctx, tsoCluster) + tsoCluster, err = tests.RestartTestTSOCluster(ctx, tsoCluster) re.NoError(err) // The TSO service should be eventually healthy mcs.WaitForTSOServiceAvailable(ctx, re, pdClient) diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index 70000d2abd4..1d35e8bf5e2 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -30,7 +30,6 @@ import ( pd "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" "google.golang.org/grpc" ) @@ -85,7 +84,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { if suite.legacy { suite.pdClient = pd.MustNewGrpcClient(re, backendEndpoints) } else { - suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) + suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) } } diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 96fc5d334b0..518335442f4 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/utils/tempurl" pd "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/integrations/mcs" "google.golang.org/grpc" ) @@ -83,7 +82,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { if suite.legacy { suite.pdClient = pd.MustNewGrpcClient(re, backendEndpoints) } else { - suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) + suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) } } diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 2ec200b0493..40315e835f8 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/server/config" @@ -108,12 +107,9 @@ func TestSplitKeyspaceGroup(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - _, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup1() - re.NoError(err) - _, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup2() + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) + defer ttc.Destroy() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() @@ -204,12 +200,10 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - s1, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup1() - re.NoError(err) - s2, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup2() + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) + defer ttc.Destroy() + tsoAddrs := ttc.GetAddrs() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() @@ -219,7 +213,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { // set-node keyspace group. defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) testutil.Eventually(re, func() bool { - args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr(), s2.GetAddr()} + args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, tsoAddrs[0], tsoAddrs[1]} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) return strings.Contains(string(output), "Success") @@ -228,7 +222,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { // set-priority keyspace group. checkPriority := func(p int) { testutil.Eventually(re, func() bool { - args := []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, s1.GetAddr()} + args := []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, tsoAddrs[0]} if p >= 0 { args = append(args, strconv.Itoa(p)) } else { @@ -249,8 +243,8 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { re.Equal(utils.DefaultKeyspaceGroupID, keyspaceGroup.ID) re.Len(keyspaceGroup.Members, 2) for _, member := range keyspaceGroup.Members { - re.Contains([]string{s1.GetAddr(), s2.GetAddr()}, member.Address) - if member.Address == s1.GetAddr() { + re.Contains(tsoAddrs, member.Address) + if member.Address == tsoAddrs[0] { re.Equal(p, member.Priority) } else { re.Equal(0, member.Priority) @@ -262,7 +256,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { checkPriority(-200) // params error for set-node. - args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr()} + args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, tsoAddrs[0]} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "invalid num of nodes") @@ -270,7 +264,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "Failed to parse the tso node address") - args = []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr(), "http://pingcap.com"} + args = []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, tsoAddrs[0], "http://pingcap.com"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "node does not exist") @@ -284,7 +278,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "node does not exist") - args = []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, s1.GetAddr(), "xxx"} + args = []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, tsoAddrs[0], "xxx"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "Failed to parse the priority") @@ -309,12 +303,9 @@ func TestMergeKeyspaceGroup(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - _, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup1() - re.NoError(err) - _, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup2() + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) + defer ttc.Destroy() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() @@ -355,6 +346,59 @@ func TestMergeKeyspaceGroup(t *testing.T) { re.Len(keyspaceGroup.Keyspaces, 130) re.Nil(keyspaceGroup.MergeState) + // split keyspace group multiple times. + for i := 1; i <= 10; i++ { + splitTargetID := fmt.Sprintf("%d", i) + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "split", "0", splitTargetID, splitTargetID} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + args := []string{"-u", pdAddr, "keyspace-group", "finish-split", splitTargetID} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + } + + // merge keyspace group with `all` flag. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "merge", "0", "--all"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + args = []string{"-u", pdAddr, "keyspace-group", "finish-merge", "0"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + args = []string{"-u", pdAddr, "keyspace-group", "0"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + err = json.Unmarshal(output, &keyspaceGroup) + re.NoError(err) + re.Len(keyspaceGroup.Keyspaces, 130) + re.Nil(keyspaceGroup.MergeState) + + // merge keyspace group with wrong args. + args = []string{"-u", pdAddr, "keyspace-group", "merge"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Must specify the source keyspace group ID(s) or the merge all flag") + args = []string{"-u", pdAddr, "keyspace-group", "merge", "0"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Must specify the source keyspace group ID(s) or the merge all flag") + args = []string{"-u", pdAddr, "keyspace-group", "merge", "0", "1", "--all"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Must specify the source keyspace group ID(s) or the merge all flag") + args = []string{"-u", pdAddr, "keyspace-group", "merge", "1", "--all"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Unable to merge all keyspace groups into a non-default keyspace group") + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } @@ -377,12 +421,9 @@ func TestKeyspaceGroupState(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - _, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup1() - re.NoError(err) - _, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup2() + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) + defer ttc.Destroy() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() @@ -470,12 +511,10 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { re.NoError(err) pdAddr := tc.GetConfig().GetClientURL() - s1, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup1() - re.NoError(err) - s2, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) - defer tsoServerCleanup2() + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) re.NoError(err) + defer ttc.Destroy() + tsoAddrs := ttc.GetAddrs() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() @@ -496,7 +535,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { return len(keyspaceGroup.Members) == 2 }) for _, member := range keyspaceGroup.Members { - re.Contains([]string{s1.GetAddr(), s2.GetAddr()}, member.Address) + re.Contains(tsoAddrs, member.Address) } // get primary for keyspace group 0. @@ -506,7 +545,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { re.NoError(err) var resp handlers.GetKeyspaceGroupPrimaryResponse json.Unmarshal(output, &resp) - return s1.GetAddr() == resp.Primary || s2.GetAddr() == resp.Primary + return tsoAddrs[0] == resp.Primary || tsoAddrs[1] == resp.Primary }) // split keyspace group. @@ -528,7 +567,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { return len(keyspaceGroup.Members) == 2 }) for _, member := range keyspaceGroup.Members { - re.Contains([]string{s1.GetAddr(), s2.GetAddr()}, member.Address) + re.Contains(tsoAddrs, member.Address) } // get primary for keyspace group 1. @@ -538,7 +577,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { re.NoError(err) var resp handlers.GetKeyspaceGroupPrimaryResponse json.Unmarshal(output, &resp) - return s1.GetAddr() == resp.Primary || s2.GetAddr() == resp.Primary + return tsoAddrs[0] == resp.Primary || tsoAddrs[1] == resp.Primary }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) diff --git a/tests/testutil.go b/tests/testutil.go new file mode 100644 index 00000000000..25ff86c274f --- /dev/null +++ b/tests/testutil.go @@ -0,0 +1,117 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tests + +import ( + "context" + "os" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + bs "github.com/tikv/pd/pkg/basicserver" + rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/testutil" +) + +var once sync.Once + +// InitLogger initializes the logger for test. +func InitLogger(cfg *tso.Config) (err error) { + once.Do(func() { + // Setup the logger. + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err != nil { + return + } + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + // Flushing any buffered log entries. + log.Sync() + }) + return err +} + +// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing. +func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) { + cfg := rm.NewConfig() + cfg.BackendEndpoints = backendEndpoints + cfg.ListenAddr = listenAddrs + cfg, err := rm.GenerateConfig(cfg) + re.NoError(err) + + s, cleanup, err := rm.NewTestServer(ctx, re, cfg) + re.NoError(err) + testutil.Eventually(re, func() bool { + return !s.IsClosed() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + return s, cleanup +} + +// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing. +func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) { + cfg := tso.NewConfig() + cfg.BackendEndpoints = backendEndpoints + cfg.ListenAddr = listenAddrs + cfg, err := tso.GenerateConfig(cfg) + re.NoError(err) + // Setup the logger. + err = InitLogger(cfg) + re.NoError(err) + return NewTSOTestServer(ctx, cfg) +} + +// StartSingleTSOTestServer creates and starts a tso server with default config for testing. +func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) { + s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs) + re.NoError(err) + testutil.Eventually(re, func() bool { + return !s.IsClosed() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + return s, cleanup +} + +// NewTSOTestServer creates a tso server with given config for testing. +func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) { + s := tso.CreateServer(ctx, cfg) + if err := s.Run(); err != nil { + return nil, nil, err + } + cleanup := func() { + s.Close() + os.RemoveAll(cfg.DataDir) + } + return s, cleanup, nil +} + +// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader +func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string { + var primary string + testutil.Eventually(re, func() bool { + for name, s := range serverMap { + if s.IsServing() { + primary = name + return true + } + } + return false + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + return primary +} diff --git a/tests/integrations/mcs/cluster.go b/tests/tso_cluster.go similarity index 96% rename from tests/integrations/mcs/cluster.go rename to tests/tso_cluster.go index 65a6bf293c3..dc23c24d779 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/tso_cluster.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package mcs +package tests import ( "context" @@ -208,3 +208,12 @@ func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceG } return } + +// GetAddrs returns all TSO server addresses. +func (tc *TestTSOCluster) GetAddrs() []string { + addrs := make([]string, 0, len(tc.servers)) + for _, server := range tc.servers { + addrs = append(addrs, server.GetAddr()) + } + return addrs +} diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go index 615251c31f3..6dea6d78dd6 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/spf13/cobra" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" ) @@ -81,6 +82,7 @@ func newMergeKeyspaceGroupCommand() *cobra.Command { Short: "merge the keyspace group with the given IDs into the target one", Run: mergeKeyspaceGroupCommandFunc, } + r.Flags().Bool("all", false, "merge all keyspace groups into the default one") return r } @@ -247,27 +249,52 @@ func finishSplitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { } func mergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { - if len(args) < 2 { - cmd.Usage() - return - } - _, err := strconv.ParseUint(args[0], 10, 32) + var ( + targetGroupID uint32 + params = map[string]interface{}{} + argNum = len(args) + ) + mergeAll, err := cmd.Flags().GetBool("all") if err != nil { - cmd.Printf("Failed to parse the target keyspace group ID: %s\n", err) + cmd.Printf("Failed to get the merge all flag: %s\n", err) return } - groups := make([]uint32, 0, len(args)-1) - for _, arg := range args[1:] { - id, err := strconv.ParseUint(arg, 10, 32) + if argNum == 1 && mergeAll { + target, err := strconv.ParseUint(args[0], 10, 32) if err != nil { - cmd.Printf("Failed to parse the keyspace ID: %s\n", err) + cmd.Printf("Failed to parse the target keyspace group ID: %s\n", err) + return + } + targetGroupID = uint32(target) + if targetGroupID != mcsutils.DefaultKeyspaceGroupID { + cmd.Println("Unable to merge all keyspace groups into a non-default keyspace group") return } - groups = append(groups, uint32(id)) + params["merge-all-into-default"] = true + } else if argNum >= 2 && !mergeAll { + target, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the target keyspace group ID: %s\n", err) + return + } + targetGroupID = uint32(target) + groups := make([]uint32, 0, len(args)-1) + for _, arg := range args[1:] { + id, err := strconv.ParseUint(arg, 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace ID: %s\n", err) + return + } + groups = append(groups, uint32(id)) + } + params["merge-list"] = groups + } else { + cmd.Println("Must specify the source keyspace group ID(s) or the merge all flag") + cmd.Usage() + return } - postJSON(cmd, fmt.Sprintf("%s/%s/merge", keyspaceGroupsPrefix, args[0]), map[string]interface{}{ - "merge-list": groups, - }) + // TODO: implement the retry mechanism under merge all flag. + postJSON(cmd, fmt.Sprintf("%s/%d/merge", keyspaceGroupsPrefix, targetGroupID), params) } func finishMergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {