Skip to content

Commit

Permalink
simulator: make heartbeat more reasonable (#1418)
Browse files Browse the repository at this point in the history
* make heartbeat more reasonable

Signed-off-by: rleungx <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Mar 7, 2019
1 parent 408348e commit 2fe8598
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 20 deletions.
6 changes: 5 additions & 1 deletion tools/pd-simulator/simulator/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package simulator

import (
"context"
"sync"

"go.uber.org/zap"

Expand All @@ -27,6 +28,7 @@ import (

// Driver promotes the cluster status change.
type Driver struct {
wg sync.WaitGroup
pdAddr string
simCase *cases.Case
client Client
Expand Down Expand Up @@ -106,8 +108,10 @@ func (d *Driver) Tick() {
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
n.Tick()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
}

// Check checks if the simulation is completed.
Expand Down
10 changes: 7 additions & 3 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package simulator
import (
"context"
"fmt"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -79,6 +80,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) {
tasks: make(map[uint64]Task),
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh,
ioRate: ioRate * cases.MB,
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
}, nil
}

Expand Down Expand Up @@ -112,7 +114,8 @@ func (n *Node) receiveRegionHeartbeat() {
}

// Tick steps node status change.
func (n *Node) Tick() {
func (n *Node) Tick(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetState() != metapb.StoreState_Up {
return
}
Expand Down Expand Up @@ -185,7 +188,8 @@ func (n *Node) regionHeartBeat() {
}

func (n *Node) reportRegionChange() {
for _, regionID := range n.raftEngine.regionChange[n.Id] {
regionIDs := n.raftEngine.GetRegionChange(n.Id)
for _, regionID := range regionIDs {
region := n.raftEngine.GetRegion(regionID)
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
err := n.client.RegionHeartbeat(ctx, region)
Expand All @@ -195,9 +199,9 @@ func (n *Node) reportRegionChange() {
zap.Uint64("region-id", region.GetID()),
zap.Error(err))
}
n.raftEngine.ResetRegionChange(n.Id, regionID)
cancel()
}
delete(n.raftEngine.regionChange, n.Id)
}

// AddTask adds task in this node.
Expand Down
22 changes: 21 additions & 1 deletion tools/pd-simulator/simulator/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,33 @@ func (r *RaftEngine) electNewLeader(region *core.RegionInfo) *metapb.Peer {
return nil
}

// GetRegion returns the RegionInfo with regionID
// GetRegion returns the RegionInfo with regionID.
func (r *RaftEngine) GetRegion(regionID uint64) *core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegion(regionID)
}

// GetRegionChange returns a list of RegionID for a given store.
func (r *RaftEngine) GetRegionChange(storeID uint64) []uint64 {
r.RLock()
defer r.RUnlock()
return r.regionChange[storeID]
}

// ResetRegionChange resets RegionInfo on a specific store with a given Region ID
func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) {
r.Lock()
defer r.Unlock()
regionIDs := r.regionChange[storeID]
for i, id := range regionIDs {
if id == regionID {
r.regionChange[storeID] = append(r.regionChange[storeID][:i], r.regionChange[storeID][i+1:]...)
return
}
}
}

// GetRegions gets all RegionInfo from regionMap
func (r *RaftEngine) GetRegions() []*core.RegionInfo {
r.RLock()
Expand Down
62 changes: 62 additions & 0 deletions tools/pd-simulator/simulator/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package simulator
import (
"fmt"
"math"
"sync"
)

type taskStatistics struct {
sync.RWMutex
addPeer map[uint64]int
removePeer map[uint64]int
addLearner map[uint64]int
Expand All @@ -38,6 +40,8 @@ func newTaskStatistics() *taskStatistics {
}

func (t *taskStatistics) getStatistics() map[string]int {
t.RLock()
defer t.RUnlock()
stats := make(map[string]int)
addpeer := getSum(t.addPeer)
removePeer := getSum(t.removePeer)
Expand All @@ -61,7 +65,51 @@ func (t *taskStatistics) getStatistics() map[string]int {
return stats
}

func (t *taskStatistics) incAddPeer(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addPeer[regionID]++
}

func (t *taskStatistics) incAddLeaner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addLearner[regionID]++
}

func (t *taskStatistics) incPromoteLeaner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.promoteLeaner[regionID]++
}

func (t *taskStatistics) incRemovePeer(regionID uint64) {
t.Lock()
defer t.Unlock()
t.removePeer[regionID]++
}

func (t *taskStatistics) incMergeRegion() {
t.Lock()
defer t.Unlock()
t.mergeRegion++
}

func (t *taskStatistics) incTransferLeader(fromPeerID, toPeerID uint64) {
t.Lock()
defer t.Unlock()
_, ok := t.transferLeader[fromPeerID]
if ok {
t.transferLeader[fromPeerID][toPeerID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
t.transferLeader[fromPeerID] = m
}
}

type snapshotStatistics struct {
sync.RWMutex
receive map[uint64]int
send map[uint64]int
}
Expand All @@ -86,6 +134,8 @@ func newSchedulerStatistics() *schedulerStatistics {
}

func (s *snapshotStatistics) getStatistics() map[string]int {
s.RLock()
defer s.RUnlock()
maxSend := getMax(s.send)
maxReceive := getMax(s.receive)
minSend := getMin(s.send)
Expand All @@ -104,6 +154,18 @@ func (s *snapshotStatistics) getStatistics() map[string]int {
return stats
}

func (s *snapshotStatistics) incSendSnapshot(storeID uint64) {
s.Lock()
defer s.Unlock()
s.send[storeID]++
}

func (s *snapshotStatistics) incReceiveSnapshot(storeID uint64) {
s.Lock()
defer s.Unlock()
s.receive[storeID]++
}

// PrintStatistics prints the statistics of the scheduler.
func (s *schedulerStatistics) PrintStatistics() {
task := s.taskStats.getStatistics()
Expand Down
23 changes: 8 additions & 15 deletions tools/pd-simulator/simulator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (m *mergeRegion) Step(r *RaftEngine) {
)
r.SetRegion(mergeRegion)
r.recordRegionChange(mergeRegion)
r.schedulerStats.taskStats.mergeRegion++
r.schedulerStats.taskStats.incMergeRegion()
m.finished = true
}

Expand Down Expand Up @@ -194,14 +194,7 @@ func (t *transferLeader) Step(r *RaftEngine) {
r.recordRegionChange(newRegion)
fromPeerID := t.fromPeer.GetId()
toPeerID := t.peer.GetId()
_, ok := r.schedulerStats.taskStats.transferLeader[fromPeerID]
if ok {
r.schedulerStats.taskStats.transferLeader[fromPeerID][toPeerID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
r.schedulerStats.taskStats.transferLeader[fromPeerID] = m
}
r.schedulerStats.taskStats.incTransferLeader(fromPeerID, toPeerID)
}

func (t *transferLeader) RegionID() uint64 {
Expand Down Expand Up @@ -248,7 +241,7 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(sendNode, a.sendingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.send[sendNode.Id]++
r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id)

recvNode := r.conn.Nodes[a.peer.GetStoreId()]
if recvNode == nil {
Expand All @@ -259,17 +252,17 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(recvNode, a.receivingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.receive[recvNode.Id]++
r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id)

a.size -= a.speed
if a.size < 0 {
var opts []core.RegionCreateOption
if region.GetPeer(a.peer.GetId()) == nil {
opts = append(opts, core.WithAddPeer(a.peer))
r.schedulerStats.taskStats.addPeer[region.GetID()]++
r.schedulerStats.taskStats.incAddPeer(region.GetID())
} else {
opts = append(opts, core.WithPromoteLearner(a.peer.GetId()))
r.schedulerStats.taskStats.promoteLeaner[region.GetID()]++
r.schedulerStats.taskStats.incPromoteLeaner(region.GetID())
}
opts = append(opts, core.WithIncConfVer())
newRegion := region.Clone(opts...)
Expand Down Expand Up @@ -333,7 +326,7 @@ func (a *removePeer) Step(r *RaftEngine) {
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.removePeer[region.GetID()]++
r.schedulerStats.taskStats.incRemovePeer(region.GetID())
if r.conn.Nodes[storeID] == nil {
a.finished = true
return
Expand Down Expand Up @@ -387,7 +380,7 @@ func (a *addLearner) Step(r *RaftEngine) {
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.addLearner[region.GetID()]++
r.schedulerStats.taskStats.incAddLeaner(region.GetID())
}
a.finished = true
}
Expand Down

0 comments on commit 2fe8598

Please sign in to comment.