Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Aug 30, 2023
1 parent b8e2f61 commit 8412f8c
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
for i, filterFunc := range context.filterFuncs {
filters[i] = filterFunc()
}
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
for _, peer := range peers {
if _, ok := selectedStores[peer.GetStoreId()]; ok {
if allowLeader(oldFit, peer) {
Expand All @@ -333,7 +334,6 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
}
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for {
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
newPeer := r.selectNewPeer(context, group, peer, filters)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
Expand All @@ -355,7 +355,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
// special engine stores if the engine supports to become a leader. But now there is only
// one engine, tiflash, which does not support the leader, so don't consider it for now.
targetLeader, leaderHit := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
if targetLeader == 0 {
scatterSkipNoLeaderCounter.Inc()
return nil
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
scatterSuccessCounter.Inc()
r.Put(targetPeers, targetLeader, group)
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-hit"] = strconv.FormatUint(leaderHit, 10)
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetPriorityLevel(constant.High)
}
return op
Expand Down Expand Up @@ -426,6 +426,12 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
return region.GetLeader().GetStoreId() == targetLeader
}

// selectNewPeer return the new peer which pick the fewest picked count.
// it keeps the origin peer if the origin store's pick count is equal the fewest pick.
// it can be diveded into three steps:
// 1. found the max pick count and the min pick count.
// 2. if max pick count equals min pick count, it means all store picked count are some, return the origin peer.
// 3. otherwise, select the store which pick count is the min pick count and pass all filter.
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
stores := r.cluster.GetStores()
maxStoreTotalCount := uint64(0)
Expand All @@ -442,11 +448,11 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee

var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
sourceHit := uint64(math.MaxUint64)
originStorePickedCount := uint64(math.MaxUint64)
for _, store := range stores {
storeCount := context.selectedPeer.Get(store.GetID(), group)
if store.GetID() == peer.GetId() {
sourceHit = storeCount
originStorePickedCount = storeCount
}
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
Expand All @@ -463,7 +469,7 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee
}
}
}
if sourceHit <= minCount {
if originStorePickedCount <= minCount {
return peer
}
if newPeer == nil {
Expand All @@ -475,7 +481,7 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee
// selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
// the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines.
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, hit uint64) {
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) {
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
Expand Down

0 comments on commit 8412f8c

Please sign in to comment.