From d29fd1a8b032986e84382596e680dfdc5dcef918 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 28 Jun 2024 16:16:27 +0800 Subject: [PATCH] schedule: fix progress cannot display when enabling scheduling service (#8334) close tikv/pd#8331 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 7 ++ pkg/schedule/coordinator.go | 1 - pkg/schedule/prepare_checker.go | 9 ++- server/cluster/cluster.go | 15 +++- .../mcs/scheduling/server_test.go | 69 +++++++++++++++++++ tests/testutil.go | 5 ++ 6 files changed, 99 insertions(+), 7 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 851648b8fce..77eb016ccdf 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -44,6 +44,7 @@ import ( const ( randomRegionMaxRetry = 10 scanRegionLimit = 1000 + CollectFactor = 0.9 ) // errRegionIsStale is error info for region is stale. @@ -1583,6 +1584,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int { return r.getNotFromStorageRegionsCntByStoreLocked(storeID) } +// IsStorePrepared checks if a store is prepared. +// For each store, the number of active regions should be no less than the total region count of the store * CollectFactor. +func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool { + return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor +} + // getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID. 
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int { return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount() diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index fb22303f0b7..55913a13341 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -46,7 +46,6 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second checkSuspectRangesInterval = 100 * time.Millisecond - collectFactor = 0.9 collectTimeout = 5 * time.Minute maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 126e3bba41d..df7074b9073 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -51,8 +51,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() totalRegionsCnt := c.GetTotalRegionCount() - // The number of active regions should be more than total region of all stores * collectFactor - if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) { + // The number of active regions should be no less than the total region count of all stores * core.CollectFactor + if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) { return false } for _, store := range c.GetStores() { @@ -61,11 +61,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } storeID := store.GetID() // It is used to avoid sudden scheduling when scheduling service is just started. 
- if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { return false } - // For each store, the number of active regions should be more than total region of the store * collectFactor - if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { + if !c.IsStorePrepared(storeID) { return false } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 05925eb1720..94713386e08 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1590,6 +1590,19 @@ func (c *RaftCluster) setStore(store *core.StoreInfo) error { return nil } +func (c *RaftCluster) isStorePrepared() bool { + for _, store := range c.GetStores() { + if !store.IsPreparing() && !store.IsServing() { + continue + } + storeID := store.GetID() + if !c.IsStorePrepared(storeID) { + return false + } + } + return true +} + func (c *RaftCluster) checkStores() { var offlineStores []*metapb.Store var upStoreCount int @@ -1621,7 +1634,7 @@ func (c *RaftCluster) checkStores() { zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } - } else if c.IsPrepared() { + } else if c.IsPrepared() || (c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.isStorePrepared()) { threshold := c.getThreshold(stores, store) regionSize := float64(store.GetRegionSize()) log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 82da47d18f3..0eda33130ce 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go 
@@ -675,3 +675,72 @@ func (suite *multipleServerTestSuite) TestReElectLeader() { rc = suite.pdLeader.GetServer().GetRaftCluster() rc.IsPrepared() } + +func (suite *serverTestSuite) TestOnlineProgress() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + rc := suite.pdLeader.GetServer().GetRaftCluster() + re.NotNil(rc) + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + regionLen := 1000 + regions := tests.InitRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + re.NoError(err) + } + time.Sleep(2 * time.Second) + + // add a new store + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: 4, + Address: fmt.Sprintf("mock://%d", 4), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + + time.Sleep(2 * time.Second) + for i, r := range regions { + if i < 50 { + r.GetMeta().Peers[2].StoreId = 4 + r.GetMeta().RegionEpoch.ConfVer = 2 + r.GetMeta().RegionEpoch.Version = 2 + err = rc.HandleRegionHeartbeat(r) + re.NoError(err) + } + } + time.Sleep(2 * time.Second) + action, progress, ls, cs, err := rc.GetProgressByID("4") + re.Equal("preparing", action) + re.NotEmpty(progress) + re.NotEmpty(cs) + re.NotEmpty(ls) + re.NoError(err) + suite.TearDownSuite() + suite.SetupSuite() +} diff --git a/tests/testutil.go b/tests/testutil.go 
index ea52bce310e..06e15fa2c8a 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -438,6 +438,11 @@ func InitRegions(regionLen int) []*core.RegionInfo { {Id: allocator.alloc(), StoreId: uint64(3)}, }, } + if i == 0 { + r.StartKey = []byte{} + } else if i == regionLen-1 { + r.EndKey = []byte{} + } region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat)) // Here is used to simulate the upgrade process. if i < regionLen/2 {