From ccbbef161d090fce9f2d6473732f00305b30754c Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Tue, 25 Jun 2024 20:45:19 +0800
Subject: [PATCH] remove schedulePeerPr

Signed-off-by: lhy1024
---
 pkg/schedule/schedulers/grant_hot_region.go   | 10 +--
 pkg/schedule/schedulers/hot_region.go         | 76 ++++++++++---------
 pkg/schedule/schedulers/hot_region_test.go    | 47 +++++++-----
 pkg/schedule/schedulers/hot_region_v2_test.go |  6 +-
 pkg/schedule/schedulers/scheduler_test.go     | 12 +--
 pkg/schedule/schedulers/shuffle_hot_region.go |  6 +-
 6 files changed, 86 insertions(+), 71 deletions(-)

diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go
index 56ed7cd730e..905462fb55a 100644
--- a/pkg/schedule/schedulers/grant_hot_region.go
+++ b/pkg/schedule/schedulers/grant_hot_region.go
@@ -258,13 +258,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle
 
 func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
     grantHotRegionCounter.Inc()
-    rw := s.randomRWType()
-    s.prepareForBalance(rw, cluster)
-    return s.dispatch(rw, cluster), nil
+    typ := s.randomType()
+    s.prepareForBalance(typ, cluster)
+    return s.dispatch(typ, cluster), nil
 }
 
-func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
-    stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)]
+func (s *grantHotRegionScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
+    stLoadInfos := s.stLoadInfos[typ]
     infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
     index := 0
     for _, info := range stLoadInfos {
diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index 284cf07032c..9189741d01d 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -53,15 +53,12 @@ const (
     splitProgressiveRank    = int64(-5)
     minHotScheduleInterval  = time.Second
     maxHotScheduleInterval  = 20 * time.Second
-    defaultSchedulePeerPr   = 0.66
     defaultPendingAmpFactor = 2.0
     defaultStddevThreshold  = 0.1
     defaultTopnPosition     = 10
 )
 
 var (
-    // schedulePeerPr the probability of schedule the hot peer.
-    schedulePeerPr = defaultSchedulePeerPr
     // pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
     pendingAmpFactor = defaultPendingAmpFactor
     // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
@@ -125,7 +122,7 @@ type baseHotScheduler struct {
     // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
     // be selected if its owner region is tracked in this attribute.
     regionPendings  map[uint64]*pendingInfluence
-    types           []utils.RWType
+    types           []resourceType
     r               *rand.Rand
     updateReadTime  time.Time
     updateWriteTime time.Time
@@ -135,12 +132,12 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.
     base := NewBaseScheduler(opController)
     ret := &baseHotScheduler{
         BaseScheduler:  base,
-        types:          []utils.RWType{utils.Write, utils.Read},
         regionPendings: make(map[uint64]*pendingInfluence),
         stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval),
         r:              rand.New(rand.NewSource(time.Now().UnixNano())),
     }
     for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+        ret.types = append(ret.types, ty)
         ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
     }
     return ret
@@ -148,13 +145,13 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.
 
 // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
 // each store, only update read or write load detail
-func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) {
+func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) {
     storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
     h.summaryPendingInfluence(storeInfos)
     storesLoads := cluster.GetStoresLoads()
     isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow()
 
-    prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
+    prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) {
         ty := buildResourceType(rw, resource)
         h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
             storeInfos,
@@ -164,23 +161,25 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
             isTraceRegionFlow,
             rw, resource)
     }
-    switch rw {
-    case utils.Read:
+    switch typ {
+    case readLeader, readPeer:
         // update read statistics
         if time.Since(h.updateReadTime) >= statisticsInterval {
             regionRead := cluster.RegionReadStats()
-            prepare(regionRead, constant.LeaderKind)
-            prepare(regionRead, constant.RegionKind)
+            prepare(regionRead, utils.Read, constant.LeaderKind)
+            prepare(regionRead, utils.Read, constant.RegionKind)
             h.updateReadTime = time.Now()
         }
-    case utils.Write:
+    case writeLeader, writePeer:
         // update write statistics
         if time.Since(h.updateWriteTime) >= statisticsInterval {
             regionWrite := cluster.RegionWriteStats()
-            prepare(regionWrite, constant.LeaderKind)
-            prepare(regionWrite, constant.RegionKind)
+            prepare(regionWrite, utils.Write, constant.LeaderKind)
+            prepare(regionWrite, utils.Write, constant.RegionKind)
             h.updateWriteTime = time.Now()
         }
+    default:
+        log.Error("invalid resource type", zap.String("type", typ.String()))
     }
 }
 
@@ -227,7 +226,7 @@ func setHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
     HotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load)
 }
 
-func (h *baseHotScheduler) randomRWType() utils.RWType {
+func (h *baseHotScheduler) randomType() resourceType {
     return h.types[h.r.Int()%len(h.types)]
 }
 
@@ -328,24 +327,32 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
 
 func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
     hotSchedulerCounter.Inc()
-    rw := h.randomRWType()
-    return h.dispatch(rw, cluster), nil
+    typ := h.randomType()
+    return h.dispatch(typ, cluster), nil
 }
 
-func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
+func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
     h.Lock()
     defer h.Unlock()
     h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval())
     h.prepareForBalance(typ, cluster)
-    // it can not move earlier to support to use api and metrics.
-    if h.conf.IsForbidRWType(typ) {
-        return nil
-    }
+    // IsForbidRWType can not be move earlier to support to use api and metrics.
     switch typ {
-    case utils.Read:
+    case readLeader, readPeer:
+        if h.conf.IsForbidRWType(utils.Read) {
+            return nil
+        }
         return h.balanceHotReadRegions(cluster)
-    case utils.Write:
-        return h.balanceHotWriteRegions(cluster)
+    case writePeer:
+        if h.conf.IsForbidRWType(utils.Write) {
+            return nil
+        }
+        return h.balanceHotWritePeers(cluster)
+    case writeLeader:
+        if h.conf.IsForbidRWType(utils.Write) {
+            return nil
+        }
+        return h.balanceHotWriteLeaders(cluster)
     }
     return nil
 }
@@ -410,19 +417,16 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o
     return nil
 }
 
-func (h *hotScheduler) balanceHotWriteRegions(cluster sche.SchedulerCluster) []*operator.Operator {
-    // prefer to balance by peer
-    s := h.r.Intn(100)
-    switch {
-    case s < int(schedulePeerPr*100):
-        peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
-        ops := peerSolver.solve()
-        if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
-            return ops
-        }
-    default:
+func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator {
+    peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
+    ops := peerSolver.solve()
+    if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
+        return ops
     }
+    return nil
+}
 
+func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator {
     leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader)
     ops := leaderSolver.solve()
     if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index 641f40b94f8..df20cd0180e 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -56,14 +56,14 @@ func init() {
 
 func newHotReadScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
     ret := newHotScheduler(opController, conf)
     ret.name = ""
-    ret.types = []utils.RWType{utils.Read}
+    ret.types = []resourceType{readLeader, readPeer}
     return ret
 }
 
 func newHotWriteScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
     ret := newHotScheduler(opController, conf)
     ret.name = ""
-    ret.types = []utils.RWType{utils.Write}
+    ret.types = []resourceType{writeLeader, writePeer}
     return ret
 }
@@ -201,7 +201,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
 
 func TestSplitIfRegionTooHot(t *testing.T) {
     re := require.New(t)
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
@@ -260,6 +259,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
         {1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
     })
     hb, _ = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     ops, _ = hb.Schedule(tc, false)
     re.Len(ops, 1)
     expectOp, _ = operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit,
@@ -391,7 +391,6 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
 
 func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
     // This test is used to test move leader and move peer.
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     tc.SetEnableUseJointConsensus(true)
@@ -401,6 +400,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
     tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
 
     tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"})
@@ -740,8 +740,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
     for i := 0; i < 20; i++ {
         clearPendingInfluence(hb)
         ops, _ := hb.Schedule(tc, false)
-        op := ops[0]
-        operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
+        if len(ops) == 0 {
+            continue
+        }
+        operatorutil.CheckTransferLeaderFrom(re, ops[0], operator.OpHotRegion, 1)
     }
     // | store_id | write_bytes_rate |
     // |----------|------------------|
@@ -870,19 +872,21 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) {
     for i := 0; i < 100; i++ {
         clearPendingInfluence(hb.(*hotScheduler))
         ops, _ := hb.Schedule(tc, false)
-        op := ops[0]
-        operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 1, 3)
+        if len(ops) == 0 {
+            continue
+        }
+        operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3)
     }
 }
 
 func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) {
     // This test is used to test move peer.
-    schedulePeerPr = 1.0
     re := require.New(t)
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
     hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
     hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
@@ -1016,6 +1020,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) {
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+    hb.(*hotScheduler).types = []resourceType{writeLeader}
     hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
     re.NoError(err)
@@ -1132,7 +1137,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
         ops, _ := hb.Schedule(tc, false)
         for len(ops) == 0 {
             emptyCnt++
-            if emptyCnt >= 10 {
+            if emptyCnt >= 100 {
                 break testLoop
             }
             ops, _ = hb.Schedule(tc, false)
@@ -1229,9 +1234,11 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) {
     for i := 0; i < 100; i++ {
         clearPendingInfluence(hb.(*hotScheduler))
         ops, _ := hb.Schedule(tc, false)
-        op := ops[0]
+        if len(ops) == 0 {
+            continue
+        }
         // The targetID should always be 1 as leader is only allowed to be placed in store1 or store2 by placement rule
-        operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 2, 1)
+        operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 1)
         ops, _ = hb.Schedule(tc, false)
         re.Empty(ops)
     }
@@ -1908,7 +1915,8 @@ func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheC
 
     if testCase.DegreeAfterTransferLeader >= 3 {
         // try schedule
-        hb.prepareForBalance(testCase.kind, tc)
+        typ := toResourceType(testCase.kind, transferLeader)
+        hb.prepareForBalance(typ, tc)
         leaderSolver := newBalanceSolver(hb, tc, testCase.kind, transferLeader)
         leaderSolver.cur = &solution{srcStore: hb.stLoadInfos[toResourceType(testCase.kind, transferLeader)][2]}
         re.Empty(leaderSolver.filterHotPeers(leaderSolver.cur.srcStore)) // skip schedule
@@ -2036,6 +2044,7 @@ func TestInfluenceByRWType(t *testing.T) {
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
     hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2054,8 +2063,6 @@ func TestInfluenceByRWType(t *testing.T) {
     addRegionInfo(tc, utils.Read, []testRegionInfo{
         {1, []uint64{2, 1, 3}, 0.5 * units.MiB, 0.5 * units.MiB, 0},
     })
-    // must move peer
-    schedulePeerPr = 1.0
     // must move peer from 1 to 4
     ops, _ := hb.Schedule(tc, false)
     op := ops[0]
@@ -2080,7 +2087,7 @@ func TestInfluenceByRWType(t *testing.T) {
     }
 
     // must transfer leader
-    schedulePeerPr = 0
+    hb.(*hotScheduler).types = []resourceType{writeLeader}
     // must transfer leader from 1 to 3
     ops, _ = hb.Schedule(tc, false)
     op = ops[0]
@@ -2157,6 +2164,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
     hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2175,8 +2183,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
     tc.UpdateStorageWrittenStats(3, 6*units.MiB*utils.StoreHeartBeatReportInterval, 6*units.MiB*utils.StoreHeartBeatReportInterval)
     tc.UpdateStorageWrittenStats(4, 9*units.MiB*utils.StoreHeartBeatReportInterval, 10*units.MiB*utils.StoreHeartBeatReportInterval)
     tc.UpdateStorageWrittenStats(5, 1*units.MiB*utils.StoreHeartBeatReportInterval, 1*units.MiB*utils.StoreHeartBeatReportInterval)
-    // must transfer peer
-    schedulePeerPr = 1.0
+
     addRegionInfo(tc, utils.Write, []testRegionInfo{
         {1, []uint64{1, 2, 3}, 2 * units.MiB, 1 * units.MiB, 0},
         {6, []uint64{4, 2, 3}, 1 * units.MiB, 2 * units.MiB, 0},
     })
@@ -2215,6 +2222,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
     operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3)
 
     hb, err = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
     hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2254,11 +2262,11 @@ func TestHotScheduleWithPriority(t *testing.T) {
 
 func TestHotScheduleWithStddev(t *testing.T) {
     re := require.New(t)
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writePeer}
     hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0)
     hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0)
     hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
@@ -2317,6 +2325,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) {
     defer cancel()
     hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
     re.NoError(err)
+    hb.(*hotScheduler).types = []resourceType{writeLeader}
     hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
     hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
     hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go
index 9913faeae5c..dd1d99fc01d 100644
--- a/pkg/schedule/schedulers/hot_region_v2_test.go
+++ b/pkg/schedule/schedulers/hot_region_v2_test.go
@@ -30,12 +30,12 @@ import (
 func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
     // This is a test that searchRevertRegions finds a solution of rank -1.
     re := require.New(t)
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
     re.NoError(err)
     hb := sche.(*hotScheduler)
+    hb.types = []resourceType{writePeer}
     hb.conf.SetDstToleranceRatio(0.0)
     hb.conf.SetSrcToleranceRatio(0.0)
     hb.conf.SetRankFormulaVersion("v1")
@@ -91,12 +91,12 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
 func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) {
     // This is a test that searchRevertRegions finds a solution of rank -3.
     re := require.New(t)
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
     re.NoError(err)
     hb := sche.(*hotScheduler)
+    hb.types = []resourceType{writePeer}
     hb.conf.SetDstToleranceRatio(0.0)
     hb.conf.SetSrcToleranceRatio(0.0)
     hb.conf.SetRankFormulaVersion("v1")
@@ -143,12 +143,12 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) {
 func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) {
     // This is a test that searchRevertRegions finds a solution of rank -2.
     re := require.New(t)
-    schedulePeerPr = 1.0
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
     sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
     re.NoError(err)
     hb := sche.(*hotScheduler)
+    hb.types = []resourceType{writePeer}
     hb.conf.SetDstToleranceRatio(0.0)
     hb.conf.SetSrcToleranceRatio(0.0)
     hb.conf.SetRankFormulaVersion("v1")
diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go
index d503931f0b2..5a603515942 100644
--- a/pkg/schedule/schedulers/scheduler_test.go
+++ b/pkg/schedule/schedulers/scheduler_test.go
@@ -41,7 +41,6 @@ func prepareSchedulersTest(needToRunStream ...bool) (func(), config.SchedulerCon
     clean := func() {
         cancel()
         // reset some config to avoid affecting other tests
-        schedulePeerPr = defaultSchedulePeerPr
         pendingAmpFactor = defaultPendingAmpFactor
         stddevThreshold = defaultStddevThreshold
         topnPosition = defaultTopnPosition
@@ -317,7 +316,6 @@ func TestShuffleRegionRole(t *testing.T) {
 }
 
 func TestSpecialUseHotRegion(t *testing.T) {
-    schedulePeerPr = 1.0
     re := require.New(t)
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
@@ -363,9 +361,13 @@ func TestSpecialUseHotRegion(t *testing.T) {
     tc.AddLeaderRegionWithWriteInfo(5, 3, 512*units.KiB*utils.RegionHeartBeatReportInterval, 0, 0, utils.RegionHeartBeatReportInterval, []uint64{1, 2})
     hs, err := CreateScheduler(utils.Write.String(), oc, storage, cd)
     re.NoError(err)
-    ops, _ = hs.Schedule(tc, false)
-    re.Len(ops, 1)
-    operatorutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 1, 4)
+    for i := 0; i < 100; i++ {
+        ops, _ = hs.Schedule(tc, false)
+        if len(ops) == 0 {
+            continue
+        }
+        operatorutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 1, 4)
+    }
 }
 
 func TestSpecialUseReserved(t *testing.T) {
diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go
index 0b9021267cb..5e6600e531f 100644
--- a/pkg/schedule/schedulers/shuffle_hot_region.go
+++ b/pkg/schedule/schedulers/shuffle_hot_region.go
@@ -159,9 +159,9 @@ func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerClus
 
 func (s *shuffleHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
     shuffleHotRegionCounter.Inc()
-    rw := s.randomRWType()
-    s.prepareForBalance(rw, cluster)
-    operators := s.randomSchedule(cluster, s.stLoadInfos[buildResourceType(rw, constant.LeaderKind)])
+    typ := s.randomType()
+    s.prepareForBalance(typ, cluster)
+    operators := s.randomSchedule(cluster, s.stLoadInfos[typ])
     return operators, nil
 }
 