Skip to content

Commit

Permalink
scheduler: fix region scatter may transfer leader to removed peer (tikv#1482)
Browse files Browse the repository at this point in the history

* scheduler: fix region scatter may transfer leader to removed peer

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Apr 2, 2019
1 parent fe966a6 commit 641da3a
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 30 deletions.
8 changes: 7 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,14 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR
region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
}

cluster.RLock()
defer cluster.RUnlock()
co := cluster.coordinator
if op := co.regionScatterer.Scatter(region); op != nil {
op, err := co.regionScatterer.Scatter(region)
if err != nil {
return nil, err
}
if op != nil {
co.addOperator(op)
}

Expand Down
6 changes: 5 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error {
return ErrRegionNotFound(regionID)
}

op := c.regionScatterer.Scatter(region)
op, err := c.regionScatterer.Scatter(region)
if err != nil {
return err
}

if op == nil {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
Expand Down
39 changes: 33 additions & 6 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
// CreateAddPeerSteps creates an OperatorStep list that add a new Peer.
func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
Expand All @@ -446,6 +442,16 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
AddPeer{ToStore: newStore, PeerID: peerID},
}
}
return st
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
	// The new store counts as a follower candidate when the removal requires a
	// leader transfer away from oldStore.
	candidates := append(getRegionFollowerIDs(region), newStore)
	removeKind, removeSteps, err := removePeerSteps(cluster, region, oldStore, candidates)
	if err != nil {
		return nil, err
	}
	// Add the replacement peer first, then run the removal steps.
	allSteps := append(CreateAddPeerSteps(newStore, peerID, cluster), removeSteps...)
	return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, allSteps...), nil
}
Expand Down Expand Up @@ -623,3 +629,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct

return intersection
}

// CheckOperatorValid checks if the operator is valid.
// It is invalid when a TransferLeader step moves leadership from or to a
// store whose peer was removed by an earlier RemovePeer step.
func CheckOperatorValid(op *Operator) bool {
	removed := make(map[uint64]struct{})
	for _, step := range op.steps {
		switch s := step.(type) {
		case TransferLeader:
			if _, ok := removed[s.FromStore]; ok {
				return false
			}
			if _, ok := removed[s.ToStore]; ok {
				return false
			}
		case RemovePeer:
			removed[s.FromStore] = struct{}{}
		}
	}
	return true
}
104 changes: 86 additions & 18 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pkg/errors"
)

type selectedStores struct {
Expand Down Expand Up @@ -78,23 +79,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio
}

// Scatter relocates the region.
func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator {
func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) {
if r.cluster.IsRegionHot(region.GetID()) {
return nil
return nil, errors.Errorf("region %d is a hot region", region.GetID())
}

if len(region.GetPeers()) != r.cluster.GetMaxReplicas() {
return nil
return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID())
}

if region.GetLeader() == nil {
return nil, errors.Errorf("region %d has no leader", region.GetID())
}

return r.scatterRegion(region)
return r.scatterRegion(region), nil
}

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
steps := make([]OperatorStep, 0, len(region.GetPeers()))

stores := r.collectAvailableStores(region)
var kind OperatorKind
var (
targetPeers []*metapb.Peer
replacedPeers []*metapb.Peer
)
for _, peer := range region.GetPeers() {
if len(stores) == 0 {
// Reset selected stores if we have no available stores.
Expand All @@ -104,31 +110,93 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {

if r.selected.put(peer.GetStoreId()) {
delete(stores, peer.GetStoreId())
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}
newPeer := r.selectPeerToReplace(stores, region, peer)
if newPeer == nil {
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}

// Remove it from stores and mark it as selected.
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())
targetPeers = append(targetPeers, newPeer)
replacedPeers = append(replacedPeers, peer)
}
return r.createOperator(region, replacedPeers, targetPeers)
}

op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator {
// Randomly pick a leader
i := rand.Intn(len(targetPeers))
targetLeaderPeer := targetPeers[i]
originLeaderStoreID := origin.GetLeader().GetStoreId()

originStoreIDs := origin.GetStoreIds()
steps := make([]OperatorStep, 0, len(targetPeers)*3+1)
// deferSteps will append to the end of the steps
deferSteps := make([]OperatorStep, 0, 5)
var kind OperatorKind
sameLeader := targetLeaderPeer.GetStoreId() == originLeaderStoreID
// No need to do anything
if sameLeader {
isSame := true
for _, peer := range targetPeers {
if _, ok := originStoreIDs[peer.GetStoreId()]; !ok {
isSame = false
break
}
}
if isSame {
return nil
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
kind |= op.Kind()
}

if len(steps) == 0 {
return nil
// Creates the first step
if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok {
st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster)
steps = append(steps, st...)
// Do not transfer leader to the newly added peer
// Ref: https://github.com/tikv/tikv/issues/3819
deferSteps = append(deferSteps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()})
kind |= OpLeader
kind |= OpRegion
} else {
if !sameLeader {
steps = append(steps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
kind |= OpLeader
}
}

// For the other steps
for j, peer := range targetPeers {
if peer.GetId() == targetLeaderPeer.GetId() {
continue
}
if _, ok := originStoreIDs[peer.GetStoreId()]; ok {
continue
}
if replacedPeers[j].GetStoreId() == originLeaderStoreID {
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
deferSteps = append(deferSteps, st...)
kind |= OpRegion | OpLeader
continue
}
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
steps = append(steps, st...)
steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
kind |= OpRegion
}
return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...)

steps = append(steps, deferSteps...)
op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...)
op.SetPriorityLevel(core.HighPriority)
return op
}

func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
Expand Down
12 changes: 8 additions & 4 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
log "github.com/sirupsen/logrus"
)

var _ = Suite(&testShuffleLeaderSuite{})
Expand Down Expand Up @@ -175,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) {
s.scatter(c, 5, 5)
}

func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) {
c.Assert(schedule.CheckOperatorValid(op), IsTrue)
}

func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
opt := schedule.NewMockSchedulerOptions()
tc := schedule.NewMockCluster(opt)
Expand All @@ -185,7 +188,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
}

// Add regions 1~4.
seq := newSequencer(numStores)
seq := newSequencer(3)
// Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace.
for i := uint64(1); i <= numRegions; i++ {
tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
}
Expand All @@ -194,8 +198,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {

for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
if op := scatterer.Scatter(region); op != nil {
log.Info(op)
if op, _ := scatterer.Scatter(region); op != nil {
s.checkOperator(op, c)
tc.ApplyOperator(op)
}
}
Expand Down

0 comments on commit 641da3a

Please sign in to comment.