simulator: the operator should share one limiter. #5323

Merged
merged 5 commits on Oct 14, 2022
Changes from 3 commits
9 changes: 6 additions & 3 deletions tools/pd-simulator/simulator/node.go
@@ -24,6 +24,7 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
@@ -48,7 +49,7 @@ type Node struct {
ctx context.Context
cancel context.CancelFunc
raftEngine *RaftEngine
ioRate int64
limiter *ratelimit.RateLimiter
sizeMutex sync.Mutex
}

@@ -90,6 +91,8 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
cancel()
return nil, err
}
ratio := int64(time.Second) / config.SimTickInterval.Milliseconds()
speed := config.StoreIOMBPerSecond * units.MiB * ratio
return &Node{
Store: store,
stats: stats,
@@ -98,7 +101,7 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) {
cancel: cancel,
tasks: make(map[uint64]Task),
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh,
ioRate: config.StoreIOMBPerSecond * units.MiB,
limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)),
tick: uint64(rand.Intn(storeHeartBeatPeriod)),
}, nil
}
@@ -155,7 +158,7 @@ func (n *Node) stepTask() {
for _, task := range n.tasks {
task.Step(n.raftEngine)
if task.IsFinished() {
simutil.Logger.Debug("task finished",
simutil.Logger.Debug("task status",
zap.Uint64("node-id", n.Id),
zap.Uint64("region-id", task.RegionID()),
zap.String("task", task.Desc()))
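With this change the node's snapshot IO is metered by one shared limiter rather than a fixed per-task ioRate. A minimal sketch of that idea, assuming pkg/ratelimit's NewRateLimiter(rate, burst) and AllowN(n) behave as the token-bucket calls used in this diff (the numbers are illustrative, not the simulator's defaults):

package main

import (
	"fmt"

	"github.com/docker/go-units"
	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// Illustrative store budget: 40 MiB/s shared by every task on the node.
	storeIOMBPerSecond := int64(40)
	speed := storeIOMBPerSecond * units.MiB
	limiter := ratelimit.NewRateLimiter(float64(speed), int(speed))

	// Tasks drain the same limiter in fixed-size chunks, so concurrent
	// snapshot tasks compete for one IO budget instead of each owning a
	// private ioRate.
	chunk := 4 * units.KiB
	granted := int64(0)
	for limiter.AllowN(chunk) {
		granted += int64(chunk)
	}
	fmt.Println("bytes granted from the initial burst:", granted)
}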
2 changes: 1 addition & 1 deletion tools/pd-simulator/simulator/raft.go
@@ -139,7 +139,7 @@ func (r *RaftEngine) stepSplit(region *core.RegionInfo) {
if r.useTiDBEncodedKey {
splitKey, err = simutil.GenerateTiDBEncodedSplitKey(region.GetStartKey(), region.GetEndKey())
if err != nil {
simutil.Logger.Fatal("generate TiDB encoded split key failed", zap.Error(err))
simutil.Logger.Fatal("Generate TiDB encoded split key failed", zap.Error(err))
}
} else {
splitKey = simutil.GenerateSplitKey(region.GetStartKey(), region.GetEndKey())
210 changes: 125 additions & 85 deletions tools/pd-simulator/simulator/task.go
@@ -17,14 +17,38 @@ package simulator
import (
"bytes"
"fmt"
"time"

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/eraftpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/tools/pd-analysis/analysis"
)

var (
chunkSize = int64(4 * units.KiB)
maxSnapGeneratorPoolSize = uint32(2)
maxSnapReceivePoolSize = uint32(4)
compressionRatio = int64(2)
)

type snapAction int

const (
Generate = iota
Receive
)

type snapStatus int

const (
pending snapStatus = iota
running
finished
)

// Task running in node.
type Task interface {
Desc() string
@@ -45,14 +69,8 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task {
case eraftpb.ConfChangeType_AddNode:
return &addPeer{
regionID: regionID,
size: region.GetApproximateSize(),
keys: region.GetApproximateKeys(),
speed: 100 * 1000 * 1000,
epoch: epoch,
peer: changePeer.GetPeer(),
// This two variables are used to simulate sending and receiving snapshot processes.
sendingStat: &snapshotStat{"sending", region.GetApproximateSize(), false},
receivingStat: &snapshotStat{"receiving", region.GetApproximateSize(), false},
}
case eraftpb.ConfChangeType_RemoveNode:
return &removePeer{
@@ -68,9 +86,11 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task {
regionID: regionID,
size: region.GetApproximateSize(),
keys: region.GetApproximateKeys(),
speed: 100 * 1000 * 1000,
epoch: epoch,
peer: changePeer.GetPeer(),
// These two variables are used to simulate the sending and receiving snapshot processes.
sendingStat: newSnapshotState(region.GetApproximateSize(), Generate),
receivingStat: newSnapshotState(region.GetApproximateSize(), Receive),
}
}
} else if resp.GetTransferLeader() != nil {
@@ -94,9 +114,22 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task {
}

type snapshotStat struct {
kind string
action snapAction
remainSize int64
finished bool
status snapStatus
start time.Time
}

func newSnapshotState(size int64, action snapAction) *snapshotStat {
if action == Receive {
size = size / compressionRatio
}
return &snapshotStat{
remainSize: size,
action: action,
status: pending,
start: time.Now(),
}
}
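To make the compressionRatio handling above concrete, a tiny fragment (hypothetical sizes; assumed to live in the same package as task.go so newSnapshotState, Generate, Receive, and units.MiB are in scope):

	// Hypothetical sizes, for illustration only.
	sending := newSnapshotState(100*units.MiB, Generate)  // remainSize == 100 MiB, status == pending
	receiving := newSnapshotState(100*units.MiB, Receive) // remainSize == 50 MiB (halved by compressionRatio)
	_, _ = sending, receiving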

type mergeRegion struct {
Expand Down Expand Up @@ -209,15 +242,10 @@ func (t *transferLeader) IsFinished() bool {
}

type addPeer struct {
regionID uint64
size int64
keys int64
speed int64
epoch *metapb.RegionEpoch
peer *metapb.Peer
finished bool
sendingStat *snapshotStat
receivingStat *snapshotStat
regionID uint64
epoch *metapb.RegionEpoch
peer *metapb.Peer
finished bool
}

func (a *addPeer) Desc() string {
@@ -234,44 +262,19 @@ func (a *addPeer) Step(r *RaftEngine) {
return
}

snapshotSize := region.GetApproximateSize()
sendNode := r.conn.Nodes[region.GetLeader().GetStoreId()]
if sendNode == nil {
a.finished = true
return
}
if !processSnapshot(sendNode, a.sendingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id)

recvNode := r.conn.Nodes[a.peer.GetStoreId()]
if recvNode == nil {
a.finished = true
return
}
if !processSnapshot(recvNode, a.receivingStat, snapshotSize) {
return
}
r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id)

a.size -= a.speed
if a.size < 0 {
var opts []core.RegionCreateOption
if region.GetPeer(a.peer.GetId()) == nil {
opts = append(opts, core.WithAddPeer(a.peer))
r.schedulerStats.taskStats.incAddPeer(region.GetID())
} else {
opts = append(opts, core.WithPromoteLearner(a.peer.GetId()))
r.schedulerStats.taskStats.incPromoteLeaner(region.GetID())
}
opts = append(opts, core.WithIncConfVer())
newRegion := region.Clone(opts...)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
recvNode.incUsedSize(uint64(snapshotSize))
a.finished = true
var opts []core.RegionCreateOption
if region.GetPeer(a.peer.GetId()) == nil {
opts = append(opts, core.WithAddPeer(a.peer))
r.schedulerStats.taskStats.incAddPeer(region.GetID())
} else {
opts = append(opts, core.WithPromoteLearner(a.peer.GetId()))
r.schedulerStats.taskStats.incPromoteLeaner(region.GetID())
}
opts = append(opts, core.WithIncConfVer())
newRegion := region.Clone(opts...)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
a.finished = true
}

func (a *addPeer) RegionID() uint64 {
@@ -352,13 +355,14 @@ func (a *removePeer) IsFinished() bool {
}

type addLearner struct {
regionID uint64
size int64
keys int64
speed int64
epoch *metapb.RegionEpoch
peer *metapb.Peer
finished bool
regionID uint64
size int64
keys int64
epoch *metapb.RegionEpoch
peer *metapb.Peer
finished bool
sendingStat *snapshotStat
receivingStat *snapshotStat
}

func (a *addLearner) Desc() string {
@@ -375,21 +379,41 @@ func (a *addLearner) Step(r *RaftEngine) {
return
}

a.size -= a.speed
if a.size < 0 {
if region.GetPeer(a.peer.GetId()) == nil {
newRegion := region.Clone(
core.WithAddPeer(a.peer),
core.WithIncConfVer(),
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.incAddLeaner(region.GetID())
}
snapshotSize := region.GetApproximateSize()
sendNode := r.conn.Nodes[region.GetLeader().GetStoreId()]
if sendNode == nil {
a.finished = true
if analysis.GetTransferCounter().IsValid {
analysis.GetTransferCounter().AddTarget(a.regionID, a.peer.StoreId)
}
return
}
if !processSnapshot(sendNode, a.sendingStat) {
return
}
r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id)

recvNode := r.conn.Nodes[a.peer.GetStoreId()]
if recvNode == nil {
a.finished = true
return
}
if !processSnapshot(recvNode, a.receivingStat) {
return
}
r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id)

if region.GetPeer(a.peer.GetId()) == nil {
newRegion := region.Clone(
core.WithAddPeer(a.peer),
core.WithIncConfVer(),
)
r.SetRegion(newRegion)
r.recordRegionChange(newRegion)
r.schedulerStats.taskStats.incAddLeaner(region.GetID())
recvNode.incUsedSize(uint64(snapshotSize))
a.finished = true
}

if analysis.GetTransferCounter().IsValid {
analysis.GetTransferCounter().AddTarget(a.regionID, a.peer.StoreId)
}
}

@@ -401,23 +425,39 @@ func (a *addLearner) IsFinished() bool {
return a.finished
}

func processSnapshot(n *Node, stat *snapshotStat, snapshotSize int64) bool {
// If the statement is true, it will start to send or receive the snapshot.
if stat.remainSize == snapshotSize {
if stat.kind == "sending" {
func processSnapshot(n *Node, stat *snapshotStat) bool {
if stat.status == finished {
return true
}
if stat.status == pending {
if stat.action == Generate && n.stats.SendingSnapCount > maxSnapGeneratorPoolSize {
return false
}
if stat.action == Receive && n.stats.ReceivingSnapCount > maxSnapReceivePoolSize {
return false
}
stat.status = running
// If the statement is true, it will start to send or receive the snapshot.
if stat.action == Generate {
n.stats.SendingSnapCount++
} else {
n.stats.ReceivingSnapCount++
}
}
stat.remainSize -= n.ioRate
// The sending or receiving process has not finished yet.

// The store should generate/receive the snapshot chunk by chunk.
// TODO: snapshot processing is single-threaded, so a later snapshot task must wait for the earlier one.
for n.limiter.AllowN(int(chunkSize)) {
[Review comment — Contributor]: It cannot be an acceleration; we can configure the tick to speed up the simulator.
[Reply — Contributor, Author]: Fix it.
stat.remainSize -= chunkSize
}

// The sending or receiving process has not finished yet.
if stat.remainSize > 0 {
return false
}
if !stat.finished {
stat.finished = true
if stat.kind == "sending" {
if stat.status == running {
stat.status = finished
if stat.action == Generate {
n.stats.SendingSnapCount--
} else {
n.stats.ReceivingSnapCount--
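Putting the pieces together, a simplified, self-contained sketch of the per-tick draining that processSnapshot performs (stand-in types, not the simulator's own): once admitted under the pool-size caps, the task shrinks remainSize by chunkSize for every grant the shared limiter hands out, and it only finishes when nothing remains.

package main

import "fmt"

// bucket stands in for the node's shared rate limiter: a budget of byte
// tokens available within one simulator tick.
type bucket struct{ tokens int64 }

func (b *bucket) allowN(n int64) bool {
	if b.tokens < n {
		return false
	}
	b.tokens -= n
	return true
}

func main() {
	const chunk int64 = 4 * 1024          // mirrors chunkSize (4 KiB)
	limiter := &bucket{tokens: 64 * 1024} // illustrative per-tick budget: 64 KiB
	remain := int64(100 * 1024)           // illustrative snapshot size: 100 KiB

	// Drain as many whole chunks as the limiter allows in this tick.
	for remain > 0 && limiter.allowN(chunk) {
		remain -= chunk
	}
	fmt.Printf("remaining after one tick: %d KiB\n", remain/1024) // 36 KiB left, so the task is not finished yet
}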