Skip to content

Commit

Permalink
simulator: support change peer v2 (#5609)
Browse files Browse the repository at this point in the history
close #5469

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HunDunDM and ti-chi-bot authored Oct 27, 2022
1 parent d033fbf commit 7aba282
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 313 deletions.
4 changes: 2 additions & 2 deletions server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestTrend(t *testing.T) {
newPeerID := op.Step(0).(operator.AddLearner).PeerID
region5 = region5.Clone(core.WithAddPeer(&metapb.Peer{Id: newPeerID, StoreId: 3, Role: metapb.PeerRole_Learner}), core.WithIncConfVer())
mustRegionHeartbeat(re, svr, region5)
region5 = region5.Clone(core.WithPromoteLearner(newPeerID), core.WithRemoveStorePeer(2), core.WithIncConfVer())
region5 = region5.Clone(core.WithRole(newPeerID, metapb.PeerRole_Voter), core.WithRemoveStorePeer(2), core.WithIncConfVer())
mustRegionHeartbeat(re, svr, region5)

op, err = svr.GetHandler().GetOperator(6)
Expand All @@ -71,7 +71,7 @@ func TestTrend(t *testing.T) {
newPeerID = op.Step(0).(operator.AddLearner).PeerID
region6 = region6.Clone(core.WithAddPeer(&metapb.Peer{Id: newPeerID, StoreId: 3, Role: metapb.PeerRole_Learner}), core.WithIncConfVer())
mustRegionHeartbeat(re, svr, region6)
region6 = region6.Clone(core.WithPromoteLearner(newPeerID), core.WithLeader(region6.GetStorePeer(2)), core.WithRemoveStorePeer(1), core.WithIncConfVer())
region6 = region6.Clone(core.WithRole(newPeerID, metapb.PeerRole_Voter), core.WithLeader(region6.GetStorePeer(2)), core.WithRemoveStorePeer(1), core.WithIncConfVer())
mustRegionHeartbeat(re, svr, region6)

var trend Trend
Expand Down
6 changes: 3 additions & 3 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,12 @@ func WithAddPeer(peer *metapb.Peer) RegionCreateOption {
}
}

// WithPromoteLearner promotes the learner.
func WithPromoteLearner(peerID uint64) RegionCreateOption {
// WithRole changes the role.
func WithRole(peerID uint64, role metapb.PeerRole) RegionCreateOption {
return func(region *RegionInfo) {
for _, p := range region.GetPeers() {
if p.GetId() == peerID {
p.Role = metapb.PeerRole_Voter
p.Role = role
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() {
suite.Equal(2, stream.MsgLength())

region4 := region3.Clone(
core.WithPromoteLearner(3),
core.WithRole(3, metapb.PeerRole_Voter),
core.WithIncConfVer(),
)
suite.True(steps[1].IsFinish(region4))
Expand Down
11 changes: 5 additions & 6 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Node struct {
stats *info.StoreStats
tick uint64
wg sync.WaitGroup
tasks map[uint64]Task
tasks map[uint64]*Task
client Client
receiveRegionHeartbeatCh <-chan *pdpb.RegionHeartbeatResponse
ctx context.Context
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
client: client,
ctx: ctx,
cancel: cancel,
tasks: make(map[uint64]Task),
tasks: make(map[uint64]*Task),
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh,
limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)),
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
Expand All @@ -125,7 +125,7 @@ func (n *Node) receiveRegionHeartbeat() {
for {
select {
case resp := <-n.receiveRegionHeartbeatCh:
task := responseToTask(resp, n.raftEngine)
task := responseToTask(n.raftEngine, resp)
if task != nil {
n.AddTask(task)
}
Expand Down Expand Up @@ -156,8 +156,7 @@ func (n *Node) stepTask() {
n.Lock()
defer n.Unlock()
for _, task := range n.tasks {
task.Step(n.raftEngine)
if task.IsFinished() {
if isFinished := task.Step(n.raftEngine); isFinished {
simutil.Logger.Debug("task status",
zap.Uint64("node-id", n.Id),
zap.Uint64("region-id", task.RegionID()),
Expand Down Expand Up @@ -246,7 +245,7 @@ func (n *Node) reportRegionChange() {
}

// AddTask adds task in this node.
func (n *Node) AddTask(task Task) {
func (n *Node) AddTask(task *Task) {
n.Lock()
defer n.Unlock()
if t, ok := n.tasks[task.RegionID()]; ok {
Expand Down
40 changes: 25 additions & 15 deletions tools/pd-simulator/simulator/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (

type taskStatistics struct {
syncutil.RWMutex
addPeer map[uint64]int
addVoter map[uint64]int
removePeer map[uint64]int
addLearner map[uint64]int
promoteLeaner map[uint64]int
demoteVoter map[uint64]int
transferLeader map[uint64]map[uint64]int
mergeRegion int
}

func newTaskStatistics() *taskStatistics {
return &taskStatistics{
addPeer: make(map[uint64]int),
addVoter: make(map[uint64]int),
removePeer: make(map[uint64]int),
addLearner: make(map[uint64]int),
promoteLeaner: make(map[uint64]int),
demoteVoter: make(map[uint64]int),
transferLeader: make(map[uint64]map[uint64]int),
}
}
Expand All @@ -45,10 +47,11 @@ func (t *taskStatistics) getStatistics() map[string]int {
t.RLock()
defer t.RUnlock()
stats := make(map[string]int)
addPeer := getSum(t.addPeer)
addVoter := getSum(t.addVoter)
removePeer := getSum(t.removePeer)
addLearner := getSum(t.addLearner)
promoteLeaner := getSum(t.promoteLeaner)
promoteLearner := getSum(t.promoteLeaner)
demoteVoter := getSum(t.demoteVoter)

var transferLeader int
for _, to := range t.transferLeader {
Expand All @@ -57,34 +60,41 @@ func (t *taskStatistics) getStatistics() map[string]int {
}
}

stats["Add Peer (task)"] = addPeer
stats["Add Voter (task)"] = addVoter
stats["Remove Peer (task)"] = removePeer
stats["Add Learner (task)"] = addLearner
stats["Promote Learner (task)"] = promoteLeaner
stats["Promote Learner (task)"] = promoteLearner
stats["Demote Voter (task)"] = demoteVoter
stats["Transfer Leader (task)"] = transferLeader
stats["Merge Region (task)"] = t.mergeRegion

return stats
}

func (t *taskStatistics) incAddPeer(regionID uint64) {
func (t *taskStatistics) incAddVoter(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addPeer[regionID]++
t.addVoter[regionID]++
}

func (t *taskStatistics) incAddLeaner(regionID uint64) {
func (t *taskStatistics) incAddLearner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addLearner[regionID]++
}

func (t *taskStatistics) incPromoteLeaner(regionID uint64) {
func (t *taskStatistics) incPromoteLearner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.promoteLeaner[regionID]++
}

func (t *taskStatistics) incDemoteVoter(regionID uint64) {
t.Lock()
defer t.Unlock()
t.demoteVoter[regionID]++
}

func (t *taskStatistics) incRemovePeer(regionID uint64) {
t.Lock()
defer t.Unlock()
Expand All @@ -97,16 +107,16 @@ func (t *taskStatistics) incMergeRegion() {
t.mergeRegion++
}

func (t *taskStatistics) incTransferLeader(fromPeerID, toPeerID uint64) {
func (t *taskStatistics) incTransferLeader(fromPeerStoreID, toPeerStoreID uint64) {
t.Lock()
defer t.Unlock()
_, ok := t.transferLeader[fromPeerID]
_, ok := t.transferLeader[fromPeerStoreID]
if ok {
t.transferLeader[fromPeerID][toPeerID]++
t.transferLeader[fromPeerStoreID][toPeerStoreID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
t.transferLeader[fromPeerID] = m
m[toPeerStoreID]++
t.transferLeader[fromPeerStoreID] = m
}
}

Expand Down
Loading

0 comments on commit 7aba282

Please sign in to comment.