checker: replace down check with disconnect check when fixing orphan peer (#7294) #7295

Merged · 2 commits · Nov 6, 2023
Changes from 1 commit
3 changes: 3 additions & 0 deletions server/core/store.go
@@ -496,6 +496,9 @@ var (
// IsDisconnected checks if a store is disconnected, which means PD misses
// tikv's store heartbeat for a short time, maybe caused by process restart or
// temporary network failure.
func (s *StoreInfo) IsDisconnected() bool {
if s == nil {
return true
}
return s.DownTime() > storeDisconnectDuration
}

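Why the nil guard: the new isDisconnectedPeer helper in rule_checker.go calls c.cluster.GetStore(p.GetStoreId()).IsDisconnected() without checking whether GetStore returned nil, which can happen when a peer's store has already been removed from the cluster. Go allows calling a pointer-receiver method on a nil pointer, so handling nil inside the receiver makes a missing store count as disconnected instead of panicking. A minimal sketch, assuming a simplified StoreInfo and PD's 20-second storeDisconnectDuration:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for PD's StoreInfo; the type and constant here are
// illustrative assumptions, not the real definitions.
type StoreInfo struct {
	lastHeartbeat time.Time
}

const storeDisconnectDuration = 20 * time.Second

func (s *StoreInfo) IsDisconnected() bool {
	if s == nil {
		// A store PD no longer knows about is treated as disconnected,
		// so callers such as isDisconnectedPeer need no nil check.
		return true
	}
	return time.Since(s.lastHeartbeat) > storeDisconnectDuration
}

func main() {
	var missing *StoreInfo // e.g. GetStore returned nil for a removed store
	// Calling a pointer-receiver method on a nil pointer is legal in Go;
	// the receiver-side check yields "disconnected" instead of a panic.
	fmt.Println(missing.IsDisconnected()) // true
}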
60 changes: 37 additions & 23 deletions server/schedule/checker/rule_checker.go
@@ -390,7 +390,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer

isUnhealthyPeer := func(id uint64) bool {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
@@ -404,31 +404,41 @@ }
}
return false
}

isDisconnectedPeer := func(p *metapb.Peer) bool {
// avoid meeting a down store when fixing orphan peers;
// IsDisconnected is stricter than IsUnhealthy.
return c.cluster.GetStore(p.GetStoreId()).IsDisconnected()
}

checkDownPeer := func(peers []*metapb.Peer) (*metapb.Peer, bool) {
for _, p := range peers {
if isUnhealthyPeer(p.GetId()) {
// make sure it is a down peer.
if region.GetDownPeer(p.GetId()) != nil {
return p, true
}
return nil, true
}
if isDisconnectedPeer(p) {
return p, true
}
}
return nil, false
}

// remove orphan peers only when all rules are satisfied (count+role) and all peers selected
// by RuleFits are not pending or down.
var pinDownPeer *metapb.Peer
hasUnhealthyFit := false
loopFits:
for _, rf := range fit.RuleFits {
if !rf.IsSatisfied() {
hasUnhealthyFit = true
break
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure it is a down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
// avoid meeting a down store when fixing orphan peers;
// IsDisconnected is stricter than IsUnhealthy.
if c.cluster.GetStore(p.GetStoreId()).IsDisconnected() {
hasUnhealthyFit = true
pinDownPeer = p
break loopFits
}
pinDownPeer, hasUnhealthyFit = checkDownPeer(rf.Peers)
if hasUnhealthyFit {
break
}
}

@@ -445,15 +455,15 @@
continue
}
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
if isUnhealthyPeer(orphanPeer.GetId()) || isDisconnectedPeer(orphanPeer) {
continue
}
// witness peers are not considered in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// down peer's store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
// down peer's store should be disconnected.
if !isDisconnectedPeer(pinDownPeer) {
continue
}
// check if down peer can replace with orphan peer.
@@ -468,7 +478,7 @@
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Voter &&
c.cluster.GetStore(pinDownPeer.GetStoreId()).IsDisconnected() && !dstStore.IsDisconnected():
isDisconnectedPeer(pinDownPeer) && !dstStore.IsDisconnected():
return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId())
default:
// destRole should not be the same as orphanPeerRole; if the roles were the same, the fit with orphanPeer would already be better than the current one.
@@ -485,7 +495,11 @@
for _, orphanPeer := range fit.OrphanPeers {
if isUnhealthyPeer(orphanPeer.GetId()) {
checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
return operator.CreateRemovePeerOperator("remove-unhealthy-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if isDisconnectedPeer(orphanPeer) {
checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
return operator.CreateRemovePeerOperator("remove-disconnected-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if hasHealthPeer {
// there already exists a healthy orphan peer, so we can remove other orphan peers.
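Taken together, the reworked fixOrphanPeers follows a fixed fallback order: pin a down or disconnected peer among the rule-fit peers, try to swap a healthy orphan in for it, and only then remove unhealthy, disconnected, or surplus orphan peers. A condensed sketch of that ordering (illustrative only: the real code also handles witness peers, peer roles, and store matching):

package main

import "fmt"

// Illustrative model of the ordering; Peer and its flags are simplified
// stand-ins for the real metapb/core types.
type Peer struct {
	ID           uint64
	Unhealthy    bool // pending or down, as isUnhealthyPeer reports
	Disconnected bool // store missed heartbeats, as isDisconnectedPeer reports
}

func decideOrphan(pinDownPeer *Peer, orphans []*Peer) string {
	// 1. If a rule-fit peer is pinned as down/disconnected, try to replace
	//    it with a healthy orphan peer first.
	if pinDownPeer != nil && pinDownPeer.Disconnected {
		for _, o := range orphans {
			if !o.Unhealthy && !o.Disconnected {
				return fmt.Sprintf("replace pinned peer %d with orphan %d", pinDownPeer.ID, o.ID)
			}
		}
	}
	// 2. Otherwise remove orphans in priority order: unhealthy first, then
	//    disconnected, then any surplus beyond one healthy orphan.
	hasHealthPeer := false
	for _, o := range orphans {
		switch {
		case o.Unhealthy:
			return fmt.Sprintf("remove-unhealthy-orphan-peer: %d", o.ID)
		case o.Disconnected:
			return fmt.Sprintf("remove-disconnected-orphan-peer: %d", o.ID)
		case hasHealthPeer:
			return fmt.Sprintf("remove-orphan-peer: %d", o.ID)
		}
		hasHealthPeer = true
	}
	return "no operation"
}

func main() {
	orphans := []*Peer{{ID: 4, Disconnected: true}, {ID: 5}}
	fmt.Println(decideOrphan(&Peer{ID: 1, Disconnected: true}, orphans))
}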
176 changes: 174 additions & 2 deletions server/schedule/checker/rule_checker_test.go
@@ -196,7 +196,7 @@ func (suite *ruleCheckerTestSuite) TestFixToManyOrphanPeers() {
suite.cluster.PutRegion(region)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore)
}

@@ -663,7 +663,7 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.cluster.PutRegion(testRegion)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.IsType(remove, op.Step(0))
// Ref #3521
suite.cluster.SetStoreOffline(2)
@@ -684,6 +684,178 @@
suite.Equal("remove-orphan-peer", op.Desc())
}

// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799
func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged() {
// init cluster with 5 replicas
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
storeIDs := []uint64{1, 2, 3, 4, 5}
suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...)
rule := &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 5,
StartKey: []byte{},
EndKey: []byte{},
}
suite.ruleManager.SetRule(rule)
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)

// set stores 1 and 2 to disconnected
suite.cluster.SetStoreDisconnect(1)
suite.cluster.SetStoreDisconnect(2)

// change rule to 3 replicas
rule = &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 3,
StartKey: []byte{},
EndKey: []byte{},
Override: true,
}
suite.ruleManager.SetRule(rule)

// remove store 1 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 2)
newLeaderID := op.Step(0).(operator.TransferLeader).ToStore
removedPeerID := op.Step(1).(operator.RemovePeer).FromStore
r1 := suite.cluster.GetRegion(1)
r1 = r1.Clone(
core.WithLeader(r1.GetPeer(newLeaderID)),
core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 4)

// remove store 2 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 3)
for _, p := range r1.GetPeers() {
suite.NotEqual(p.GetStoreId(), 1)
suite.NotEqual(p.GetStoreId(), 2)
}
}

// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799
func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged2() {
// init cluster with 5 voters and 1 learner
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLabelsStore(6, 1, map[string]string{"host": "host6"})
storeIDs := []uint64{1, 2, 3, 4, 5}
suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...)
r1 := suite.cluster.GetRegion(1)
r1 = r1.Clone(core.WithAddPeer(&metapb.Peer{Id: 6, StoreId: 6, Role: metapb.PeerRole_Learner}))
suite.cluster.PutRegion(r1)
err := suite.ruleManager.SetRules([]*placement.Rule{
{
GroupID: "pd",
ID: "default",
Index: 100,
Override: true,
Role: placement.Voter,
Count: 5,
IsWitness: false,
},
{
GroupID: "pd",
ID: "r1",
Index: 100,
Override: false,
Role: placement.Learner,
Count: 1,
IsWitness: false,
},
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)

// set stores 1, 2 and 3 to disconnected
suite.cluster.SetStoreDisconnect(1)
suite.cluster.SetStoreDisconnect(2)
suite.cluster.SetStoreDisconnect(3)

// change rule to 2 replicas
suite.ruleManager.DeleteRuleGroup("pd")
suite.ruleManager.SetRule(&placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 2,
StartKey: []byte{},
EndKey: []byte{},
Override: true,
})

// remove store 1 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 2)
newLeaderID := op.Step(0).(operator.TransferLeader).ToStore
removedPeerID := op.Step(1).(operator.RemovePeer).FromStore
r1 = suite.cluster.GetRegion(1)
r1 = r1.Clone(
core.WithLeader(r1.GetPeer(newLeaderID)),
core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 5)

// remove store 2 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 4)
for _, p := range r1.GetPeers() {
fmt.Println(p.GetStoreId(), p.Role.String())
}

// remove store 3 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
suite.Equal(op.Len(), 1)
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 3)

for _, p := range r1.GetPeers() {
suite.NotEqual(p.GetStoreId(), 1)
suite.NotEqual(p.GetStoreId(), 2)
suite.NotEqual(p.GetStoreId(), 3)
}
}
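For context on why these tests drive the scenario with SetStoreDisconnect rather than down stores: a store counts as disconnected once its heartbeats stop for storeDisconnectDuration (20 seconds in PD), while the down check this PR replaces waits for max-store-down-time (30 minutes by default); both values are assumed PD defaults here. With the old check, a rule change racing with a briefly unreachable store could leave orphan peers unfixed for the full down timeout. A toy comparison with those assumed defaults hard-coded:

package main

import (
	"fmt"
	"time"
)

// Assumed PD defaults; see server/core/store.go and the PD config for
// the authoritative values.
const (
	storeDisconnectDuration = 20 * time.Second
	defaultMaxStoreDownTime = 30 * time.Minute
)

func main() {
	downTime := 2 * time.Minute // store unreachable for two minutes
	fmt.Println("disconnected:", downTime > storeDisconnectDuration) // true: the disconnect check fires
	fmt.Println("down:", downTime > defaultMaxStoreDownTime)         // false: the old down check would not
}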

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})