schedule: fix progress cannot display when enabling scheduling service (#8334)

close #8331

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Jun 28, 2024
1 parent e99b3ac commit d29fd1a
Showing 6 changed files with 99 additions and 7 deletions.
7 changes: 7 additions & 0 deletions pkg/core/region.go
@@ -44,6 +44,7 @@ import (
 const (
 	randomRegionMaxRetry = 10
 	scanRegionLimit      = 1000
+	CollectFactor        = 0.9
 )
 
 // errRegionIsStale is error info for region is stale.
@@ -1583,6 +1584,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
 	return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
 }
 
+// IsStorePrepared checks if a store is prepared.
+// For each store, the number of active regions should be more than total region of the store * CollectFactor
+func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool {
+	return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor
+}
+
 // getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
 func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
 	return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
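
For reference, a minimal standalone sketch of the threshold math behind the new helper (illustrative code, not part of this commit; isStorePrepared and its integer inputs are stand-ins for the RegionsInfo counters):

package main

import "fmt"

// CollectFactor mirrors the constant this commit moves into pkg/core.
const CollectFactor = 0.9

// isStorePrepared restates RegionsInfo.IsStorePrepared in isolation:
// a store counts as prepared once the regions reported via heartbeat
// (rather than loaded from storage) reach CollectFactor of its total
// region count.
func isStorePrepared(notFromStorageCnt, totalCnt int) bool {
	return float64(notFromStorageCnt) >= float64(totalCnt)*CollectFactor
}

func main() {
	fmt.Println(isStorePrepared(89, 100)) // false: below the 90% threshold
	fmt.Println(isStorePrepared(90, 100)) // true: threshold reached
}
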
1 change: 0 additions & 1 deletion pkg/schedule/coordinator.go
@@ -46,7 +46,6 @@ import (
 const (
 	runSchedulerCheckInterval  = 3 * time.Second
 	checkSuspectRangesInterval = 100 * time.Millisecond
-	collectFactor              = 0.9
 	collectTimeout             = 5 * time.Minute
 	maxLoadConfigRetries       = 10
 	// pushOperatorTickInterval is the interval try to push the operator.
9 changes: 4 additions & 5 deletions pkg/schedule/prepare_checker.go
@@ -51,8 +51,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
 	}
 	notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
 	totalRegionsCnt := c.GetTotalRegionCount()
-	// The number of active regions should be more than total region of all stores * collectFactor
-	if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
+	// The number of active regions should be more than total region of all stores * core.CollectFactor
+	if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) {
 		return false
 	}
 	for _, store := range c.GetStores() {
@@ -61,11 +61,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
 		}
 		storeID := store.GetID()
 		// It is used to avoid sudden scheduling when scheduling service is just started.
-		if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
+		if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
			return false
 		}
-		// For each store, the number of active regions should be more than total region of the store * collectFactor
-		if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
+		if !c.IsStorePrepared(storeID) {
 			return false
 		}
 	}
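
Condensed, the check after this change flows roughly as below. This is a sketch with assumed simplified types (storeCounts, clusterPrepared, and hasWaitTime are hypothetical stand-ins); the real method walks a *core.BasicCluster and takes a variadic collectWaitTime:

package main

import "fmt"

// Simplified per-store counters (assumption: stand-ins for the real
// cluster accessors used by prepareChecker.check).
type storeCounts struct {
	reported   int // regions seen via heartbeat, not loaded from storage
	total      int // region count the cluster tracks for the store
	statsTotal int // region count from the store's own heartbeat stats
}

func clusterPrepared(reported, total int, stores []storeCounts, hasWaitTime bool) bool {
	// Cluster-wide gate: heartbeated regions must reach 90% of the total.
	if float64(total)*0.9 > float64(reported) {
		return false
	}
	for _, s := range stores {
		// Extra guard against sudden scheduling right after the
		// scheduling service starts; applies only when a wait time
		// was passed in.
		if hasWaitTime && float64(s.statsTotal)*0.9 > float64(s.reported) {
			return false
		}
		// Per-store gate, now delegated to core.RegionsInfo.IsStorePrepared.
		if float64(s.reported) < float64(s.total)*0.9 {
			return false
		}
	}
	return true
}

func main() {
	stores := []storeCounts{{reported: 95, total: 100, statsTotal: 100}}
	fmt.Println(clusterPrepared(95, 100, stores, false)) // true
}
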
15 changes: 14 additions & 1 deletion server/cluster/cluster.go
@@ -1590,6 +1590,19 @@ func (c *RaftCluster) setStore(store *core.StoreInfo) error {
 	return nil
 }
 
+func (c *RaftCluster) isStorePrepared() bool {
+	for _, store := range c.GetStores() {
+		if !store.IsPreparing() && !store.IsServing() {
+			continue
+		}
+		storeID := store.GetID()
+		if !c.IsStorePrepared(storeID) {
+			return false
+		}
+	}
+	return true
+}
+
 func (c *RaftCluster) checkStores() {
 	var offlineStores []*metapb.Store
 	var upStoreCount int
@@ -1621,7 +1634,7 @@ func (c *RaftCluster) checkStores() {
 				zap.Int("region-count", c.GetTotalRegionCount()),
 				errs.ZapError(err))
 			}
-		} else if c.IsPrepared() {
+		} else if c.IsPrepared() || (c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.isStorePrepared()) {
 			threshold := c.getThreshold(stores, store)
 			regionSize := float64(store.GetRegionSize())
 			log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
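
The condition above is the actual fix: when scheduling runs in the independent scheduling service, c.IsPrepared() stays false on the PD side, so the progress branch in checkStores was never taken and the "preparing" progress for a new store could not display. A minimal model of the fixed gate (plain booleans stand in for cluster state; progressEnabled is hypothetical):

package main

import "fmt"

// progressEnabled models the fixed gate in RaftCluster.checkStores:
// store progress is updated either when the PD-side coordinator is
// prepared, or when scheduling is delegated to the independent
// scheduling service and every serving/preparing store passes the
// per-store readiness check.
func progressEnabled(pdPrepared, schedulingIndependent, storesPrepared bool) bool {
	return pdPrepared || (schedulingIndependent && storesPrepared)
}

func main() {
	// Before this commit only the first operand was consulted, so this
	// case returned false and no progress was shown.
	fmt.Println(progressEnabled(false, true, true)) // true after the fix
}
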
69 changes: 69 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -675,3 +675,72 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
 	rc = suite.pdLeader.GetServer().GetRaftCluster()
 	rc.IsPrepared()
 }
+
+func (suite *serverTestSuite) TestOnlineProgress() {
+	re := suite.Require()
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+
+	rc := suite.pdLeader.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+	s := &server.GrpcServer{Server: suite.pdLeader.GetServer()}
+	for i := uint64(1); i <= 3; i++ {
+		resp, err := s.PutStore(
+			context.Background(), &pdpb.PutStoreRequest{
+				Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
+				Store: &metapb.Store{
+					Id:      i,
+					Address: fmt.Sprintf("mock://%d", i),
+					State:   metapb.StoreState_Up,
+					Version: "7.0.0",
+				},
+			},
+		)
+		re.NoError(err)
+		re.Empty(resp.GetHeader().GetError())
+	}
+	regionLen := 1000
+	regions := tests.InitRegions(regionLen)
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		re.NoError(err)
+	}
+	time.Sleep(2 * time.Second)
+
+	// add a new store
+	resp, err := s.PutStore(
+		context.Background(), &pdpb.PutStoreRequest{
+			Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
+			Store: &metapb.Store{
+				Id:      4,
+				Address: fmt.Sprintf("mock://%d", 4),
+				State:   metapb.StoreState_Up,
+				Version: "7.0.0",
+			},
+		},
+	)
+	re.NoError(err)
+	re.Empty(resp.GetHeader().GetError())
+
+	time.Sleep(2 * time.Second)
+	for i, r := range regions {
+		if i < 50 {
+			r.GetMeta().Peers[2].StoreId = 4
+			r.GetMeta().RegionEpoch.ConfVer = 2
+			r.GetMeta().RegionEpoch.Version = 2
+			err = rc.HandleRegionHeartbeat(r)
+			re.NoError(err)
+		}
+	}
+	time.Sleep(2 * time.Second)
+	action, progress, ls, cs, err := rc.GetProgressByID("4")
+	re.Equal("preparing", action)
+	re.NotEmpty(progress)
+	re.NotEmpty(cs)
+	re.NotEmpty(ls)
+	re.NoError(err)
+	suite.TearDownSuite()
+	suite.SetupSuite()
+}
5 changes: 5 additions & 0 deletions tests/testutil.go
@@ -438,6 +438,11 @@ func InitRegions(regionLen int) []*core.RegionInfo {
 				{Id: allocator.alloc(), StoreId: uint64(3)},
 			},
 		}
+		if i == 0 {
+			r.StartKey = []byte{}
+		} else if i == regionLen-1 {
+			r.EndKey = []byte{}
+		}
 		region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
 		// Here is used to simulate the upgrade process.
 		if i < regionLen/2 {
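
In PD, an empty start key means the beginning of the key space and an empty end key means its end, so this tweak makes the generated test regions tile the whole key range with no gaps at the edges. A small illustration of the resulting boundaries (boundaries and the key names are hypothetical):

package main

import "fmt"

// boundaries mimics the InitRegions tweak: the first region's start key
// and the last region's end key become empty, i.e. unbounded in PD.
func boundaries(regionLen int) [][2]string {
	out := make([][2]string, regionLen)
	for i := range out {
		out[i] = [2]string{fmt.Sprintf("key-%02d", i), fmt.Sprintf("key-%02d", i+1)}
		if i == 0 {
			out[i][0] = "" // StartKey of the first region: unbounded below
		}
		if i == regionLen-1 {
			out[i][1] = "" // EndKey of the last region: unbounded above
		}
	}
	return out
}

func main() {
	fmt.Println(boundaries(3)) // [[ key-01] [key-01 key-02] [key-02 ]]
}
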
