schedulers,test: avoid some test branches not being reached and remove schedulePeerPr (#8087)

close #8073

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Hu# <jinhao.hu@pingcap.com>
lhy1024 and HuSharp authored Jul 4, 2024
1 parent 6b25787 commit 86f8030
Showing 9 changed files with 169 additions and 159 deletions.
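
The heart of the change: hot write scheduling used to flip a weighted coin against schedulePeerPr = 0.66 to choose between the move-peer and transfer-leader paths, so a test run could finish without ever entering one of the branches. The scheduler now draws an explicit resource type and dispatches on it. A minimal sketch of the two shapes, with hypothetical stand-ins for the scheduler machinery:

```go
package main

import (
	"fmt"
	"math/rand"
)

// Old shape: one entry point flips a weighted coin, so a test cannot
// force the leader branch without controlling the RNG.
func scheduleOld(r *rand.Rand) string {
	const schedulePeerPr = 0.66
	if r.Intn(100) < int(schedulePeerPr*100) {
		return "move-peer"
	}
	return "transfer-leader"
}

// New shape: the caller names the branch, so each path is reachable
// deterministically (these enum values are hypothetical stand-ins).
type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
)

func scheduleNew(typ resourceType) string {
	switch typ {
	case writePeer:
		return "move-peer"
	case writeLeader:
		return "transfer-leader"
	}
	return ""
}

func main() {
	r := rand.New(rand.NewSource(1))
	fmt.Println(scheduleOld(r))           // branch taken depends on the seed
	fmt.Println(scheduleNew(writePeer))   // always the peer path
	fmt.Println(scheduleNew(writeLeader)) // always the leader path
}
```

With the second shape, a test exercises each path by passing the type directly instead of steering the RNG.
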
10 changes: 5 additions & 5 deletions pkg/schedule/schedulers/grant_hot_region.go
@@ -258,13 +258,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle

 func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
 	grantHotRegionCounter.Inc()
-	rw := s.randomRWType()
-	s.prepareForBalance(rw, cluster)
-	return s.dispatch(rw, cluster), nil
+	typ := s.randomType()
+	s.prepareForBalance(typ, cluster)
+	return s.dispatch(typ, cluster), nil
 }
 
-func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
-	stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)]
+func (s *grantHotRegionScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
+	stLoadInfos := s.stLoadInfos[typ]
 	infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
 	index := 0
 	for _, info := range stLoadInfos {
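
With grantHotRegionScheduler speaking resourceType natively, the buildResourceType(typ, constant.RegionKind) indirection drops out and the load map is indexed directly. A sketch of the enum this relies on, reconstructed from the names in the diff; the exact declaration in pkg/schedule/schedulers is an assumption:

```go
package main

import "fmt"

// Assumed reconstruction of the resourceType enum the diff relies on;
// the real declaration lives elsewhere in pkg/schedule/schedulers.
type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
	readPeer
	readLeader
	resourceTypeLen
)

func main() {
	// One load-detail bucket per resource type, as in newBaseHotScheduler;
	// dispatch can then index stLoadInfos[typ] with no extra mapping step.
	stLoadInfos := make(map[resourceType][]string, resourceTypeLen)
	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
		stLoadInfos[ty] = nil
	}
	fmt.Println(len(stLoadInfos)) // 4
}
```
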
99 changes: 54 additions & 45 deletions pkg/schedule/schedulers/hot_region.go
@@ -47,25 +47,26 @@ const (
 	// HotRegionName is balance hot region scheduler name.
 	HotRegionName = "balance-hot-region-scheduler"
 	// HotRegionType is balance hot region scheduler type.
-	HotRegionType          = "hot-region"
-	splitHotReadBuckets    = "split-hot-read-region"
-	splitHotWriteBuckets   = "split-hot-write-region"
-	splitProgressiveRank   = int64(-5)
-	minHotScheduleInterval = time.Second
-	maxHotScheduleInterval = 20 * time.Second
+	HotRegionType           = "hot-region"
+	splitHotReadBuckets     = "split-hot-read-region"
+	splitHotWriteBuckets    = "split-hot-write-region"
+	splitProgressiveRank    = int64(-5)
+	minHotScheduleInterval  = time.Second
+	maxHotScheduleInterval  = 20 * time.Second
+	defaultPendingAmpFactor = 2.0
+	defaultStddevThreshold  = 0.1
+	defaultTopnPosition     = 10
 )
 
 var (
-	// schedulePeerPr the probability of schedule the hot peer.
-	schedulePeerPr = 0.66
 	// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
-	pendingAmpFactor = 2.0
+	pendingAmpFactor = defaultPendingAmpFactor
 	// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
 	// as it implies that this dimension is sufficiently uniform.
-	stddevThreshold = 0.1
+	stddevThreshold = defaultStddevThreshold
 	// topnPosition is the position of the topn peer in the hot peer list.
 	// We use it to judge whether to schedule the hot peer in some cases.
-	topnPosition = 10
+	topnPosition = defaultTopnPosition
 	// statisticsInterval is the interval to update statistics information.
 	statisticsInterval = time.Second
 )
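
Promoting the 2.0 / 0.1 / 10 literals to default* constants gives the code a single source of truth for these package-level knobs; a plausible beneficiary (the test side is not shown in this hunk) is any test that perturbs a knob and must restore it afterwards. A sketch of that pattern; the withTopnPosition helper is hypothetical, not part of the PR:

```go
package main

import "fmt"

// Canonical defaults, as extracted in hot_region.go.
const (
	defaultPendingAmpFactor = 2.0
	defaultStddevThreshold  = 0.1
	defaultTopnPosition     = 10
)

// Mutable package-level knobs initialized from the defaults.
var (
	pendingAmpFactor = defaultPendingAmpFactor
	stddevThreshold  = defaultStddevThreshold
	topnPosition     = defaultTopnPosition
)

// withTopnPosition (hypothetical) perturbs one knob for the duration of
// fn and restores it afterwards, instead of re-stating a magic number.
func withTopnPosition(n int, fn func()) {
	old := topnPosition
	topnPosition = n
	defer func() { topnPosition = old }()
	fn()
}

func main() {
	withTopnPosition(1, func() { fmt.Println("topn during test:", topnPosition) })
	fmt.Println("restored to default:", topnPosition == defaultTopnPosition)
}
```
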
@@ -120,8 +121,9 @@ type baseHotScheduler struct {
 	// regionPendings stores regionID -> pendingInfluence,
 	// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
 	// be selected if its owner region is tracked in this attribute.
-	regionPendings map[uint64]*pendingInfluence
-	types          []utils.RWType
+	regionPendings map[uint64]*pendingInfluence
+	// types is the resource types that the scheduler considers.
+	types []resourceType
 	r               *rand.Rand
 	updateReadTime  time.Time
 	updateWriteTime time.Time
@@ -131,26 +133,26 @@ func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.
 	base := NewBaseScheduler(opController)
 	ret := &baseHotScheduler{
 		BaseScheduler:  base,
-		types:          []utils.RWType{utils.Write, utils.Read},
 		regionPendings: make(map[uint64]*pendingInfluence),
 		stHistoryLoads: statistics.NewStoreHistoryLoads(utils.DimLen, sampleDuration, sampleInterval),
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+		ret.types = append(ret.types, ty)
 		ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
 	}
 	return ret
 }

 // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
 // each store, only update read or write load detail
-func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) {
+func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) {
 	storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
 	h.summaryPendingInfluence(storeInfos)
 	storesLoads := cluster.GetStoresLoads()
 	isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow()
 
-	prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
+	prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) {
 		ty := buildResourceType(rw, resource)
 		h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
 			storeInfos,
@@ -160,23 +162,25 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
 			isTraceRegionFlow,
 			rw, resource)
 	}
-	switch rw {
-	case utils.Read:
+	switch typ {
+	case readLeader, readPeer:
 		// update read statistics
 		if time.Since(h.updateReadTime) >= statisticsInterval {
 			regionRead := cluster.RegionReadStats()
-			prepare(regionRead, constant.LeaderKind)
-			prepare(regionRead, constant.RegionKind)
+			prepare(regionRead, utils.Read, constant.LeaderKind)
+			prepare(regionRead, utils.Read, constant.RegionKind)
 			h.updateReadTime = time.Now()
 		}
-	case utils.Write:
+	case writeLeader, writePeer:
 		// update write statistics
 		if time.Since(h.updateWriteTime) >= statisticsInterval {
 			regionWrite := cluster.RegionWriteStats()
-			prepare(regionWrite, constant.LeaderKind)
-			prepare(regionWrite, constant.RegionKind)
+			prepare(regionWrite, utils.Write, constant.LeaderKind)
+			prepare(regionWrite, utils.Write, constant.RegionKind)
 			h.updateWriteTime = time.Now()
 		}
+	default:
+		log.Error("invalid resource type", zap.String("type", typ.String()))
 	}
 }
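
For context, prepareForBalance only recomputes the read or write summaries when the last refresh is older than statisticsInterval, keyed by which resource type was drawn. The throttle in isolation, with the statistics machinery stubbed out:

```go
package main

import (
	"fmt"
	"time"
)

var statisticsInterval = time.Second

// refresher mirrors the updateReadTime/updateWriteTime bookkeeping:
// recompute a summary at most once per interval, otherwise reuse it.
type refresher struct {
	last time.Time
}

func (r *refresher) maybeRefresh(now time.Time, compute func()) bool {
	if now.Sub(r.last) < statisticsInterval {
		return false // still fresh; keep the cached summary
	}
	compute()
	r.last = now
	return true
}

func main() {
	var reads refresher
	now := time.Now()
	fmt.Println(reads.maybeRefresh(now, func() { fmt.Println("summarize read stats") }))
	// A second call inside the same interval is skipped.
	fmt.Println(reads.maybeRefresh(now.Add(100*time.Millisecond), func() { fmt.Println("summarize read stats") }))
}
```
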

@@ -223,7 +227,7 @@ func setHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
 	HotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load)
 }
 
-func (h *baseHotScheduler) randomRWType() utils.RWType {
+func (h *baseHotScheduler) randomType() resourceType {
 	return h.types[h.r.Int()%len(h.types)]
 }
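
randomType draws uniformly over all four enumerated resource types, where the old randomRWType chose only between utils.Read and utils.Write; the constructor's append loop keeps the candidate list in sync with resourceTypeLen. A self-contained sketch of the selection, with the enum assumed as before:

```go
package main

import (
	"fmt"
	"math/rand"
)

// Assumed enum, mirroring the resourceType values named in the diff.
type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
	readPeer
	readLeader
	resourceTypeLen
)

func main() {
	// As in newBaseHotScheduler: enumerate every resource type once...
	var types []resourceType
	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
		types = append(types, ty)
	}
	// ...then pick uniformly, as randomType does. A fixed seed makes
	// the draws reproducible when debugging.
	r := rand.New(rand.NewSource(42))
	counts := make([]int, resourceTypeLen)
	for i := 0; i < 4000; i++ {
		counts[types[r.Int()%len(types)]]++
	}
	fmt.Println(counts) // roughly 1000 per type
}
```
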

@@ -324,24 +328,32 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {

 func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
 	hotSchedulerCounter.Inc()
-	rw := h.randomRWType()
-	return h.dispatch(rw, cluster), nil
+	typ := h.randomType()
+	return h.dispatch(typ, cluster), nil
 }
 
-func (h *hotScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
+func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
 	h.Lock()
 	defer h.Unlock()
 	h.updateHistoryLoadConfig(h.conf.GetHistorySampleDuration(), h.conf.GetHistorySampleInterval())
 	h.prepareForBalance(typ, cluster)
-	// it can not move earlier to support to use api and metrics.
-	if h.conf.IsForbidRWType(typ) {
-		return nil
-	}
-
+	// IsForbidRWType can not be move earlier to support to use api and metrics.
 	switch typ {
-	case utils.Read:
+	case readLeader, readPeer:
+		if h.conf.IsForbidRWType(utils.Read) {
+			return nil
+		}
 		return h.balanceHotReadRegions(cluster)
-	case utils.Write:
-		return h.balanceHotWriteRegions(cluster)
+	case writePeer:
+		if h.conf.IsForbidRWType(utils.Write) {
+			return nil
+		}
+		return h.balanceHotWritePeers(cluster)
+	case writeLeader:
+		if h.conf.IsForbidRWType(utils.Write) {
+			return nil
+		}
+		return h.balanceHotWriteLeaders(cluster)
 	}
 	return nil
 }
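
Because dispatch now takes the resource type as an argument, a test can drive every arm of the switch, including the forbid early-returns, without touching the RNG. A sketch of that shape; the config stub and names are hypothetical:

```go
package main

import "fmt"

type resourceType int

const (
	writePeer resourceType = iota
	writeLeader
	readPeer
	readLeader
)

// conf stands in for the scheduler config's IsForbidRWType check.
type conf struct{ forbidRead, forbidWrite bool }

// dispatch mirrors the new switch: each resource type maps to exactly
// one balance path, guarded by the matching forbid flag.
func dispatch(typ resourceType, c conf) string {
	switch typ {
	case readLeader, readPeer:
		if c.forbidRead {
			return "skipped"
		}
		return "balance-read"
	case writePeer:
		if c.forbidWrite {
			return "skipped"
		}
		return "balance-write-peers"
	case writeLeader:
		if c.forbidWrite {
			return "skipped"
		}
		return "balance-write-leaders"
	}
	return "none"
}

func main() {
	// Every branch is reachable deterministically, forbid paths included.
	for _, typ := range []resourceType{writePeer, writeLeader, readPeer, readLeader} {
		fmt.Println(typ, dispatch(typ, conf{}), dispatch(typ, conf{forbidRead: true, forbidWrite: true}))
	}
}
```
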
@@ -406,19 +418,16 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o
 	return nil
 }
 
-func (h *hotScheduler) balanceHotWriteRegions(cluster sche.SchedulerCluster) []*operator.Operator {
-	// prefer to balance by peer
-	s := h.r.Intn(100)
-	switch {
-	case s < int(schedulePeerPr*100):
-		peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
-		ops := peerSolver.solve()
-		if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
-			return ops
-		}
-	default:
+func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator {
+	peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer)
+	ops := peerSolver.solve()
+	if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
+		return ops
+	}
+	return nil
+}
+
+func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator {
 	leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader)
 	ops := leaderSolver.solve()
 	if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {
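
Both new functions share the solve-then-commit shape: the solver proposes operators, and they are only returned if the scheduler can also record their pending influence; otherwise they are dropped. The pattern in isolation, with the solver stubbed:

```go
package main

import "fmt"

// solver stands in for balanceSolver: solve proposes work and
// tryAddPendingInfluence commits the bookkeeping for it.
type solver struct {
	proposed []string
	canTrack bool
}

func (s *solver) solve() []string              { return s.proposed }
func (s *solver) tryAddPendingInfluence() bool { return s.canTrack }

// balance mirrors balanceHotWritePeers/balanceHotWriteLeaders: return
// the operators only when both steps succeed, else return nil.
func balance(s *solver) []string {
	ops := s.solve()
	if len(ops) > 0 && s.tryAddPendingInfluence() {
		return ops
	}
	return nil
}

func main() {
	fmt.Println(balance(&solver{proposed: []string{"move-peer"}, canTrack: true}))  // [move-peer]
	fmt.Println(balance(&solver{proposed: []string{"move-peer"}, canTrack: false})) // []
}
```
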
[Diff for the remaining changed files not loaded.]
