remove schedulePeerPr
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Jun 25, 2024
1 parent a91615b commit ccbbef1
Showing 6 changed files with 86 additions and 71 deletions.
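In short, this commit removes the schedulePeerPr coin flip from the hot scheduler: instead of first picking a read/write type and then rolling against schedulePeerPr to decide between moving a peer and transferring a leader, the scheduler now draws one of the concrete resource types and dispatches on it directly. For reference, a minimal sketch of the type it draws from — the enum itself is not part of this diff, so the values below are assumed from the identifiers the diff uses (writePeer, writeLeader, readPeer, readLeader, resourceTypeLen):

// Assumed pre-existing definition in pkg/schedule/schedulers, not changed by this commit:
// the four concrete scheduling targets the hot scheduler can pick per round.
type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
	readPeer
	readLeader
	resourceTypeLen
)

With types populated with all four values, randomType() replaces randomRWType(), and the choice between write-peer and write-leader scheduling is made by that draw rather than by the removed schedulePeerPr probability.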
10 changes: 5 additions & 5 deletions pkg/schedule/schedulers/grant_hot_region.go
@@ -258,13 +258,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle

func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
grantHotRegionCounter.Inc()
rw := s.randomRWType()
s.prepareForBalance(rw, cluster)
return s.dispatch(rw, cluster), nil
typ := s.randomType()
s.prepareForBalance(typ, cluster)
return s.dispatch(typ, cluster), nil
}

func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)]
func (s *grantHotRegionScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
stLoadInfos := s.stLoadInfos[typ]
infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
index := 0
for _, info := range stLoadInfos {
76 changes: 40 additions & 36 deletions pkg/schedule/schedulers/hot_region.go
@@ -53,15 +53,12 @@ const (
splitProgressiveRank = int64(-5)
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
defaultSchedulePeerPr = 0.66
defaultPendingAmpFactor = 2.0
defaultStddevThreshold = 0.1
defaultTopnPosition = 10
)

var (
// schedulePeerPr the probability of schedule the hot peer.
schedulePeerPr = defaultSchedulePeerPr
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
pendingAmpFactor = defaultPendingAmpFactor
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
@@ -125,7 +122,7 @@ type baseHotScheduler struct {
// regionPendings records the region IDs that currently have a pending operator, by operation type. During filterHotPeers, a hot peer won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence
types []utils.RWType
types []resourceType
r *rand.Rand
updateReadTime time.Time
updateWriteTime time.Time
@@ -135,26 +132,26 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.
base := NewBaseScheduler(opController)
ret := &baseHotScheduler{
BaseScheduler: base,
types: []utils.RWType{utils.Write, utils.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.types = append(ret.types, ty)
ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
}
return ret
}

// prepareForBalance calculates the summary of pending influence for each store and prepares the load detail for
// each store; it only updates the read or write load details.
func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) {
func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) {
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence(storeInfos)
storesLoads := cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow()

prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) {
ty := buildResourceType(rw, resource)
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
storeInfos,
@@ -164,23 +161,25 @@
isTraceRegionFlow,
rw, resource)
}
switch rw {
case utils.Read:
switch typ {
case readLeader, readPeer:
// update read statistics
if time.Since(h.updateReadTime) >= statisticsInterval {
regionRead := cluster.RegionReadStats()
prepare(regionRead, constant.LeaderKind)
prepare(regionRead, constant.RegionKind)
prepare(regionRead, utils.Read, constant.LeaderKind)
prepare(regionRead, utils.Read, constant.RegionKind)
h.updateReadTime = time.Now()
}
case utils.Write:
case writeLeader, writePeer:
// update write statistics
if time.Since(h.updateWriteTime) >= statisticsInterval {
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, constant.LeaderKind)
prepare(regionWrite, constant.RegionKind)
prepare(regionWrite, utils.Write, constant.LeaderKind)
prepare(regionWrite, utils.Write, constant.RegionKind)
h.updateWriteTime = time.Now()
}
default:
log.Error("invalid resource type", zap.String("type", typ.String()))
}
}

@@ -227,7 +226,7 @@ func setHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
HotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load)
}

func (h *baseHotScheduler) randomRWType() utils.RWType {
func (h *baseHotScheduler) randomType() resourceType {
return h.types[h.r.Int()%len(h.types)]
}

@@ -328,24 +327,32 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {

func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
hotSchedulerCounter.Inc()
rw := h.randomRWType()
return h.dispatch(rw, cluster), nil
typ := h.randomType()
return h.dispatch(typ, cluster), nil
}

func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval())
h.prepareForBalance(typ, cluster)
// it can not move earlier to support to use api and metrics.
if h.conf.IsForbidRWType(typ) {
return nil
}
// The IsForbidRWType check cannot be moved earlier, so that the API and metrics can still be used.
switch typ {
case utils.Read:
case readLeader, readPeer:
if h.conf.IsForbidRWType(utils.Read) {
return nil
}
return h.balanceHotReadRegions(cluster)
case utils.Write:
return h.balanceHotWriteRegions(cluster)
case writePeer:
if h.conf.IsForbidRWType(utils.Write) {
return nil
}
return h.balanceHotWritePeers(cluster)
case writeLeader:
if h.conf.IsForbidRWType(utils.Write) {
return nil
}
return h.balanceHotWriteLeaders(cluster)
}
return nil
}
@@ -410,19 +417,16 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o
return nil
}

func (h *hotScheduler) balanceHotWriteRegions(cluster sche.SchedulerCluster) []*operator.Operator {
// prefer to balance by peer
s := h.r.Intn(100)
switch {
case s < int(schedulePeerPr*100):
peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
return ops
}
default:
func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator {
peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
return ops
}
return nil
}

func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator {
leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader)
ops := leaderSolver.solve()
if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {
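The test changes below all follow the same pattern: rather than steering the scheduler through the removed schedulePeerPr global, a test pins the scheduler instance to the single resource type it wants to exercise. A minimal sketch of that pattern, reusing the names from the tests (hb, oc, and re come from the surrounding test setup):

// Old style (removed): schedulePeerPr = 1.0 forced peer moves for every write schedule.
// New style: restrict this scheduler instance to write-peer scheduling only.
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}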
47 changes: 28 additions & 19 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -56,14 +56,14 @@ func init() {
func newHotReadScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []utils.RWType{utils.Read}
ret.types = []resourceType{readLeader, readPeer}
return ret
}

func newHotWriteScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []utils.RWType{utils.Write}
ret.types = []resourceType{writeLeader, writePeer}
return ret
}

@@ -201,7 +201,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {

func TestSplitIfRegionTooHot(t *testing.T) {
re := require.New(t)
schedulePeerPr = 1.0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
@@ -260,6 +259,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
})
hb, _ = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).types = []resourceType{writePeer}
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
expectOp, _ = operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit,
@@ -391,7 +391,6 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {

func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
// This test is used to test move leader and move peer.
schedulePeerPr = 1.0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetEnableUseJointConsensus(true)
@@ -401,6 +400,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)

tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"})
@@ -740,8 +740,10 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
for i := 0; i < 20; i++ {
clearPendingInfluence(hb)
ops, _ := hb.Schedule(tc, false)
op := ops[0]
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
if len(ops) == 0 {
continue
}
operatorutil.CheckTransferLeaderFrom(re, ops[0], operator.OpHotRegion, 1)
}
// | store_id | write_bytes_rate |
// |----------|------------------|
@@ -870,19 +872,21 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) {
for i := 0; i < 100; i++ {
clearPendingInfluence(hb.(*hotScheduler))
ops, _ := hb.Schedule(tc, false)
op := ops[0]
operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 1, 3)
if len(ops) == 0 {
continue
}
operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3)
}
}

func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) {
// This test is used to test move peer.
schedulePeerPr = 1.0
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
@@ -1016,6 +1020,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).types = []resourceType{writeLeader}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
re.NoError(err)
@@ -1132,7 +1137,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
ops, _ := hb.Schedule(tc, false)
for len(ops) == 0 {
emptyCnt++
if emptyCnt >= 10 {
if emptyCnt >= 100 {
break testLoop
}
ops, _ = hb.Schedule(tc, false)
@@ -1229,9 +1234,11 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) {
for i := 0; i < 100; i++ {
clearPendingInfluence(hb.(*hotScheduler))
ops, _ := hb.Schedule(tc, false)
op := ops[0]
if len(ops) == 0 {
continue
}
// The targetID should always be 1 as leader is only allowed to be placed in store1 or store2 by placement rule
operatorutil.CheckTransferLeader(re, op, operator.OpHotRegion, 2, 1)
operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 1)
ops, _ = hb.Schedule(tc, false)
re.Empty(ops)
}
@@ -1908,7 +1915,8 @@ func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheC

if testCase.DegreeAfterTransferLeader >= 3 {
// try schedule
hb.prepareForBalance(testCase.kind, tc)
typ := toResourceType(testCase.kind, transferLeader)
hb.prepareForBalance(typ, tc)
leaderSolver := newBalanceSolver(hb, tc, testCase.kind, transferLeader)
leaderSolver.cur = &solution{srcStore: hb.stLoadInfos[toResourceType(testCase.kind, transferLeader)][2]}
re.Empty(leaderSolver.filterHotPeers(leaderSolver.cur.srcStore)) // skip schedule
@@ -2036,6 +2044,7 @@ func TestInfluenceByRWType(t *testing.T) {
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2054,8 +2063,6 @@ func TestInfluenceByRWType(t *testing.T) {
addRegionInfo(tc, utils.Read, []testRegionInfo{
{1, []uint64{2, 1, 3}, 0.5 * units.MiB, 0.5 * units.MiB, 0},
})
// must move peer
schedulePeerPr = 1.0
// must move peer from 1 to 4
ops, _ := hb.Schedule(tc, false)
op := ops[0]
@@ -2080,7 +2087,7 @@ func TestInfluenceByRWType(t *testing.T) {
}

// must transfer leader
schedulePeerPr = 0
hb.(*hotScheduler).types = []resourceType{writeLeader}
// must transfer leader from 1 to 3
ops, _ = hb.Schedule(tc, false)
op = ops[0]
@@ -2157,6 +2164,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2175,8 +2183,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
tc.UpdateStorageWrittenStats(3, 6*units.MiB*utils.StoreHeartBeatReportInterval, 6*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 9*units.MiB*utils.StoreHeartBeatReportInterval, 10*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*units.MiB*utils.StoreHeartBeatReportInterval, 1*units.MiB*utils.StoreHeartBeatReportInterval)
// must transfer peer
schedulePeerPr = 1.0

addRegionInfo(tc, utils.Write, []testRegionInfo{
{1, []uint64{1, 2, 3}, 2 * units.MiB, 1 * units.MiB, 0},
{6, []uint64{4, 2, 3}, 1 * units.MiB, 2 * units.MiB, 0},
@@ -2215,6 +2222,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3)

hb, err = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
@@ -2254,11 +2262,11 @@ func TestHotScheduleWithStddev(t *testing.T) {

func TestHotScheduleWithStddev(t *testing.T) {
re := require.New(t)
schedulePeerPr = 1.0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writePeer}
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0)
hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
@@ -2317,6 +2325,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) {
defer cancel()
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).types = []resourceType{writeLeader}
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetHistorySampleDuration(0)
