*: cherry pick some fixes to release 2.1 #1490

Merged (2 commits) on Apr 3, 2019
2 changes: 1 addition & 1 deletion server/api/hot_status.go
@@ -51,7 +51,7 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request)
bytesWriteStats := h.GetHotBytesWriteStores()
bytesReadStats := h.GetHotBytesReadStores()
keysWriteStats := h.GetHotKeysWriteStores()
keysReadStats := h.GetHotKeysWriteStores()
keysReadStats := h.GetHotKeysReadStores()

stats := hotStoreStats{
BytesWriteStats: bytesWriteStats,
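The one-line fix above closes a copy-paste bug: KeysReadStats was populated from the write-keys getter, so the read-keys half of the hot-store API simply mirrored the write-keys numbers. A minimal sketch of the corrected wiring, using illustrative map-based stand-ins for the real handler and stats types:

package main

import "fmt"

// hotStatsSource stands in for the cluster methods the handler calls;
// the real getters return richer per-store stats, not plain counters.
type hotStatsSource interface {
	GetHotBytesWriteStores() map[uint64]uint64
	GetHotBytesReadStores() map[uint64]uint64
	GetHotKeysWriteStores() map[uint64]uint64
	GetHotKeysReadStores() map[uint64]uint64
}

type hotStoreStats struct {
	BytesWriteStats map[uint64]uint64
	BytesReadStats  map[uint64]uint64
	KeysWriteStats  map[uint64]uint64
	KeysReadStats   map[uint64]uint64
}

// collect mirrors the corrected assignment order in GetHotStores.
func collect(src hotStatsSource) hotStoreStats {
	return hotStoreStats{
		BytesWriteStats: src.GetHotBytesWriteStores(),
		BytesReadStats:  src.GetHotBytesReadStores(),
		KeysWriteStats:  src.GetHotKeysWriteStores(),
		// Before the fix this field was also filled from GetHotKeysWriteStores.
		KeysReadStats: src.GetHotKeysReadStores(),
	}
}

type fakeSource struct{}

func (fakeSource) GetHotBytesWriteStores() map[uint64]uint64 { return map[uint64]uint64{1: 100} }
func (fakeSource) GetHotBytesReadStores() map[uint64]uint64  { return map[uint64]uint64{1: 60} }
func (fakeSource) GetHotKeysWriteStores() map[uint64]uint64  { return map[uint64]uint64{1: 10} }
func (fakeSource) GetHotKeysReadStores() map[uint64]uint64   { return map[uint64]uint64{1: 4} }

func main() {
	// KeysReadStats now differs from KeysWriteStats, as expected.
	fmt.Printf("%+v\n", collect(fakeSource{}))
}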
8 changes: 7 additions & 1 deletion server/grpc_service.go
@@ -590,8 +590,14 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR
region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
}

cluster.RLock()
defer cluster.RUnlock()
co := cluster.coordinator
if op := co.regionScatterer.Scatter(region); op != nil {
op, err := co.regionScatterer.Scatter(region)
if err != nil {
return nil, err
}
if op != nil {
co.addOperator(op)
}

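Two things change in the gRPC handler above: the coordinator is now read under the cluster's read lock, and an error from the scatterer is returned to the caller instead of being swallowed. A small illustrative sketch of that pattern (the types here are stand-ins, not the real PD structs):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type coordinator struct{ ops []string }

func (c *coordinator) addOperator(op string) { c.ops = append(c.ops, op) }

// cluster mimics an RWMutex-guarded cluster holding a coordinator.
type cluster struct {
	sync.RWMutex
	coordinator *coordinator
}

// scatterRegion mirrors the handler's shape: lock, scatter, propagate errors.
func scatterRegion(c *cluster, region string, hot bool) error {
	c.RLock()
	defer c.RUnlock()
	if hot {
		// The real code returns errors.Errorf("region %d is a hot region", ...).
		return errors.New("region is a hot region")
	}
	c.coordinator.addOperator("scatter " + region)
	return nil
}

func main() {
	c := &cluster{coordinator: &coordinator{}}
	fmt.Println(scatterRegion(c, "r1", false)) // <nil>, operator queued
	fmt.Println(scatterRegion(c, "r2", true))  // error surfaces to the gRPC caller
}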
6 changes: 5 additions & 1 deletion server/handler.go
@@ -575,7 +575,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error {
return ErrRegionNotFound(regionID)
}

op := c.regionScatterer.Scatter(region)
op, err := c.regionScatterer.Scatter(region)
if err != nil {
return err
}

if op == nil {
return nil
}
3 changes: 3 additions & 0 deletions server/schedule/mockcluster.go
@@ -338,6 +338,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
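The new guard makes the mock panic when an operator tries to remove the peer on the store that currently holds the leader, which a well-formed operator must never do: leadership has to be transferred away first. A self-contained sketch of that invariant, with simplified step values rather than the real OperatorStep types:

package main

import "fmt"

// step is a simplified stand-in for an OperatorStep.
type step struct {
	kind      string // "transfer_leader" or "remove_peer"
	fromStore uint64
	toStore   uint64
}

// apply replays steps against a region that only tracks its leader store and
// rejects any attempt to remove the current leader's peer.
func apply(leaderStore uint64, steps []step) error {
	for _, s := range steps {
		switch s.kind {
		case "transfer_leader":
			leaderStore = s.toStore
		case "remove_peer":
			if s.fromStore == leaderStore {
				return fmt.Errorf("cannot remove the leader peer on store %d", s.fromStore)
			}
		}
	}
	return nil
}

func main() {
	// Valid: move leadership off store 1, then remove its peer.
	fmt.Println(apply(1, []step{
		{kind: "transfer_leader", fromStore: 1, toStore: 2},
		{kind: "remove_peer", fromStore: 1},
	}))
	// Invalid: removing the leader's peer directly triggers the error
	// (the mock cluster panics instead).
	fmt.Println(apply(1, []step{{kind: "remove_peer", fromStore: 1}}))
}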
39 changes: 33 additions & 6 deletions server/schedule/operator.go
@@ -429,12 +429,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
// CreateAddPeerSteps creates an OperatorStep list that adds a new Peer.
func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
@@ -446,6 +442,16 @@
AddPeer{ToStore: newStore, PeerID: peerID},
}
}
return st
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
st := CreateAddPeerSteps(newStore, peerID, cluster)
steps = append(st, steps...)
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil
}
@@ -623,3 +629,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct

return intersection
}

// CheckOperatorValid checks if the operator is valid.
func CheckOperatorValid(op *Operator) bool {
removeStores := []uint64{}
for _, step := range op.steps {
if tr, ok := step.(TransferLeader); ok {
for _, store := range removeStores {
if store == tr.FromStore {
return false
}
if store == tr.ToStore {
return false
}
}
}
if rp, ok := step.(RemovePeer); ok {
removeStores = append(removeStores, rp.FromStore)
}
}
return true
}
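CheckOperatorValid rejects any operator that transfers leadership to, or away from, a store whose peer an earlier step has already removed. Because the step slice is unexported it is exercised from inside the schedule package, as the test change below does; a hedged sketch of such an in-package test (the suite type is hypothetical, the other identifiers come from this PR) could look like:

// Assumes the usual in-package test imports: gocheck (. "github.com/pingcap/check")
// and "github.com/pingcap/kvproto/pkg/metapb".
func (s *testOperatorSuite) TestCheckOperatorValid(c *C) {
	// Removing the peer on store 2 and then transferring leadership to
	// store 2 is contradictory, so the operator is invalid.
	invalid := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion,
		RemovePeer{FromStore: 2},
		TransferLeader{FromStore: 1, ToStore: 2},
	)
	c.Assert(CheckOperatorValid(invalid), IsFalse)

	// Transferring leadership away from store 2 before removing its peer is fine.
	valid := NewOperator("test", 1, &metapb.RegionEpoch{}, OpLeader|OpRegion,
		TransferLeader{FromStore: 2, ToStore: 1},
		RemovePeer{FromStore: 2},
	)
	c.Assert(CheckOperatorValid(valid), IsTrue)
}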
104 changes: 86 additions & 18 deletions server/schedule/region_scatterer.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pkg/errors"
)

type selectedStores struct {
@@ -78,23 +79,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio
}

// Scatter relocates the region.
func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator {
func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) {
if r.cluster.IsRegionHot(region.GetID()) {
return nil
return nil, errors.Errorf("region %d is a hot region", region.GetID())
}

if len(region.GetPeers()) != r.cluster.GetMaxReplicas() {
return nil
return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID())
}

if region.GetLeader() == nil {
return nil, errors.Errorf("region %d has no leader", region.GetID())
}

return r.scatterRegion(region)
return r.scatterRegion(region), nil
}

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
steps := make([]OperatorStep, 0, len(region.GetPeers()))

stores := r.collectAvailableStores(region)
var kind OperatorKind
var (
targetPeers []*metapb.Peer
replacedPeers []*metapb.Peer
)
for _, peer := range region.GetPeers() {
if len(stores) == 0 {
// Reset selected stores if we have no available stores.
@@ -104,31 +110,93 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {

if r.selected.put(peer.GetStoreId()) {
delete(stores, peer.GetStoreId())
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}
newPeer := r.selectPeerToReplace(stores, region, peer)
if newPeer == nil {
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}

// Remove it from stores and mark it as selected.
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())
targetPeers = append(targetPeers, newPeer)
replacedPeers = append(replacedPeers, peer)
}
return r.createOperator(region, replacedPeers, targetPeers)
}

op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator {
// Randomly pick a leader
i := rand.Intn(len(targetPeers))
targetLeaderPeer := targetPeers[i]
originLeaderStoreID := origin.GetLeader().GetStoreId()

originStoreIDs := origin.GetStoreIds()
steps := make([]OperatorStep, 0, len(targetPeers)*3+1)
// deferSteps will be appended to the end of the steps
deferSteps := make([]OperatorStep, 0, 5)
var kind OperatorKind
sameLeader := targetLeaderPeer.GetStoreId() == originLeaderStoreID
// No need to do anything
if sameLeader {
isSame := true
for _, peer := range targetPeers {
if _, ok := originStoreIDs[peer.GetStoreId()]; !ok {
isSame = false
break
}
}
if isSame {
return nil
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
kind |= op.Kind()
}

if len(steps) == 0 {
return nil
// Creates the first step
if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok {
st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster)
steps = append(steps, st...)
// Do not transfer leader to the newly added peer
// Ref: https://github.com/tikv/tikv/issues/3819
deferSteps = append(deferSteps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()})
kind |= OpLeader
kind |= OpRegion
} else {
if !sameLeader {
steps = append(steps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()})
kind |= OpLeader
}
}

// For the other steps
for j, peer := range targetPeers {
if peer.GetId() == targetLeaderPeer.GetId() {
continue
}
if _, ok := originStoreIDs[peer.GetStoreId()]; ok {
continue
}
if replacedPeers[j].GetStoreId() == originLeaderStoreID {
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
deferSteps = append(deferSteps, st...)
kind |= OpRegion | OpLeader
continue
}
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
steps = append(steps, st...)
steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
kind |= OpRegion
}
return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...)

steps = append(steps, deferSteps...)
op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...)
op.SetPriorityLevel(core.HighPriority)
return op
}

func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
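The rewritten createOperator plans the whole relocation as one operator: peers on new stores are added first, while the leader transfer (and the removal of the old leader's peer) is collected in deferSteps and appended at the end, so leadership is never handed to a freshly added peer (see tikv/tikv#3819). A compressed, illustrative sketch of that ordering decision, using plain strings instead of OperatorStep values:

package main

import "fmt"

// planScatter sketches the ordering used by createOperator: add-peer steps go
// first; any step that touches leadership on a brand-new store is deferred.
func planScatter(leaderMovesToNewStore bool) []string {
	var steps, deferred []string

	// Peers on stores the region does not yet cover are added up front.
	steps = append(steps, "add peer on new store")

	if leaderMovesToNewStore {
		// Transfer leadership only after the new peer exists, then drop
		// the old leader's peer; both are pushed to the tail.
		deferred = append(deferred,
			"transfer leader to new store",
			"remove old leader's peer")
	} else {
		steps = append(steps, "transfer leader to existing store")
	}
	return append(steps, deferred...)
}

func main() {
	fmt.Println(planScatter(true))
	fmt.Println(planScatter(false))
}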
12 changes: 8 additions & 4 deletions server/schedulers/scheduler_test.go
@@ -20,7 +20,6 @@ import (
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
log "github.com/sirupsen/logrus"
)

var _ = Suite(&testShuffleLeaderSuite{})
@@ -175,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) {
s.scatter(c, 5, 5)
}

func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) {
c.Assert(schedule.CheckOperatorValid(op), IsTrue)
}

func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
opt := schedule.NewMockSchedulerOptions()
tc := schedule.NewMockCluster(opt)
@@ -185,7 +188,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
}

// Add regions 1~4.
seq := newSequencer(numStores)
seq := newSequencer(3)
// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
for i := uint64(1); i <= numRegions; i++ {
tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
}
@@ -194,8 +198,8 @@

for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
if op := scatterer.Scatter(region); op != nil {
log.Info(op)
if op, _ := scatterer.Scatter(region); op != nil {
s.checkOperator(op, c)
tc.ApplyOperator(op)
}
}
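The test now seeds every region from newSequencer(3), so regions 1 and 2 receive the same three stores and the selectPeerToReplace path gets exercised. Assuming the sequencer is a simple wrapping counter (which is what the comment in the test implies), its behaviour is roughly:

package main

import "fmt"

// sequencer is a hedged sketch of the test helper: next() cycles through
// store IDs 1..max, so consecutive regions land on the same stores.
type sequencer struct {
	max, cur uint64
}

func newSequencer(max uint64) *sequencer { return &sequencer{max: max} }

func (s *sequencer) next() uint64 {
	s.cur++
	if s.cur > s.max {
		s.cur = 1
	}
	return s.cur
}

func main() {
	seq := newSequencer(3)
	for region := 1; region <= 2; region++ {
		fmt.Println("region", region, "stores:", seq.next(), seq.next(), seq.next())
	}
	// Both regions end up on stores 1, 2 and 3, exactly the overlap the
	// scatterer is expected to break up.
}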