From d3e8a46c78361a99e8042f16fd5a87326cdefeb1 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Fri, 12 Aug 2022 12:03:37 +0800 Subject: [PATCH 1/3] simulator Signed-off-by: bufferflies <1045931706@qq.com> --- tools/pd-simulator/simulator/node.go | 8 +- tools/pd-simulator/simulator/task.go | 209 ++++++++++++++++----------- 2 files changed, 129 insertions(+), 88 deletions(-) diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 136fbeb3ba9..53728292de7 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -24,6 +24,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/tools/pd-simulator/simulator/cases" "github.com/tikv/pd/tools/pd-simulator/simulator/info" "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" @@ -49,7 +50,7 @@ type Node struct { ctx context.Context cancel context.CancelFunc raftEngine *RaftEngine - ioRate int64 + limiter *ratelimit.RateLimiter sizeMutex sync.Mutex } @@ -77,6 +78,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) { cancel() return nil, err } + speed := ioRate * units.MiB return &Node{ Store: store, stats: stats, @@ -85,7 +87,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) { cancel: cancel, tasks: make(map[uint64]Task), receiveRegionHeartbeatCh: receiveRegionHeartbeatCh, - ioRate: ioRate * units.MiB, + limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)), tick: uint64(rand.Intn(storeHeartBeatPeriod)), }, nil } @@ -142,7 +144,7 @@ func (n *Node) stepTask() { for _, task := range n.tasks { task.Step(n.raftEngine) if task.IsFinished() { - simutil.Logger.Debug("task finished", + simutil.Logger.Debug("task status", zap.Uint64("node-id", n.Id), zap.Uint64("region-id", task.RegionID()), zap.String("task", task.Desc())) diff --git a/tools/pd-simulator/simulator/task.go b/tools/pd-simulator/simulator/task.go index e502b0be40d..eb194ff90cc 100644 --- a/tools/pd-simulator/simulator/task.go +++ b/tools/pd-simulator/simulator/task.go @@ -17,7 +17,9 @@ package simulator import ( "bytes" "fmt" + "time" + "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/eraftpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -25,6 +27,28 @@ import ( "github.com/tikv/pd/tools/pd-analysis/analysis" ) +var ( + chunkSize = int64(4 * units.KiB) + maxSnapGeneratorPoolSize = uint32(2) + maxSnapReceivePoolSize = uint32(4) + compressionRatio = int64(2) +) + +type snapAction string + +const ( + generate snapAction = "generator" + receive = "receive" +) + +type snapStatus int + +const ( + pending snapStatus = iota + running + finished +) + // Task running in node. type Task interface { Desc() string @@ -45,14 +69,8 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task { case eraftpb.ConfChangeType_AddNode: return &addPeer{ regionID: regionID, - size: region.GetApproximateSize(), - keys: region.GetApproximateKeys(), - speed: 100 * 1000 * 1000, epoch: epoch, peer: changePeer.GetPeer(), - // This two variables are used to simulate sending and receiving snapshot processes. - sendingStat: &snapshotStat{"sending", region.GetApproximateSize(), false}, - receivingStat: &snapshotStat{"receiving", region.GetApproximateSize(), false}, } case eraftpb.ConfChangeType_RemoveNode: return &removePeer{ @@ -68,9 +86,11 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task { regionID: regionID, size: region.GetApproximateSize(), keys: region.GetApproximateKeys(), - speed: 100 * 1000 * 1000, epoch: epoch, peer: changePeer.GetPeer(), + // This two variables are used to simulate sending and receiving snapshot processes. + sendingStat: newSnapshotState(region.GetApproximateSize(), generate), + receivingStat: newSnapshotState(region.GetApproximateSize(), receive), } } } else if resp.GetTransferLeader() != nil { @@ -94,9 +114,22 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task { } type snapshotStat struct { - kind string + action snapAction remainSize int64 - finished bool + status snapStatus + start time.Time +} + +func newSnapshotState(size int64, action snapAction) *snapshotStat { + if action == receive { + size = size / compressionRatio + } + return &snapshotStat{ + remainSize: size, + action: action, + status: pending, + start: time.Now(), + } } type mergeRegion struct { @@ -209,15 +242,10 @@ func (t *transferLeader) IsFinished() bool { } type addPeer struct { - regionID uint64 - size int64 - keys int64 - speed int64 - epoch *metapb.RegionEpoch - peer *metapb.Peer - finished bool - sendingStat *snapshotStat - receivingStat *snapshotStat + regionID uint64 + epoch *metapb.RegionEpoch + peer *metapb.Peer + finished bool } func (a *addPeer) Desc() string { @@ -234,44 +262,19 @@ func (a *addPeer) Step(r *RaftEngine) { return } - snapshotSize := region.GetApproximateSize() - sendNode := r.conn.Nodes[region.GetLeader().GetStoreId()] - if sendNode == nil { - a.finished = true - return - } - if !processSnapshot(sendNode, a.sendingStat, snapshotSize) { - return - } - r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id) - - recvNode := r.conn.Nodes[a.peer.GetStoreId()] - if recvNode == nil { - a.finished = true - return - } - if !processSnapshot(recvNode, a.receivingStat, snapshotSize) { - return - } - r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id) - - a.size -= a.speed - if a.size < 0 { - var opts []core.RegionCreateOption - if region.GetPeer(a.peer.GetId()) == nil { - opts = append(opts, core.WithAddPeer(a.peer)) - r.schedulerStats.taskStats.incAddPeer(region.GetID()) - } else { - opts = append(opts, core.WithPromoteLearner(a.peer.GetId())) - r.schedulerStats.taskStats.incPromoteLeaner(region.GetID()) - } - opts = append(opts, core.WithIncConfVer()) - newRegion := region.Clone(opts...) - r.SetRegion(newRegion) - r.recordRegionChange(newRegion) - recvNode.incUsedSize(uint64(snapshotSize)) - a.finished = true + var opts []core.RegionCreateOption + if region.GetPeer(a.peer.GetId()) == nil { + opts = append(opts, core.WithAddPeer(a.peer)) + r.schedulerStats.taskStats.incAddPeer(region.GetID()) + } else { + opts = append(opts, core.WithPromoteLearner(a.peer.GetId())) + r.schedulerStats.taskStats.incPromoteLeaner(region.GetID()) } + opts = append(opts, core.WithIncConfVer()) + newRegion := region.Clone(opts...) + r.SetRegion(newRegion) + r.recordRegionChange(newRegion) + a.finished = true } func (a *addPeer) RegionID() uint64 { @@ -352,13 +355,14 @@ func (a *removePeer) IsFinished() bool { } type addLearner struct { - regionID uint64 - size int64 - keys int64 - speed int64 - epoch *metapb.RegionEpoch - peer *metapb.Peer - finished bool + regionID uint64 + size int64 + keys int64 + epoch *metapb.RegionEpoch + peer *metapb.Peer + finished bool + sendingStat *snapshotStat + receivingStat *snapshotStat } func (a *addLearner) Desc() string { @@ -375,21 +379,41 @@ func (a *addLearner) Step(r *RaftEngine) { return } - a.size -= a.speed - if a.size < 0 { - if region.GetPeer(a.peer.GetId()) == nil { - newRegion := region.Clone( - core.WithAddPeer(a.peer), - core.WithIncConfVer(), - ) - r.SetRegion(newRegion) - r.recordRegionChange(newRegion) - r.schedulerStats.taskStats.incAddLeaner(region.GetID()) - } + snapshotSize := region.GetApproximateSize() + sendNode := r.conn.Nodes[region.GetLeader().GetStoreId()] + if sendNode == nil { a.finished = true - if analysis.GetTransferCounter().IsValid { - analysis.GetTransferCounter().AddTarget(a.regionID, a.peer.StoreId) - } + return + } + if !processSnapshot(sendNode, a.sendingStat) { + return + } + r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id) + + recvNode := r.conn.Nodes[a.peer.GetStoreId()] + if recvNode == nil { + a.finished = true + return + } + if !processSnapshot(recvNode, a.receivingStat) { + return + } + r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id) + + if region.GetPeer(a.peer.GetId()) == nil { + newRegion := region.Clone( + core.WithAddPeer(a.peer), + core.WithIncConfVer(), + ) + r.SetRegion(newRegion) + r.recordRegionChange(newRegion) + r.schedulerStats.taskStats.incAddLeaner(region.GetID()) + recvNode.incUsedSize(uint64(snapshotSize)) + a.finished = true + } + + if analysis.GetTransferCounter().IsValid { + analysis.GetTransferCounter().AddTarget(a.regionID, a.peer.StoreId) } } @@ -401,23 +425,38 @@ func (a *addLearner) IsFinished() bool { return a.finished } -func processSnapshot(n *Node, stat *snapshotStat, snapshotSize int64) bool { - // If the statement is true, it will start to send or receive the snapshot. - if stat.remainSize == snapshotSize { - if stat.kind == "sending" { +func processSnapshot(n *Node, stat *snapshotStat) bool { + if stat.status == finished { + return true + } + if stat.status == pending { + if stat.action == generate && n.stats.SendingSnapCount > maxSnapGeneratorPoolSize { + return false + } + if stat.action == receive && n.stats.ReceivingSnapCount > maxSnapReceivePoolSize { + return false + } + stat.status = running + // If the statement is true, it will start to send or receive the snapshot. + if stat.action == generate { n.stats.SendingSnapCount++ } else { n.stats.ReceivingSnapCount++ } } - stat.remainSize -= n.ioRate - // The sending or receiving process has not finished yet. + + // store should generate/receive snapshot by chunk size. + for n.limiter.AllowN(int(chunkSize)) { + stat.remainSize -= chunkSize + } + + // The sending or receiving process has not status yet. if stat.remainSize > 0 { return false } - if !stat.finished { - stat.finished = true - if stat.kind == "sending" { + if stat.status == running { + stat.status = finished + if stat.action == generate { n.stats.SendingSnapCount-- } else { n.stats.ReceivingSnapCount-- From 5440c6bc9717fcbae7c6f549243c3f3814d8631e Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Mon, 10 Oct 2022 11:46:50 +0800 Subject: [PATCH 2/3] the limit should speed up Signed-off-by: bufferflies <1045931706@qq.com> --- tools/pd-simulator/simulator/node.go | 3 ++- tools/pd-simulator/simulator/task.go | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 7edf56e1915..a6a9c478735 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -91,7 +91,8 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) { cancel() return nil, err } - speed := config.StoreIOMBPerSecond * units.MiB + ratio := int64(time.Second) / config.SimTickInterval.Milliseconds() + speed := config.StoreIOMBPerSecond * units.MiB * ratio return &Node{ Store: store, stats: stats, diff --git a/tools/pd-simulator/simulator/task.go b/tools/pd-simulator/simulator/task.go index 24ba6e6be02..7013cafe14f 100644 --- a/tools/pd-simulator/simulator/task.go +++ b/tools/pd-simulator/simulator/task.go @@ -34,11 +34,11 @@ var ( compressionRatio = int64(2) ) -type snapAction string +type snapAction int const ( - Generate snapAction = "Generate" - Receive = "Receive" + Generate = iota + Receive ) type snapStatus int @@ -446,6 +446,7 @@ func processSnapshot(n *Node, stat *snapshotStat) bool { } // store should Generate/Receive snapshot by chunk size. + // todo: the process of snapshot is single thread, the later snapshot task must wait the first one. for n.limiter.AllowN(int(chunkSize)) { stat.remainSize -= chunkSize } From 44db4fd679a1467c0d6a76366cada8767ab9f0d6 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Fri, 14 Oct 2022 11:42:36 +0800 Subject: [PATCH 3/3] pass unit test Signed-off-by: bufferflies <1045931706@qq.com> --- tools/pd-simulator/simulator/task.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tools/pd-simulator/simulator/task.go b/tools/pd-simulator/simulator/task.go index 7013cafe14f..083d8b6774c 100644 --- a/tools/pd-simulator/simulator/task.go +++ b/tools/pd-simulator/simulator/task.go @@ -37,8 +37,8 @@ var ( type snapAction int const ( - Generate = iota - Receive + generate = iota + receive ) type snapStatus int @@ -89,8 +89,8 @@ func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task { epoch: epoch, peer: changePeer.GetPeer(), // This two variables are used to simulate sending and receiving snapshot processes. - sendingStat: newSnapshotState(region.GetApproximateSize(), Generate), - receivingStat: newSnapshotState(region.GetApproximateSize(), Receive), + sendingStat: newSnapshotState(region.GetApproximateSize(), generate), + receivingStat: newSnapshotState(region.GetApproximateSize(), receive), } } } else if resp.GetTransferLeader() != nil { @@ -121,8 +121,8 @@ type snapshotStat struct { } func newSnapshotState(size int64, action snapAction) *snapshotStat { - if action == Receive { - size = size / compressionRatio + if action == receive { + size /= compressionRatio } return &snapshotStat{ remainSize: size, @@ -430,15 +430,15 @@ func processSnapshot(n *Node, stat *snapshotStat) bool { return true } if stat.status == pending { - if stat.action == Generate && n.stats.SendingSnapCount > maxSnapGeneratorPoolSize { + if stat.action == generate && n.stats.SendingSnapCount > maxSnapGeneratorPoolSize { return false } - if stat.action == Receive && n.stats.ReceivingSnapCount > maxSnapReceivePoolSize { + if stat.action == receive && n.stats.ReceivingSnapCount > maxSnapReceivePoolSize { return false } stat.status = running // If the statement is true, it will start to send or Receive the snapshot. - if stat.action == Generate { + if stat.action == generate { n.stats.SendingSnapCount++ } else { n.stats.ReceivingSnapCount++ @@ -457,7 +457,7 @@ func processSnapshot(n *Node, stat *snapshotStat) bool { } if stat.status == running { stat.status = finished - if stat.action == Generate { + if stat.action == generate { n.stats.SendingSnapCount-- } else { n.stats.ReceivingSnapCount--