scheduler: cache history loads in hot region scheduler #6314

Merged
14 commits merged on Apr 25, 2023
Changes from all commits
3 changes: 3 additions & 0 deletions pkg/core/constant/kind.go
@@ -66,6 +66,9 @@ const (
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind

// ResourceKindLen represents the ResourceKind count
ResourceKindLen
)

func (k ResourceKind) String() string {
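
The new ResourceKindLen follows the usual Go pattern of a trailing sentinel constant. A standalone sketch (illustration only, not code from this PR; the enum is trimmed to the kinds visible in the hunk above plus LeaderKind) of how callers can use it to size per-kind arrays:

```go
package main

import "fmt"

// Trimmed copy of the ResourceKind enum, for illustration only.
type ResourceKind int

const (
	LeaderKind ResourceKind = iota
	RegionKind
	WitnessKind
	// ResourceKindLen represents the ResourceKind count
	ResourceKindLen
)

func main() {
	// The sentinel sizes a fixed array with one slot per resource kind.
	var counters [ResourceKindLen]int
	counters[RegionKind]++
	fmt.Println("kinds:", int(ResourceKindLen), "region ops:", counters[RegionKind])
}
```
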
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
@@ -97,6 +97,7 @@ type StoreConfig interface {
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsEnableRegionBucket() bool
IsRaftKV2() bool
// for test purpose
SetRegionBucketEnabled(bool)
}
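
For unit tests, the enlarged interface can be satisfied by a small stub. A hypothetical sketch against a trimmed local copy of the interface (only the methods visible in this hunk; the real schedule/config.StoreConfig has more methods that are not shown here):

```go
package main

import "fmt"

// storeConfig is a trimmed, hypothetical copy of the interface for illustration.
type storeConfig interface {
	CheckRegionSize(uint64, uint64) error
	CheckRegionKeys(uint64, uint64) error
	IsEnableRegionBucket() bool
	IsRaftKV2() bool
	SetRegionBucketEnabled(bool)
}

// stubStoreConfig is a test double that can toggle the raft-kv2 flag.
type stubStoreConfig struct {
	raftKV2      bool
	regionBucket bool
}

func (c *stubStoreConfig) CheckRegionSize(uint64, uint64) error { return nil }
func (c *stubStoreConfig) CheckRegionKeys(uint64, uint64) error { return nil }
func (c *stubStoreConfig) IsEnableRegionBucket() bool           { return c.regionBucket }
func (c *stubStoreConfig) IsRaftKV2() bool                      { return c.raftKV2 }
func (c *stubStoreConfig) SetRegionBucketEnabled(e bool)        { c.regionBucket = e }

func main() {
	var cfg storeConfig = &stubStoreConfig{raftKV2: true}
	fmt.Println("raft-kv2:", cfg.IsRaftKV2()) // raft-kv2: true
}
```
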
94 changes: 81 additions & 13 deletions pkg/schedule/schedulers/hot_region.go
@@ -86,7 +86,8 @@ type baseHotScheduler struct {
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Every time `Schedule()` will recalculate it.
- stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
+ stLoadInfos    [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
+ stHistoryLoads *statistics.StoreHistoryLoads
// temporary states
// Every time `Schedule()` will recalculate it.
storesLoads map[uint64][]float64
@@ -106,6 +107,7 @@ func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotSche
BaseScheduler: base,
types: []statistics.RWType{statistics.Write, statistics.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(statistics.DimLen),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -127,6 +129,7 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
h.stHistoryLoads,
regionStats,
isTraceRegionFlow,
rw, resource)
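
prepareForBalance now hands the scheduler's cached history loads to SummaryStoresLoad. The statistics.StoreHistoryLoads type itself is outside the hunks shown here; as a rough, hypothetical sketch of the underlying idea, a fixed-length rolling window of per-dimension loads kept per store, it might look like this:

```go
package main

import "fmt"

// historyLoads is a hypothetical stand-in for statistics.StoreHistoryLoads:
// per store it keeps, for each load dimension, the last windowSize samples.
type historyLoads struct {
	windowSize int
	dimLen     int
	loads      map[uint64][][]float64 // storeID -> [dimension][sample]
}

func newHistoryLoads(dimLen, windowSize int) *historyLoads {
	return &historyLoads{windowSize: windowSize, dimLen: dimLen, loads: make(map[uint64][][]float64)}
}

// add records the latest per-dimension loads of a store and evicts the oldest
// sample once the window is full.
func (h *historyLoads) add(storeID uint64, dims []float64) {
	window, ok := h.loads[storeID]
	if !ok {
		window = make([][]float64, h.dimLen)
	}
	for i, v := range dims {
		window[i] = append(window[i], v)
		if len(window[i]) > h.windowSize {
			window[i] = window[i][1:]
		}
	}
	h.loads[storeID] = window
}

func main() {
	h := newHistoryLoads(2, 3) // two dimensions (e.g. byte rate, key rate), window of three samples
	for _, sample := range [][]float64{{100, 10}, {120, 12}, {90, 9}, {110, 11}} {
		h.add(1, sample)
	}
	fmt.Println(h.loads[1]) // [[120 90 110] [12 9 11]]: the oldest sample has been evicted
}
```
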
@@ -277,7 +280,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
if h.conf.IsForbidRWType(typ) {
return nil
}

switch typ {
case statistics.Read:
return h.balanceHotReadRegions(cluster)
@@ -461,6 +463,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int
// TODO: remove this after testing more scenarios with the single RocksDB
isRaftKV2 bool

firstPriorityV2Ratios *rankV2Ratios
secondPriorityV2Ratios *rankV2Ratios
@@ -474,6 +478,7 @@ type balanceSolver struct {
betterThan func(*solution) bool
rankToDimString func() string
checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -514,6 +519,7 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
@@ -538,10 +544,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
}
}

@@ -610,7 +619,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if !bs.isValid() {
return nil
}

bs.cur = &solution{}
tryUpdateBestSolution := func() {
if label, ok := bs.filterUniformStore(); ok {
@@ -789,12 +797,20 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
continue
}

- if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
- ret[id] = detail
- hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
- } else {
- hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
+ if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
+ hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
+ continue
+ }
+ // only raftkv2 needs to check the history loads.
+ if bs.isRaftKV2 {
+ if !bs.checkSrcHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
+ hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
+ continue
+ }
+ }
+
+ ret[id] = detail
+ hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
}
return ret
}
@@ -805,6 +821,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta
})
}

func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j]
})
})
}
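
The source-side history check only keeps a store as a candidate source if, for every prioritized dimension, all cached samples exceed toleranceRatio times the expected load; an empty history short-circuits to true. A small self-contained sketch of that predicate with made-up numbers (plain slices, not the PD types):

```go
package main

import "fmt"

// allAbove mirrors the shape of the per-dimension source check: every history
// sample must exceed ratio * expected for the store to stay a source candidate.
func allAbove(history, expect []float64, ratio float64) bool {
	for i, v := range history {
		if v <= ratio*expect[i] {
			return false
		}
	}
	return true
}

func main() {
	// One dimension, five cached samples, expected load 100, tolerance ratio 1.05.
	history := []float64{120, 118, 125, 130, 121}
	expect := []float64{100, 100, 100, 100, 100}
	fmt.Println(allAbove(history, expect, 1.05)) // true: hot across the whole window

	history[2] = 90 // a single cool sample breaks the streak
	fmt.Println(allAbove(history, expect, 1.05)) // false
}
```
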

// filterHotPeers filters hot peers from statistics.HotPeerStat and removes a peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
@@ -1003,12 +1030,20 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}
if filter.Target(bs.GetOpts(), store, filters) {
id := store.GetID()
- if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
- ret[id] = detail
- hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc()
- } else {
- hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc()
+ if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
+ hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
+ continue
+ }
+ // only raftkv2 needs to check history loads
+ if bs.isRaftKV2 {
+ if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) {
+ hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
+ continue
+ }
+ }
+
+ hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
+ ret[id] = detail
}
}
return ret
@@ -1020,6 +1055,17 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
})
}

func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j]
})
})
}
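
On the destination side the inequality is inverted: a store stays a candidate destination only if every cached sample, scaled up by the tolerance ratio, is still below the expected load, i.e. it has been consistently cool across the whole window. As on the source side, an empty history returns true, so the extra filter is a no-op until the cache has accumulated samples.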

func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1029,6 +1075,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1038,10 +1093,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool {
return f(bs.firstPriority)
}
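
Of the history-load policy variants added above, pickCheckPolicyV1 wires up only the AllOf and FirstOnly ones (strict picking versus the write-leader and default cases); checkHistoryByPriorityAndToleranceAnyOf mirrors the existing AnyOf helper but is not referenced in the hunks shown in this diff.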

func (bs *balanceSolver) enableExpectation() bool {
return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0
}