schedule: fix scheduler removes region leader (#1462) (#1470)
Signed-off-by: disksing <i@disksing.com>
disksing authored Mar 20, 2019
1 parent 860ef89 commit 7df2fde
Showing 11 changed files with 118 additions and 25 deletions.
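
The change in a nutshell: schedule.CreateMovePeerOperator and schedule.CreateRemovePeerOperator now return (*Operator, error) instead of a bare *Operator, and operator creation fails when the peer to be removed currently holds region leadership but no follower is eligible to take it over (for example, because every follower carries the reject-leader label property). Every call site is updated accordingly: the admin handlers propagate the error, while checkers and schedulers bump a create_operator_fail counter and give up on the region for this round. A minimal caller-side sketch of the new convention (illustrative only, modeled on the server/handler.go hunk below; not itself part of the diff):

	// Sketch of the new error-returning convention at a call site.
	// region, fromStoreID, toStoreID, and newPeer are assumed to be in scope.
	op, err := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region,
		schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
	if err != nil {
		// No follower can safely take over leadership, so no operator is built.
		return err
	}
	if ok := c.addOperator(op); !ok {
		return errors.Trace(errAddOperator)
	}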
10 changes: 8 additions & 2 deletions server/handler.go
@@ -390,7 +390,10 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI
 		return errors.Trace(err)
 	}

-	op := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
+	op, err := schedule.CreateMovePeerOperator("adminMovePeer", c.cluster, region, schedule.OpAdmin, fromStoreID, toStoreID, newPeer.GetId())
+	if err != nil {
+		return err
+	}
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
@@ -455,7 +458,10 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err
 		return errors.Errorf("region has no peer in store %v", fromStoreID)
 	}

-	op := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
+	op, err := schedule.CreateRemovePeerOperator("adminRemovePeer", c.cluster, schedule.OpAdmin, region, fromStoreID)
+	if err != nil {
+		return err
+	}
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
7 changes: 6 additions & 1 deletion server/schedule/namespace_checker.go
@@ -68,8 +68,13 @@ func (n *NamespaceChecker) Check(region *core.RegionInfo) *Operator {
 			checkerCounter.WithLabelValues("namespace_checker", "no_target_peer").Inc()
 			return nil
 		}
+		op, err := CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+		if err != nil {
+			checkerCounter.WithLabelValues("namespace_checker", "create_operator_fail").Inc()
+			return nil
+		}
 		checkerCounter.WithLabelValues("namespace_checker", "new_operator").Inc()
-		return CreateMovePeerOperator("makeNamespaceRelocation", n.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+		return op
 	}

 	checkerCounter.WithLabelValues("namespace_checker", "all_right").Inc()
35 changes: 27 additions & 8 deletions server/schedule/operator.go
@@ -417,14 +417,20 @@ func (o *Operator) History() []OperatorHistory {
 }

 // CreateRemovePeerOperator creates an Operator that removes a peer from region.
-func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) *Operator {
-	removeKind, steps := removePeerSteps(cluster, region, storeID)
-	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind, steps...)
+func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
+	removeKind, steps, err := removePeerSteps(cluster, region, storeID, getRegionFollowerIDs(region))
+	if err != nil {
+		return nil, err
+	}
+	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
 }

 // CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
-func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) *Operator {
-	removeKind, steps := removePeerSteps(cluster, region, oldStore)
+func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
+	removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
+	if err != nil {
+		return nil, err
+	}
 	var st []OperatorStep
 	if cluster.IsRaftLearnerEnabled() {
 		st = []OperatorStep{
@@ -437,20 +443,33 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
 		}
 	}
 	steps = append(st, steps...)
-	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...)
+	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil
 }

+func getRegionFollowerIDs(region *core.RegionInfo) []uint64 {
+	var ids []uint64
+	for id := range region.GetFollowers() {
+		ids = append(ids, id)
+	}
+	return ids
+}
+
 // removePeerSteps returns the steps to safely remove a peer. It prevents removing the leader by transferring its leadership first.
-func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64) (kind OperatorKind, steps []OperatorStep) {
+func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64, followerIDs []uint64) (kind OperatorKind, steps []OperatorStep, err error) {
 	if region.Leader != nil && region.Leader.GetStoreId() == storeID {
-		for id := range region.GetFollowers() {
+		for _, id := range followerIDs {
 			follower := cluster.GetStore(id)
 			if follower != nil && !cluster.CheckLabelProperty(RejectLeader, follower.Labels) {
 				steps = append(steps, TransferLeader{FromStore: storeID, ToStore: id})
 				kind = OpLeader
 				break
 			}
 		}
+		if len(steps) == 0 {
+			err = errors.New("no suitable follower to become region leader")
+			log.Debugf("fail to create remove peer operator, region: %v, err: %v", region.GetId(), err)
+			return
+		}
 	}
 	steps = append(steps, RemovePeer{FromStore: storeID})
 	kind |= OpRegion
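With this guard, a region whose followers all reject leadership can no longer lose its leader to a remove-peer or move-peer operator: creation fails up front instead. A hedged, test-style sketch of the failure path (the mock-cluster helpers here are assumptions modeled on PD's test utilities, not code from this commit):

	// Assumed setup: leader of region 1 on store 1, followers on stores 2 and 3,
	// with both follower stores labeled so that CheckLabelProperty(RejectLeader, ...)
	// reports true for them. Helper names are illustrative.
	tc.AddLeaderRegion(1, 1, 2, 3)
	// ... label stores 2 and 3 with the reject-leader property ...

	_, err := schedule.CreateRemovePeerOperator("test", tc, schedule.OpAdmin, tc.GetRegion(1), 1)
	// err != nil: "no suitable follower to become region leader", so nothing is
	// scheduled and the region keeps a live leader.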
5 changes: 4 additions & 1 deletion server/schedule/region_scatterer.go
@@ -120,8 +120,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
 		delete(stores, newPeer.GetStoreId())
 		r.selected.put(newPeer.GetStoreId())

-		op := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
+		op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
 			peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+		if err != nil {
+			continue
+		}
 		steps = append(steps, op.steps...)
 		steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
 		kind |= op.Kind()
41 changes: 35 additions & 6 deletions server/schedule/replica_checker.go
@@ -86,8 +86,13 @@ func (r *ReplicaChecker) Check(region *core.RegionInfo) *Operator {
 			checkerCounter.WithLabelValues("replica_checker", "no_worst_peer").Inc()
 			return nil
 		}
+		op, err := CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
+		if err != nil {
+			checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
+			return nil
+		}
 		checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
-		return CreateRemovePeerOperator("removeExtraReplica", r.cluster, OpReplica, region, oldPeer.GetStoreId())
+		return op
 	}

 	return r.checkBestReplacement(region)
@@ -166,7 +171,12 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *Operator {
 		if stats.GetDownSeconds() < uint64(r.cluster.GetMaxStoreDownTime().Seconds()) {
 			continue
 		}
-		return CreateRemovePeerOperator("removeDownReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+		op, err := CreateRemovePeerOperator("removeDownReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+		if err != nil {
+			checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
+			return nil
+		}
+		return op
 	}
 	return nil
 }
@@ -189,15 +199,25 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *Operator {

 		// Check the number of replicas first.
 		if len(region.GetPeers()) > r.cluster.GetMaxReplicas() {
-			return CreateRemovePeerOperator("removeExtraOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+			op, err := CreateRemovePeerOperator("removeExtraOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+			if err != nil {
+				checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
+				return nil
+			}
+			return op
 		}

 		// Consider we have 3 peers (A, B, C), we set the store that contains C to
 		// offline while C is pending. If we generate an operator that adds a replica
 		// D then removes C, D will not be successfully added until C is normal again.
 		// So it's better to remove C directly.
 		if region.GetPendingPeer(peer.GetId()) != nil {
-			return CreateRemovePeerOperator("removePendingOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+			op, err := CreateRemovePeerOperator("removePendingOfflineReplica", r.cluster, OpReplica, region, peer.GetStoreId())
+			if err != nil {
+				checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
+				return nil
+			}
+			return op
 		}

 		storeID, _ := r.SelectBestReplacementStore(region, peer, NewStorageThresholdFilter())
@@ -209,7 +229,11 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *Operator {
 		if err != nil {
 			return nil
 		}
-		return CreateMovePeerOperator("makeUpOfflineReplica", r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+		op, err := CreateMovePeerOperator("makeUpOfflineReplica", r.cluster, region, OpReplica, peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+		if err != nil {
+			return nil
+		}
+		return op
 	}

 	return nil
@@ -236,6 +260,11 @@ func (r *ReplicaChecker) checkBestReplacement(region *core.RegionInfo) *Operator
 	if err != nil {
 		return nil
 	}
+	op, err := CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+	if err != nil {
+		checkerCounter.WithLabelValues("replica_checker", "create_operator_fail").Inc()
+		return nil
+	}
 	checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
-	return CreateMovePeerOperator("moveToBetterLocation", r.cluster, region, OpReplica, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+	return op
 }
6 changes: 5 additions & 1 deletion server/schedulers/adjacent_region.go
@@ -299,7 +299,11 @@ func (l *balanceAdjacentRegionScheduler) dispersePeer(cluster schedule.Cluster,
 	// record the store id and exclude it in next time
 	l.cacheRegions.assignedStoreIds = append(l.cacheRegions.assignedStoreIds, newPeer.GetStoreId())

-	op := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
+	op, err := schedule.CreateMovePeerOperator("balance-adjacent-peer", cluster, region, schedule.OpAdjacent, leaderStoreID, newPeer.GetStoreId(), newPeer.GetId())
+	if err != nil {
+		schedulerCounter.WithLabelValues(l.GetName(), "create_operator_fail").Inc()
+		return nil
+	}
 	op.SetPriorityLevel(core.LowPriority)
 	schedulerCounter.WithLabelValues(l.GetName(), "adjacent_peer").Inc()
 	return op
7 changes: 6 additions & 1 deletion server/schedulers/balance_region.go
@@ -163,5 +163,10 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region *
 	}
 	balanceRegionCounter.WithLabelValues("move_peer", fmt.Sprintf("store%d-out", source.GetId())).Inc()
 	balanceRegionCounter.WithLabelValues("move_peer", fmt.Sprintf("store%d-in", target.GetId())).Inc()
-	return schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+	op, err := schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+	if err != nil {
+		schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
+		return nil
+	}
+	return op
 }
4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
@@ -327,15 +327,15 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddRegionStore(4, 16)
 	// Add region 1 with leader in store 4.
 	tc.AddLeaderRegion(1, 4)
-	CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 4, 1)
+	CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 4, 1)

 	// Test stateFilter.
 	tc.SetStoreOffline(1)
 	tc.UpdateRegionCount(2, 6)
 	cache.Remove(4)
 	// When store 1 is offline, it will be filtered,
 	// store 2 becomes the store with least regions.
-	CheckTransferPeer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 4, 2)
+	CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc, schedule.NewOpInfluence(nil, tc))[0], schedule.OpBalance, 4, 2)
 	opt.SetMaxReplicas(3)
 	c.Assert(sb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), IsNil)

14 changes: 12 additions & 2 deletions server/schedulers/hot_region.go
@@ -154,8 +154,13 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
 	// balance by peer
 	srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.readStatAsLeader)
 	if srcRegion != nil {
+		op, err := schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
+		if err != nil {
+			schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
+			return nil
+		}
 		schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
-		return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotReadRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
+		return []*schedule.Operator{op}
 	}
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
 	return nil
@@ -171,8 +176,13 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 		// balance by peer
 		srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster, h.stats.writeStatAsPeer)
 		if srcRegion != nil {
+			op, err := schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())
+			if err != nil {
+				schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
+				return nil
+			}
 			schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
-			return []*schedule.Operator{schedule.CreateMovePeerOperator("moveHotWriteRegion", cluster, srcRegion, schedule.OpHotRegion, srcPeer.GetStoreId(), destPeer.GetStoreId(), destPeer.GetId())}
+			return []*schedule.Operator{op}
 		}
 	case 1:
 		// balance by leader
6 changes: 5 additions & 1 deletion server/schedulers/shuffle_region.go
@@ -70,8 +70,12 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster, opInfluence
 		return nil
 	}

+	op, err := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
+	if err != nil {
+		schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc()
+		return nil
+	}
 	schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
-	op := schedule.CreateMovePeerOperator("shuffle-region", cluster, region, schedule.OpAdmin, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
 	op.SetPriorityLevel(core.HighPriority)
 	return []*schedule.Operator{op}
 }
8 changes: 8 additions & 0 deletions server/schedulers/test_util.go
@@ -62,3 +62,11 @@ func CheckTransferPeer(c *check.C, op *schedule.Operator, kind schedule.Operator
 	kind |= schedule.OpRegion
 	c.Assert(op.Kind()&kind, check.Equals, kind)
 }
+
+// CheckTransferPeerWithLeaderTransfer checks that the operator transfers a
+// peer between the specified source and target stores and meanwhile
+// transfers the leader out of the source store.
+func CheckTransferPeerWithLeaderTransfer(c *check.C, op *schedule.Operator, kind schedule.OperatorKind, sourceID, targetID uint64) {
+	c.Assert(op.Len(), check.Equals, 3)
+	CheckTransferPeer(c, op, kind, sourceID, targetID)
+}
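
The op.Len() assertion of 3 pins down the step sequence that a leader-moving transfer now produces. A sketch of the expected shape (step order follows CreateMovePeerOperator above with raft learner disabled; the concrete IDs are illustrative):

	// Expected steps when the moved peer currently holds region leadership:
	//   1. AddPeer{ToStore: targetID, PeerID: newPeerID}     // create the replacement replica first
	//   2. TransferLeader{FromStore: sourceID, ToStore: id}  // hand leadership to an eligible follower
	//   3. RemovePeer{FromStore: sourceID}                   // only then remove the old peer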
