From b453646b9b9520ad8ba67eb558eedcb905ea5a94 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 22 May 2024 23:46:36 +0800 Subject: [PATCH] --wip-- [skip ci] --- pkg/cluster/cluster.go | 7 +- pkg/core/peer.go | 31 ---- pkg/mcs/scheduling/server/cluster.go | 3 +- pkg/mock/mockcluster/mockcluster.go | 32 +--- pkg/statistics/hot_cache.go | 8 +- pkg/statistics/hot_cache_task.go | 11 +- pkg/statistics/hot_peer_cache.go | 97 +++++++------ pkg/statistics/hot_peer_cache_test.go | 201 +++++++++++--------------- pkg/statistics/utils/kind.go | 6 +- server/cluster/cluster.go | 3 +- tools/pd-ctl/tests/hot/hot_test.go | 3 +- 11 files changed, 160 insertions(+), 242 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8bd2616f41f7..07636bed074d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,12 +35,7 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/core/peer.go b/pkg/core/peer.go index 659886e6d394..1f888ba58eb7 100644 --- a/pkg/core/peer.go +++ b/pkg/core/peer.go @@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int { } return count } - -// PeerInfo provides peer information -type PeerInfo struct { - *metapb.Peer - loads []float64 - interval uint64 -} - -// NewPeerInfo creates PeerInfo -func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo { - return &PeerInfo{ - Peer: meta, - loads: loads, - interval: interval, - } -} - -// GetLoads provides loads -func (p *PeerInfo) GetLoads() []float64 { - return p.loads -} - -// GetPeerID provides peer id -func (p *PeerInfo) GetPeerID() uint64 { - return p.GetId() -} - -// GetInterval returns reporting interval -func (p *PeerInfo) GetInterval() uint64 { - return p.interval -} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c6c365b03ad8..96309d20270c 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -442,8 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index e5b3e39a5020..95f42ebc5e5d 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,16 +894,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // CheckRegionWrite checks region write info with all peers @@ -911,16 +902,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckWritePeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -928,15 +910,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - peer := region.GetLeader() - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 799fb240d108..fd7b67ae9a30 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index fa224b522ff1..7a6c211ff0c9 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -26,23 +26,20 @@ type FlowItemTask interface { } type checkPeerTask struct { - peerInfo *core.PeerInfo regionInfo *core.RegionInfo + loads []float64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { return &checkPeerTask{ - peerInfo: peerInfo, regionInfo: regionInfo, + loads: loads, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo) - if stat != nil { - cache.updateStat(stat) - } + cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) } type checkExpiredTask struct { diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index cd27dcad4c8d..05fdb23ea77e 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,58 +174,71 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - interval := peer.GetInterval() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } - storeID := peer.GetStoreId() - deltaLoads := peer.GetLoads() + + if deltaLoads == nil { + deltaLoads = region.GetLoads() + } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() - oldItem := f.getOldHotPeerStat(regionID, storeID) - - // check whether the peer is allowed to be inherited - source := utils.Direct - if oldItem == nil { - for _, storeID := range f.getAllStoreIDs(region) { - oldItem = f.getOldHotPeerStat(regionID, storeID) - if oldItem != nil && oldItem.allowInherited { - source = utils.Inherit - break + + var stats []*HotPeerStat + for _, peer := range peers { + storeID := peer.GetStoreId() + oldItem := f.getOldHotPeerStat(regionID, storeID) + + // check whether the peer is allowed to be inherited + source := utils.Direct + if oldItem == nil { + for _, storeID := range f.getAllStoreIDs(region) { + oldItem = f.getOldHotPeerStat(regionID, storeID) + if oldItem != nil && oldItem.allowInherited { + source = utils.Inherit + break + } } } - } - - // check new item whether is hot - if oldItem == nil { - regionStats := f.kind.RegionStats() - thresholds := f.calcHotThresholds(storeID) - isHot := slice.AnyOf(regionStats, func(i int) bool { - return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] - }) - if !isHot { - return nil + // check new item whether is hot + if oldItem == nil { + regionStats := f.kind.RegionStats() + thresholds := f.calcHotThresholds(storeID) + isHot := slice.AnyOf(regionStats, func(i int) bool { + return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] + }) + if !isHot { + continue + } } - } - peers := region.GetPeers() - newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: regionID, - Loads: f.kind.GetLoadRatesFromPeer(peer), - isLeader: region.GetLeader().GetStoreId() == storeID, - actionType: utils.Update, - stores: make([]uint64, len(peers)), - } - for i, peer := range peers { - newItem.stores[i] = peer.GetStoreId() - } - - if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second) + newItem := &HotPeerStat{ + StoreID: storeID, + RegionID: regionID, + Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval), + isLeader: region.GetLeader().GetStoreId() == storeID, + actionType: utils.Update, + stores: make([]uint64, len(peers)), + } + for i, peer := range peers { + newItem.stores[i] = peer.GetStoreId() + } + if oldItem == nil { + if stat := f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second); stat != nil { + f.updateStat(stat) + stats = append(stats, stat) + } + continue + } + if stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source); stat != nil { + f.updateStat(stat) + stats = append(stats, stat) + } } - return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source) + return stats } // checkColdPeer checks the collect the un-heartbeat peer and maintain it. diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 36f922d38307..47857a985e24 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -16,6 +16,7 @@ package statistics import ( "context" + "fmt" "math/rand" "sort" "testing" @@ -106,17 +107,8 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer } func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.collectExpiredItems(region)...) - for _, peer := range peers { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - res = append(res, item) - } - } - return res + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -164,6 +156,7 @@ func checkHit(re *require.Assertions, cache *hotPeerCache, region *core.RegionIn for _, peer := range peers { item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) re.NotNil(item) + fmt.Println("!!!", actionType, item.actionType) re.Equal(actionType, item.actionType) } } @@ -304,89 +297,89 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer { return peers } -func TestUpdateHotPeerStat(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - storeID, regionID := uint64(1), uint64(2) - peer := &metapb.Peer{StoreId: storeID} - region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) - // we statistic read peer info from store heartbeat rather than region heartbeat - m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval - ThresholdsUpdateInterval = 0 - defer func() { - ThresholdsUpdateInterval = 8 * time.Second - }() - - // skip interval=0 - interval := 0 - deltaLoads := []float64{0.0, 0.0, 0.0} - utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 - utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 - utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - - newItem := cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Nil(newItem) - - // new peer, interval is larger than report interval, but no hot - interval = 10 - deltaLoads = []float64{0.0, 0.0, 0.0} - utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 - utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 - utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Nil(newItem) - - // new peer, interval is less than report interval - interval = 4 - deltaLoads = []float64{60.0, 60.0, 60.0} - utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 - utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 - utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.NotNil(newItem) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) - // sum of interval is less than report interval - interval = 4 - deltaLoads = []float64{60.0, 60.0, 60.0} - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) - // sum of interval is larger than report interval, and hot - newItem.AntiCount = utils.Read.DefaultAntiCount() - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) - // sum of interval is less than report interval - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) - // sum of interval is larger than report interval, and hot - interval = 10 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(2, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) - // sum of interval is larger than report interval, and cold - utils.MinHotThresholds[utils.RegionReadBytes] = 10.0 - utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 - utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m-1, newItem.AntiCount) - // sum of interval is larger than report interval, and cold - for i := 0; i < 2*m-1; i++ { - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - } - re.Less(newItem.HotDegree, 0) - re.Equal(0, newItem.AntiCount) - re.Equal(utils.Remove, newItem.actionType) -} +// func TestUpdateHotPeerStat(t *testing.T) { +// re := require.New(t) +// cache := NewHotPeerCache(context.Background(), utils.Read) +// storeID, regionID := uint64(1), uint64(2) +// peer := &metapb.Peer{StoreId: storeID} +// region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) +// // we statistic read peer info from store heartbeat rather than region heartbeat +// m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval +// ThresholdsUpdateInterval = 0 +// defer func() { +// ThresholdsUpdateInterval = 8 * time.Second +// }() + +// // skip interval=0 +// interval := 0 +// deltaLoads := []float64{0.0, 0.0, 0.0} +// utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 +// utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 +// utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 + +// newItem := cache.checkPeerFlow(region, deltaLoads) +// re.Nil(newItem) + +// // new peer, interval is larger than report interval, but no hot +// interval = 10 +// deltaLoads = []float64{0.0, 0.0, 0.0} +// utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 +// utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 +// utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 +// newItem = cache.checkPeerFlow(region, deltaLoads) +// re.Nil(newItem) + +// // new peer, interval is less than report interval +// interval = 4 +// deltaLoads = []float64{60.0, 60.0, 60.0} +// utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 +// utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 +// utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 +// newItem = cache.checkPeerFlow(region, deltaLoads) +// re.NotNil(newItem) +// re.Equal(0, newItem[0].HotDegree) +// re.Equal(0, newItem[0].AntiCount) +// // sum of interval is less than report interval +// interval = 4 +// deltaLoads = []float64{60.0, 60.0, 60.0} +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// re.Equal(0, newItem.HotDegree) +// re.Equal(0, newItem.AntiCount) +// // sum of interval is larger than report interval, and hot +// newItem.AntiCount = utils.Read.DefaultAntiCount() +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// re.Equal(1, newItem.HotDegree) +// re.Equal(2*m, newItem.AntiCount) +// // sum of interval is less than report interval +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// re.Equal(1, newItem.HotDegree) +// re.Equal(2*m, newItem.AntiCount) +// // sum of interval is larger than report interval, and hot +// interval = 10 +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// re.Equal(2, newItem.HotDegree) +// re.Equal(2*m, newItem.AntiCount) +// // sum of interval is larger than report interval, and cold +// utils.MinHotThresholds[utils.RegionReadBytes] = 10.0 +// utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 +// utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// re.Equal(1, newItem.HotDegree) +// re.Equal(2*m-1, newItem.AntiCount) +// // sum of interval is larger than report interval, and cold +// for i := 0; i < 2*m-1; i++ { +// cache.updateStat(newItem) +// newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) +// } +// re.Less(newItem.HotDegree, 0) +// re.Equal(0, newItem.AntiCount) +// re.Equal(utils.Remove, newItem.actionType) +// } func TestThresholdWithUpdateHotPeerStat(t *testing.T) { re := require.New(t) @@ -688,11 +681,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - newPeer := core.NewPeerInfo(meta.Peers[0], region.GetLoads(), end-start) - stat := cache.checkPeerFlow(newPeer, newRegion) - if stat != nil { - cache.updateStat(stat) - } + cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) } if ThresholdsUpdateInterval == 0 { if id < 60 { @@ -717,22 +706,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { func BenchmarkCheckRegionFlow(b *testing.B) { cache := NewHotPeerCache(context.Background(), utils.Read) region := buildRegion(utils.Read, 3, 10) - peerInfos := make([]*core.PeerInfo, 0) - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) - peerInfos = append(peerInfos, peerInfo) - } b.ResetTimer() for i := 0; i < b.N; i++ { - items := make([]*HotPeerStat, 0) - for _, peerInfo := range peerInfos { - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - for _, ret := range items { - cache.updateStat(ret) - } + cache.checkPeerFlow(region, region.GetPeers(), nil) } } diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 4d44b8d57e10..463b35c450af 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -15,7 +15,7 @@ package utils import ( - "github.com/tikv/pd/pkg/core" + "github.com/pingcap/kvproto/pkg/metapb" ) const ( @@ -231,9 +231,7 @@ func (rw RWType) DefaultAntiCount() int { } // GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo. -func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 { - deltaLoads := peer.GetLoads() - interval := peer.GetInterval() +func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 { loads := make([]float64, DimLen) for dim, k := range rw.RegionStats() { loads[dim] = deltaLoads[k] / float64(interval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 148b43541a23..870e2495dc3a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,8 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index 7661704aa41d..c06e42edf60e 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -188,11 +188,10 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { Id: 100 + regionIDCounter, StoreId: hotStoreID, } - peerInfo := core.NewPeerInfo(leader, loads, reportInterval) region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil