Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#8675
Browse files Browse the repository at this point in the history
close tikv#8674

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx authored and ti-chi-bot committed Oct 18, 2024
1 parent d92769d commit 512ca52
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 57 deletions.
116 changes: 76 additions & 40 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package checker

import (
"fmt"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -61,17 +63,31 @@ var (
// Location management, mainly used for cross data center deployment.
type ReplicaChecker struct {
PauseController
<<<<<<< HEAD
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
regionWaitingList cache.Cache
=======
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
pendingProcessedRegions *cache.TTLUint64
r *rand.Rand
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
<<<<<<< HEAD
cluster: cluster,
conf: conf,
regionWaitingList: regionWaitingList,
=======
cluster: cluster,
conf: conf,
pendingProcessedRegions: pendingProcessedRegions,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
}
}

Expand All @@ -81,40 +97,40 @@ func (r *ReplicaChecker) GetType() string {
}

// Check verifies a region's replicas, creating an operator.Operator if need.
func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
replicaCheckerCounter.Inc()
if r.IsPaused() {
if c.IsPaused() {
replicaCheckerPausedCounter.Inc()
return nil
}
if op := r.checkDownPeer(region); op != nil {
if op := c.checkDownPeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkOfflinePeer(region); op != nil {
if op := c.checkOfflinePeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkMakeUpReplica(region); op != nil {
if op := c.checkMakeUpReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkRemoveExtraReplica(region); op != nil {
if op := c.checkRemoveExtraReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
if op := r.checkLocationReplacement(region); op != nil {
if op := c.checkLocationReplacement(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
return nil
}

func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveDownReplicaEnabled() {
func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveDownReplicaEnabled() {
return nil
}

Expand All @@ -124,22 +140,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
continue
}
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.conf.GetMaxStoreDownTime() {
if store.DownTime() < c.conf.GetMaxStoreDownTime() {
continue
}
return r.fixPeer(region, storeID, downStatus)
return c.fixPeer(region, storeID, downStatus)
}
return nil
}

func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsReplaceOfflineReplicaEnabled() {
func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsReplaceOfflineReplicaEnabled() {
return nil
}

Expand All @@ -150,7 +166,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope

for _, peer := range region.GetPeers() {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
Expand All @@ -159,71 +175,79 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
continue
}

return r.fixPeer(region, storeID, offlineStatus)
return c.fixPeer(region, storeID, offlineStatus)
}

return nil
}

func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsMakeUpReplicaEnabled() {
func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsMakeUpReplicaEnabled() {
return nil
}
if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
if target == 0 {
log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
replicaCheckerNoTargetStoreCounter.Inc()
if filterByTempState {
<<<<<<< HEAD
r.regionWaitingList.Put(region.GetID(), nil)
=======
c.pendingProcessedRegions.Put(region.GetID(), nil)
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
if err != nil {
log.Debug("create make-up-replica operator fail", errs.ZapError(err))
return nil
}
return op
}

func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveExtraReplicaEnabled() {
func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveExtraReplicaEnabled() {
return nil
}
// when add learner peer, the number of peer will exceed max replicas for a while,
// just comparing the the number of voters to avoid too many cancel add operator log.
if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
old := r.strategy(region).SelectStoreToRemove(regionStores)
regionStores := c.cluster.GetRegionStores(region)
old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
if old == 0 {
replicaCheckerNoWorstPeerCounter.Inc()
<<<<<<< HEAD
r.regionWaitingList.Put(region.GetID(), nil)
=======
c.pendingProcessedRegions.Put(region.GetID(), nil)
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
return nil
}
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsLocationReplacementEnabled() {
func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsLocationReplacementEnabled() {
return nil
}

strategy := r.strategy(region)
regionStores := r.cluster.GetRegionStores(region)
strategy := c.strategy(c.r, region)
regionStores := c.cluster.GetRegionStores(region)
oldStore := strategy.SelectStoreToRemove(regionStores)
if oldStore == 0 {
replicaCheckerAllRightCounter.Inc()
Expand All @@ -237,19 +261,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
}

newPeer := &metapb.Peer{StoreId: newStore}
op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
// Check the number of replicas first.
if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
if err != nil {
if status == offlineStatus {
replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
Expand All @@ -261,8 +285,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
if target == 0 {
if status == offlineStatus {
replicaCheckerNoStoreOfflineCounter.Inc()
Expand All @@ -271,13 +295,17 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
}
log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
if filterByTempState {
<<<<<<< HEAD
r.regionWaitingList.Put(region.GetID(), nil)
=======
c.pendingProcessedRegions.Put(region.GetID(), nil)
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
replace := fmt.Sprintf("replace-%s-replica", status)
op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
if err != nil {
if status == offlineStatus {
replicaCheckerReplaceOfflineFailedCounter.Inc()
Expand All @@ -289,12 +317,20 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
return &ReplicaStrategy{
<<<<<<< HEAD
checkerName: replicaCheckerName,
cluster: r.cluster,
locationLabels: r.conf.GetLocationLabels(),
isolationLevel: r.conf.GetIsolationLevel(),
=======
checkerName: c.Name(),
cluster: c.cluster,
locationLabels: c.conf.GetLocationLabels(),
isolationLevel: c.conf.GetIsolationLevel(),
>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
region: region,
r: r,
}
}
7 changes: 5 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package checker

import (
"math/rand"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -26,6 +28,7 @@ import (
// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
r *rand.Rand
checkerName string // replica-checker / rule-checker
cluster sche.CheckerCluster
locationLabels []string
Expand Down Expand Up @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e

isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
KeepTheTopStores(isolationComparer, false) // greater isolation score is better
if targetCandidate.Len() == 0 {
Expand Down Expand Up @@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
if s.fastFailover {
level = constant.Urgent
}
source := filter.NewCandidates(coLocationStores).
source := filter.NewCandidates(s.r, coLocationStores).
FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
KeepTheTopStores(isolationComparer, true).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
Expand Down
Loading

0 comments on commit 512ca52

Please sign in to comment.