diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index 20e7a7cba66..38e4b4c5a92 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -105,8 +105,6 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
 }
 
 // CheckRegionFlow checks the flow information of region.
 func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *StoresStats) (ret []*HotPeerStat) {
-	storeIDs := f.getAllStoreIDs(region)
-
 	totalBytes := float64(f.getTotalBytes(region))
 	totalKeys := float64(f.getTotalKeys(region))
@@ -116,9 +114,13 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto
 	byteRate := totalBytes / float64(interval)
 	keyRate := totalKeys / float64(interval)
 
+	// getAllStoreIDs returns the old stores first and the new stores last, which
+	// guarantees a cache hit when a peer moves or the leader transfers while the replica count stays the same.
+	var tmpItem *HotPeerStat
+	storeIDs := f.getAllStoreIDs(region)
 	for _, storeID := range storeIDs {
-		isExpired := f.isRegionExpired(region, storeID)
+		isExpired := f.isRegionExpired(region, storeID) // true after a transfer leader or remove peer
 		oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
 		if isExpired && oldItem != nil {
 			tmpItem = oldItem
 		}
@@ -141,9 +143,17 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto
 			isLeader:       region.GetLeader().GetStoreId() == storeID,
 		}
 
-		// use the tmpItem cached from other store
-		if oldItem == nil && tmpItem != nil {
-			oldItem = tmpItem
+		if oldItem == nil {
+			if tmpItem != nil { // use the tmpItem cached from the store the peer was on before
+				oldItem = tmpItem
+			} else { // the new item is a brand-new peer created by adding a replica
+				for _, storeID := range storeIDs {
+					oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
+					if oldItem != nil {
+						break
+					}
+				}
+			}
 		}
 
 		newItem = f.updateHotPeerStat(newItem, oldItem, storesStats)
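
Reviewer note: the hunk above changes how CheckRegionFlow finds the previous stat for a peer, so a peer inherits its hot-stat history from wherever the region was cached before instead of starting cold. Below is a minimal, self-contained sketch of the new lookup order; the names here (stat, cached, inherit) are illustrative stand-ins, not the real PD types.

package main

import "fmt"

type stat struct {
	hotDegree int
	isNew     bool
}

// One region's cached stats, keyed by store ID: peers on stores 1, 2, 3 are hot.
var cached = map[uint64]*stat{1: {hotDegree: 5}, 2: {hotDegree: 5}, 3: {hotDegree: 5}}

// inherit mimics the fallback chain in the patch: prefer the stat cached on
// this store, then the stat evicted from the peer's previous store (tmpItem),
// then any stat cached on the region's other current stores (the add-replica case).
func inherit(storeID uint64, tmpItem *stat, storeIDs []uint64) *stat {
	if old := cached[storeID]; old != nil {
		return old
	}
	if tmpItem != nil { // a peer or the leader just left some store
		return tmpItem
	}
	for _, id := range storeIDs {
		if old := cached[id]; old != nil {
			return old
		}
	}
	return &stat{isNew: true} // genuinely new region: start cold
}

func main() {
	// Store 4 just received a new replica: it inherits hotDegree 5 from
	// store 1 instead of starting as a cold, brand-new item.
	fmt.Println(inherit(4, nil, []uint64{1, 2, 3, 4}).hotDegree) // prints 5
}
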
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/v4/server/core" @@ -43,12 +45,7 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { core.SetReportInterval(interval), core.SetWrittenBytes(interval*100*1024)) - res := cache.CheckRegionFlow(region, stats) - c.Assert(res, HasLen, 3) - - for _, p := range res { - cache.Update(p) - } + checkAndUpdate(c, cache, region, stats, 3) { stats := cache.RegionStats() c.Assert(stats, HasLen, 3) @@ -59,6 +56,150 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { } } +type operator int + +const ( + transferLeader operator = iota + movePeer + addReplica +) + +type testCacheCase struct { + kind FlowKind + operator operator + expect int +} + +func (t *testHotPeerCache) TestCache(c *C) { + tests := []*testCacheCase{ + {ReadFlow, transferLeader, 2}, + {ReadFlow, movePeer, 1}, + {ReadFlow, addReplica, 1}, + {WriteFlow, transferLeader, 3}, + {WriteFlow, movePeer, 4}, + {WriteFlow, addReplica, 4}, + } + for _, t := range tests { + testCache(c, t) + } +} + +func testCache(c *C, t *testCacheCase) { + defaultSize := map[FlowKind]int{ + ReadFlow: 1, // only leader + WriteFlow: 3, // all peers + } + cache := NewHotStoresStats(t.kind) + stats := NewStoresStats() + region := buildRegion(nil, nil, t.kind) + checkAndUpdate(c, cache, region, stats, defaultSize[t.kind]) + checkHit(c, cache, region, t.kind, false) // all peers are new + + srcStore, region := schedule(t.operator, region, t.kind) + res := checkAndUpdate(c, cache, region, stats, t.expect) + checkHit(c, cache, region, t.kind, true) // hit cache + if t.expect != defaultSize[t.kind] { + checkNeedDelete(c, res, srcStore) + } +} + +func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *StoresStats, expect int) []*HotPeerStat { + res := cache.CheckRegionFlow(region, stats) + c.Assert(res, HasLen, expect) + for _, p := range res { + cache.Update(p) + } + return res +} + +func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) { + var peers []*metapb.Peer + if kind == ReadFlow { + peers = []*metapb.Peer{region.GetLeader()} + } else { + peers = region.GetPeers() + } + for _, peer := range peers { + item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) + c.Assert(item, NotNil) + c.Assert(item.isNew, Equals, !isHit) + } +} + +func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64) { + for _, item := range ret { + if item.StoreID == storeID { + c.Assert(item.needDelete, IsTrue) + return + } + } +} + +func schedule(operator operator, region *core.RegionInfo, kind FlowKind) (srcStore uint64, _ *core.RegionInfo) { + switch operator { + case transferLeader: + _, newLeader := pickFollower(region) + return region.GetLeader().StoreId, buildRegion(region.GetMeta(), newLeader, kind) + case movePeer: + index, _ := pickFollower(region) + meta := region.GetMeta() + srcStore := meta.Peers[index].StoreId + meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} + return srcStore, buildRegion(meta, region.GetLeader(), kind) + case addReplica: + meta := region.GetMeta() + meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) + return 0, buildRegion(meta, region.GetLeader(), kind) + default: + return 0, nil + } +} + +func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { + var dst int + meta := region.GetMeta() + + for index, peer := range meta.Peers { + if peer.StoreId == region.GetLeader().StoreId { + continue + } + dst = index + if rand.Intn(2) == 0 { + break + } + } + return dst, 
+}
+
+func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind FlowKind) *core.RegionInfo {
+	const interval = uint64(60)
+	if meta == nil {
+		peer1 := &metapb.Peer{Id: 1, StoreId: 1}
+		peer2 := &metapb.Peer{Id: 2, StoreId: 2}
+		peer3 := &metapb.Peer{Id: 3, StoreId: 3}
+
+		meta = &metapb.Region{
+			Id:          1000,
+			Peers:       []*metapb.Peer{peer1, peer2, peer3},
+			StartKey:    []byte(""),
+			EndKey:      []byte(""),
+			RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6},
+		}
+		leader = meta.Peers[rand.Intn(3)]
+	}
+
+	switch kind {
+	case ReadFlow:
+		return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval),
+			core.SetReadBytes(interval*100*1024))
+	case WriteFlow:
+		return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval),
+			core.SetWrittenBytes(interval*100*1024))
+	default:
+		return nil
+	}
+}
+
 type genID func(i int) uint64
 
 func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
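
Reviewer note on the expect column in TestCache, reading the table together with defaultSize and checkNeedDelete: for ReadFlow only the leader's peer is tracked, so the steady state is 1 item; transferLeader yields 2 (the new leader's item plus the old leader's item marked needDelete), while movePeer and addReplica leave the leader untouched and stay at 1. For WriteFlow all peers are tracked, so the steady state is 3; movePeer yields 4 (three current peers plus a needDelete item for the store that lost its peer), and addReplica yields 4 simply because a fourth peer now reports (schedule returns srcStore 0 in that case, so checkNeedDelete matches nothing and asserts nothing).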