diff --git a/server/grpc_service.go b/server/grpc_service.go index 7639564ea11..5d5931da738 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -590,8 +590,14 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } + cluster.RLock() + defer cluster.RUnlock() co := cluster.coordinator - if op := co.regionScatterer.Scatter(region); op != nil { + op, err := co.regionScatterer.Scatter(region) + if err != nil { + return nil, err + } + if op != nil { co.addOperator(op) } diff --git a/server/handler.go b/server/handler.go index 656c8178900..244258bff82 100644 --- a/server/handler.go +++ b/server/handler.go @@ -575,7 +575,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error { return ErrRegionNotFound(regionID) } - op := c.regionScatterer.Scatter(region) + op, err := c.regionScatterer.Scatter(region) + if err != nil { + return err + } + if op == nil { return nil } diff --git a/server/schedule/mockcluster.go b/server/schedule/mockcluster.go index d6d78164f7c..58d8ff5ac03 100644 --- a/server/schedule/mockcluster.go +++ b/server/schedule/mockcluster.go @@ -338,6 +338,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) { if region.GetStorePeer(s.FromStore) == nil { panic("Remove peer that doesn't exist") } + if region.GetLeader().GetStoreId() == s.FromStore { + panic("Cannot remove the leader peer") + } region = region.Clone(core.WithRemoveStorePeer(s.FromStore)) case AddLearner: if region.GetStorePeer(s.ToStore) != nil { diff --git a/server/schedule/operator.go b/server/schedule/operator.go index d44171cd663..80da4954b16 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -429,12 +429,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil } -// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer. -func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) { - removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore)) - if err != nil { - return nil, err - } +// CreateAddPeerSteps creates an OperatorStep list that add a new Peer. +func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep { var st []OperatorStep if cluster.IsRaftLearnerEnabled() { st = []OperatorStep{ @@ -446,6 +442,16 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf AddPeer{ToStore: newStore, PeerID: peerID}, } } + return st +} + +// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer. +func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) { + removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore)) + if err != nil { + return nil, err + } + st := CreateAddPeerSteps(newStore, peerID, cluster) steps = append(st, steps...) return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil } @@ -623,3 +629,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct return intersection } + +// CheckOperatorValid checks if the operator is valid. +func CheckOperatorValid(op *Operator) bool { + removeStores := []uint64{} + for _, step := range op.steps { + if tr, ok := step.(TransferLeader); ok { + for _, store := range removeStores { + if store == tr.FromStore { + return false + } + if store == tr.ToStore { + return false + } + } + } + if rp, ok := step.(RemovePeer); ok { + removeStores = append(removeStores, rp.FromStore) + } + } + return true +} diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 1e94d6096ae..4aea1397747 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" + "github.com/pkg/errors" ) type selectedStores struct { @@ -78,23 +79,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio } // Scatter relocates the region. -func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator { +func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) { if r.cluster.IsRegionHot(region.GetID()) { - return nil + return nil, errors.Errorf("region %d is a hot region", region.GetID()) } if len(region.GetPeers()) != r.cluster.GetMaxReplicas() { - return nil + return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID()) + } + + if region.GetLeader() == nil { + return nil, errors.Errorf("region %d has no leader", region.GetID()) } - return r.scatterRegion(region) + return r.scatterRegion(region), nil } func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator { - steps := make([]OperatorStep, 0, len(region.GetPeers())) - stores := r.collectAvailableStores(region) - var kind OperatorKind + var ( + targetPeers []*metapb.Peer + replacedPeers []*metapb.Peer + ) for _, peer := range region.GetPeers() { if len(stores) == 0 { // Reset selected stores if we have no available stores. @@ -104,31 +110,93 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator { if r.selected.put(peer.GetStoreId()) { delete(stores, peer.GetStoreId()) + targetPeers = append(targetPeers, peer) + replacedPeers = append(replacedPeers, peer) continue } newPeer := r.selectPeerToReplace(stores, region, peer) if newPeer == nil { + targetPeers = append(targetPeers, peer) + replacedPeers = append(replacedPeers, peer) continue } - // Remove it from stores and mark it as selected. delete(stores, newPeer.GetStoreId()) r.selected.put(newPeer.GetStoreId()) + targetPeers = append(targetPeers, newPeer) + replacedPeers = append(replacedPeers, peer) + } + return r.createOperator(region, replacedPeers, targetPeers) +} - op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin, - peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) - if err != nil { - continue +func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator { + // Randomly pick a leader + i := rand.Intn(len(targetPeers)) + targetLeaderPeer := targetPeers[i] + originLeaderStoreID := origin.GetLeader().GetStoreId() + + originStoreIDs := origin.GetStoreIds() + steps := make([]OperatorStep, 0, len(targetPeers)*3+1) + // deferSteps will append to the end of the steps + deferSteps := make([]OperatorStep, 0, 5) + var kind OperatorKind + sameLeader := targetLeaderPeer.GetStoreId() == originLeaderStoreID + // No need to do anything + if sameLeader { + isSame := true + for _, peer := range targetPeers { + if _, ok := originStoreIDs[peer.GetStoreId()]; !ok { + isSame = false + break + } + } + if isSame { + return nil } - steps = append(steps, op.steps...) - steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()}) - kind |= op.Kind() } - if len(steps) == 0 { - return nil + // Creates the first step + if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok { + st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster) + steps = append(steps, st...) + // Do not transfer leader to the newly added peer + // Ref: https://github.com/tikv/tikv/issues/3819 + deferSteps = append(deferSteps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()}) + deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()}) + kind |= OpLeader + kind |= OpRegion + } else { + if !sameLeader { + steps = append(steps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()}) + kind |= OpLeader + } + } + + // For the other steps + for j, peer := range targetPeers { + if peer.GetId() == targetLeaderPeer.GetId() { + continue + } + if _, ok := originStoreIDs[peer.GetStoreId()]; ok { + continue + } + if replacedPeers[j].GetStoreId() == originLeaderStoreID { + st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) + deferSteps = append(deferSteps, st...) + kind |= OpRegion | OpLeader + continue + } + st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + steps = append(steps, st...) + steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) + kind |= OpRegion } - return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...) + + steps = append(steps, deferSteps...) + op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...) + op.SetPriorityLevel(core.HighPriority) + return op } func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer { diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 3023797d6c0..d6db996b19b 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" - log "github.com/sirupsen/logrus" ) var _ = Suite(&testShuffleLeaderSuite{}) @@ -175,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) { s.scatter(c, 5, 5) } +func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) { + c.Assert(schedule.CheckOperatorValid(op), IsTrue) +} + func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { opt := schedule.NewMockSchedulerOptions() tc := schedule.NewMockCluster(opt) @@ -185,7 +188,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { } // Add regions 1~4. - seq := newSequencer(numStores) + seq := newSequencer(3) + // Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace. for i := uint64(1); i <= numRegions; i++ { tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next()) } @@ -194,8 +198,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { for i := uint64(1); i <= numRegions; i++ { region := tc.GetRegion(i) - if op := scatterer.Scatter(region); op != nil { - log.Info(op) + if op, _ := scatterer.Scatter(region); op != nil { + s.checkOperator(op, c) tc.ApplyOperator(op) } }