Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simulator: make heartbeat more reasonable #1418

Merged
merged 5 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tools/pd-simulator/simulator/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package simulator

import (
"context"
"sync"

"go.uber.org/zap"

Expand All @@ -27,6 +28,7 @@ import (

// Driver promotes the cluster status change.
type Driver struct {
wg sync.WaitGroup
pdAddr string
simCase *cases.Case
client Client
Expand Down Expand Up @@ -106,8 +108,10 @@ func (d *Driver) Tick() {
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
n.Tick()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
}

// Check checks if the simulation is completed.
Expand Down
10 changes: 7 additions & 3 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package simulator
import (
"context"
"fmt"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -79,6 +80,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) {
tasks: make(map[uint64]Task),
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh,
ioRate: ioRate * cases.MB,
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
}, nil
}

Expand Down Expand Up @@ -112,7 +114,8 @@ func (n *Node) receiveRegionHeartbeat() {
}

// Tick steps node status change.
func (n *Node) Tick() {
func (n *Node) Tick(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetState() != metapb.StoreState_Up {
return
}
Expand Down Expand Up @@ -185,7 +188,8 @@ func (n *Node) regionHeartBeat() {
}

func (n *Node) reportRegionChange() {
for _, regionID := range n.raftEngine.regionChange[n.Id] {
regionIDs := n.raftEngine.GetRegionChange(n.Id)
for _, regionID := range regionIDs {
region := n.raftEngine.GetRegion(regionID)
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
err := n.client.RegionHeartbeat(ctx, region)
Expand All @@ -195,9 +199,9 @@ func (n *Node) reportRegionChange() {
zap.Uint64("region-id", region.GetID()),
zap.Error(err))
}
n.raftEngine.ResetRegionChange(n.Id, regionID)
cancel()
}
delete(n.raftEngine.regionChange, n.Id)
}

// AddTask adds task in this node.
Expand Down
22 changes: 21 additions & 1 deletion tools/pd-simulator/simulator/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,33 @@ func (r *RaftEngine) electNewLeader(region *core.RegionInfo) *metapb.Peer {
return nil
}

// GetRegion returns the RegionInfo with regionID
// GetRegion returns the RegionInfo with regionID.
func (r *RaftEngine) GetRegion(regionID uint64) *core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegion(regionID)
}

// GetRegionChange returns a list of RegionID for a given store.
func (r *RaftEngine) GetRegionChange(storeID uint64) []uint64 {
r.RLock()
defer r.RUnlock()
return r.regionChange[storeID]
}

// ResetRegionChange resets RegionInfo on a specific store with a given Region ID
func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) {
r.Lock()
defer r.Unlock()
regionIDs := r.regionChange[storeID]
for i, id := range regionIDs {
if id == regionID {
r.regionChange[storeID] = append(r.regionChange[storeID][:i], r.regionChange[storeID][i+1:]...)
return
}
}
}

// GetRegions gets all RegionInfo from regionMap
func (r *RaftEngine) GetRegions() []*core.RegionInfo {
r.RLock()
Expand Down
62 changes: 62 additions & 0 deletions tools/pd-simulator/simulator/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package simulator
import (
"fmt"
"math"
"sync"
)

type taskStatistics struct {
sync.RWMutex
addPeer map[uint64]int
removePeer map[uint64]int
addLearner map[uint64]int
Expand All @@ -38,6 +40,8 @@ func newTaskStatistics() *taskStatistics {
}

func (t *taskStatistics) getStatistics() map[string]int {
t.RLock()
defer t.RUnlock()
stats := make(map[string]int)
addpeer := getSum(t.addPeer)
removePeer := getSum(t.removePeer)
Expand All @@ -61,7 +65,51 @@ func (t *taskStatistics) getStatistics() map[string]int {
return stats
}

func (t *taskStatistics) incAddPeer(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addPeer[regionID]++
}

func (t *taskStatistics) incAddLeaner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.addLearner[regionID]++
}

func (t *taskStatistics) incPromoteLeaner(regionID uint64) {
t.Lock()
defer t.Unlock()
t.promoteLeaner[regionID]++
}

func (t *taskStatistics) incRemovePeer(regionID uint64) {
t.Lock()
defer t.Unlock()
t.removePeer[regionID]++
}

func (t *taskStatistics) incMergeRegion() {
t.Lock()
defer t.Unlock()
t.mergeRegion++
}

func (t *taskStatistics) incTransferLeader(fromPeerID, toPeerID uint64) {
t.Lock()
defer t.Unlock()
_, ok := t.transferLeader[fromPeerID]
if ok {
t.transferLeader[fromPeerID][toPeerID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
t.transferLeader[fromPeerID] = m
}
}

type snapshotStatistics struct {
sync.RWMutex
receive map[uint64]int
send map[uint64]int
}
Expand All @@ -86,6 +134,8 @@ func newSchedulerStatistics() *schedulerStatistics {
}

func (s *snapshotStatistics) getStatistics() map[string]int {
s.RLock()
defer s.RUnlock()
maxSend := getMax(s.send)
maxReceive := getMax(s.receive)
minSend := getMin(s.send)
Expand All @@ -104,6 +154,18 @@ func (s *snapshotStatistics) getStatistics() map[string]int {
return stats
}

func (s *snapshotStatistics) incSendSnapshot(storeID uint64) {
s.Lock()
defer s.Unlock()
s.send[storeID]++
}

func (s *snapshotStatistics) incReceiveSnapshot(storeID uint64) {
s.Lock()
defer s.Unlock()
s.receive[storeID]++
}

// PrintStatistics prints the statistics of the scheduler.
func (s *schedulerStatistics) PrintStatistics() {
task := s.taskStats.getStatistics()
Expand Down
23 changes: 8 additions & 15 deletions tools/pd-simulator/simulator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (m *mergeRegion) Step(r *RaftEngine) {
)
r.SetRegion(mergeRegion)
r.recordRegionChange(mergeRegion)
r.schedulerStats.taskStats.mergeRegion++
r.schedulerStats.taskStats.incMergeRegion()
m.finished = true
}

Expand Down Expand Up @@ -194,14 +194,7 @@ func (t *transferLeader) Step(r *RaftEngine) {
r.recordRegionChange(newRegion)
fromPeerID := t.fromPeer.GetId()
toPeerID := t.peer.GetId()
_, ok := r.schedulerStats.taskStats.transferLeader[fromPeerID]
if ok {
r.schedulerStats.taskStats.transferLeader[fromPeerID][toPeerID]++
} else {
m := make(map[uint64]int)
m[toPeerID]++
r.schedulerStats.taskStats.transferLeader[fromPeerID] = m
}
r.schedulerStats.taskStats.incTransferLeader(fromPeerID, toPeerID)
}

func (t *transferLeader) RegionID() uint64 {
Expand Down Expand Up @@ -248,7 +241,7 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(sendNode, a.sendingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.send[sendNode.Id]++
r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id)

recvNode := r.conn.Nodes[a.peer.GetStoreId()]
if recvNode == nil {
Expand All @@ -259,17 +252,17 @@ func (a *addPeer) Step(r *RaftEngine) {
if !processSnapshot(recvNode, a.receivingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.receive[recvNode.Id]++
r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id)

a.size -= a.speed
if a.size < 0 {
var opts []core.RegionCreateOption
if region.GetPeer(a.peer.GetId()) == nil {
opts = append(opts, core.WithAddPeer(a.peer))
r.schedulerStats.taskStats.addPeer[region.GetID()]++
r.schedulerStats.taskStats.incAddPeer(region.GetID())
} else {
opts = append(opts, core.WithPromoteLearner(a.peer.GetId()))
r.schedulerStats.taskStats.promoteLeaner[region.GetID()]++
r.schedulerStats.taskStats.incPromoteLeaner(region.GetID())
}
opts = append(opts, core.WithIncConfVer())
newRegion := region.Clone(opts...)
Expand Down Expand Up @@ -333,7 +326,7 @@ func (a *removePeer) Step(r *RaftEngine) {
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.removePeer[region.GetID()]++
r.schedulerStats.taskStats.incRemovePeer(region.GetID())
if r.conn.Nodes[storeID] == nil {
a.finished = true
return
Expand Down Expand Up @@ -387,7 +380,7 @@ func (a *addLearner) Step(r *RaftEngine) {
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.addLearner[region.GetID()]++
r.schedulerStats.taskStats.incAddLeaner(region.GetID())
}
a.finished = true
}
Expand Down