From c14409832fe9e80f69e800069a715109cc42e923 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 8 Jun 2020 23:41:40 +0800 Subject: [PATCH 1/3] fix hot cache and add more test for it Signed-off-by: lhy1024 --- server/statistics/hot_peer_cache.go | 22 +++- server/statistics/hot_peer_cache_test.go | 140 ++++++++++++++++++++++- 2 files changed, 150 insertions(+), 12 deletions(-) diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 20e7a7cba66..38e4b4c5a92 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -105,8 +105,6 @@ func (f *hotPeerCache) Update(item *HotPeerStat) { // CheckRegionFlow checks the flow information of region. func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *StoresStats) (ret []*HotPeerStat) { - storeIDs := f.getAllStoreIDs(region) - totalBytes := float64(f.getTotalBytes(region)) totalKeys := float64(f.getTotalKeys(region)) @@ -116,9 +114,13 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto byteRate := totalBytes / float64(interval) keyRate := totalKeys / float64(interval) + // old region is in the front and new region is in the back + // which ensures it will hit the cache if moving peer or transfer leader occurs with the same replica number + var tmpItem *HotPeerStat + storeIDs := f.getAllStoreIDs(region) for _, storeID := range storeIDs { - isExpired := f.isRegionExpired(region, storeID) + isExpired := f.isRegionExpired(region, storeID) // transfer leader or remove peer oldItem := f.getOldHotPeerStat(region.GetID(), storeID) if isExpired && oldItem != nil { tmpItem = oldItem @@ -141,9 +143,17 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto isLeader: region.GetLeader().GetStoreId() == storeID, } - // use the tmpItem cached from other store - if oldItem == nil && tmpItem != nil { - oldItem = tmpItem + if oldItem == nil { + if tmpItem != nil { // use the tmpItem cached from the store where this region was in before + oldItem = tmpItem + } else { // new item is new peer after adding replica + for _, storeID := range storeIDs { + oldItem = f.getOldHotPeerStat(region.GetID(), storeID) + if oldItem != nil { + break + } + } + } } newItem = f.updateHotPeerStat(newItem, oldItem, storesStats) diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index bcba785be8f..f6dfeeb674c 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -14,6 +14,8 @@ package statistics import ( + "math/rand" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/v4/server/core" @@ -43,12 +45,7 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { core.SetReportInterval(interval), core.SetWrittenBytes(interval*100*1024)) - res := cache.CheckRegionFlow(region, stats) - c.Assert(res, HasLen, 3) - - for _, p := range res { - cache.Update(p) - } + checkAndUpdate(c, cache, region, stats, 3) { stats := cache.RegionStats() c.Assert(stats, HasLen, 3) @@ -59,6 +56,137 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { } } +type operator int + +const ( + transferLeader operator = iota + movePeer + addReplica +) + +type testCacheCase struct { + kind FlowKind + operator operator + expect int +} + +func (t *testHotPeerCache) TestCache(c *C) { + tests := []*testCacheCase{ + {ReadFlow, transferLeader, 1}, + {ReadFlow, movePeer, 1}, + {ReadFlow, addReplica, 1}, + {WriteFlow, transferLeader, 3}, + {WriteFlow, movePeer, 4}, + {WriteFlow, addReplica, 4}, + } + for _, t := range tests { + testCache(c, t) + } +} + +func testCache(c *C, t *testCacheCase) { + cache := NewHotStoresStats(t.kind) + stats := NewStoresStats() + region := buildRegion(nil, nil, t.kind) + if t.kind == ReadFlow { + checkAndUpdate(c, cache, region, stats, 1) // only leader + } else { + checkAndUpdate(c, cache, region, stats, 3) // all peer + } + checkHit(c, cache, region, t.kind, false) // all peers are new + schedule(t.operator, region, t.kind) + checkAndUpdate(c, cache, region, stats, t.expect) + checkHit(c, cache, region, t.kind, true) // hit cache +} + +func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *StoresStats, num int) { + res := cache.CheckRegionFlow(region, stats) + c.Assert(res, HasLen, num) + for _, p := range res { + cache.Update(p) + } +} + +func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) { + var peers []*metapb.Peer + if kind == ReadFlow { + peers = []*metapb.Peer{region.GetLeader()} + } else { + peers = region.GetPeers() + } + for _, peer := range peers { + item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) + c.Assert(item, NotNil) + c.Assert(item.isNew, Equals, !isHit) + } +} + +const interval = uint64(60) + +func schedule(operator operator, region *core.RegionInfo, kind FlowKind) *core.RegionInfo { + switch operator { + case transferLeader: + _, newLeader := pickFollower(region) + return buildRegion(region.GetMeta(), newLeader, kind) + case movePeer: + index, _ := pickFollower(region) + meta := region.GetMeta() + meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} + return buildRegion(meta, region.GetLeader(), kind) + case addReplica: + meta := region.GetMeta() + meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) + return buildRegion(meta, region.GetLeader(), kind) + default: + return nil + } +} + +func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { + var dst int + meta := region.GetMeta() + srcStoreID := region.GetLeader().StoreId + + for index, peer := range meta.Peers { + if peer.StoreId == srcStoreID { + continue + } + dst = index + if rand.Intn(2) == 0 { + break + } + } + return dst, meta.Peers[dst] +} + +func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind FlowKind) *core.RegionInfo { + if meta == nil { + peer1 := &metapb.Peer{Id: 1, StoreId: 1} + peer2 := &metapb.Peer{Id: 2, StoreId: 2} + peer3 := &metapb.Peer{Id: 3, StoreId: 3} + + meta = &metapb.Region{ + Id: 1000, + Peers: []*metapb.Peer{peer1, peer2, peer3}, + StartKey: []byte(""), + EndKey: []byte(""), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, + } + leader = meta.Peers[rand.Intn(3)] + } + + switch kind { + case ReadFlow: + return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), + core.SetReadBytes(interval*100*1024)) + case WriteFlow: + return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), + core.SetWrittenBytes(interval*100*1024)) + default: + return nil + } +} + type genID func(i int) uint64 func newPeers(n int, pid genID, sid genID) []*metapb.Peer { From dfb15aad17d9261ef94e11a58c054642d6637eb5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 10 Jun 2020 22:26:27 +0800 Subject: [PATCH 2/3] add test for `needDelete` Signed-off-by: lhy1024 --- server/statistics/hot_peer_cache_test.go | 48 +++++++++++++++--------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index f6dfeeb674c..71e91d6878f 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -72,7 +72,7 @@ type testCacheCase struct { func (t *testHotPeerCache) TestCache(c *C) { tests := []*testCacheCase{ - {ReadFlow, transferLeader, 1}, + {ReadFlow, transferLeader, 2}, {ReadFlow, movePeer, 1}, {ReadFlow, addReplica, 1}, {WriteFlow, transferLeader, 3}, @@ -85,26 +85,31 @@ func (t *testHotPeerCache) TestCache(c *C) { } func testCache(c *C, t *testCacheCase) { + defaultSize := map[FlowKind]int{ + ReadFlow: 1, // only leader + WriteFlow: 3, // all peers + } cache := NewHotStoresStats(t.kind) stats := NewStoresStats() region := buildRegion(nil, nil, t.kind) - if t.kind == ReadFlow { - checkAndUpdate(c, cache, region, stats, 1) // only leader - } else { - checkAndUpdate(c, cache, region, stats, 3) // all peer - } + checkAndUpdate(c, cache, region, stats, defaultSize[t.kind]) checkHit(c, cache, region, t.kind, false) // all peers are new - schedule(t.operator, region, t.kind) - checkAndUpdate(c, cache, region, stats, t.expect) + + srcStore, region := schedule(t.operator, region, t.kind) + res := checkAndUpdate(c, cache, region, stats, t.expect) checkHit(c, cache, region, t.kind, true) // hit cache + if t.expect != defaultSize[t.kind] { + checkNeedDelete(c, res, srcStore) + } } -func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *StoresStats, num int) { +func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *StoresStats, expect int) []*HotPeerStat { res := cache.CheckRegionFlow(region, stats) - c.Assert(res, HasLen, num) + c.Assert(res, HasLen, expect) for _, p := range res { cache.Update(p) } + return res } func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) { @@ -121,34 +126,43 @@ func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, } } +func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64) { + for _, item := range ret { + if item.StoreID == storeID { + c.Assert(item.needDelete, IsTrue) + return + } + } +} + const interval = uint64(60) -func schedule(operator operator, region *core.RegionInfo, kind FlowKind) *core.RegionInfo { +func schedule(operator operator, region *core.RegionInfo, kind FlowKind) (srcStore uint64, _ *core.RegionInfo) { switch operator { case transferLeader: _, newLeader := pickFollower(region) - return buildRegion(region.GetMeta(), newLeader, kind) + return region.GetLeader().StoreId, buildRegion(region.GetMeta(), newLeader, kind) case movePeer: index, _ := pickFollower(region) meta := region.GetMeta() + srcStore := meta.Peers[index].StoreId meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} - return buildRegion(meta, region.GetLeader(), kind) + return srcStore, buildRegion(meta, region.GetLeader(), kind) case addReplica: meta := region.GetMeta() meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) - return buildRegion(meta, region.GetLeader(), kind) + return 0, buildRegion(meta, region.GetLeader(), kind) default: - return nil + return 0, nil } } func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { var dst int meta := region.GetMeta() - srcStoreID := region.GetLeader().StoreId for index, peer := range meta.Peers { - if peer.StoreId == srcStoreID { + if peer.StoreId == region.GetLeader().StoreId { continue } dst = index From 62fc1e8d35b2f024ebb2f3271dc52d0aa8e3626c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 16 Jun 2020 01:23:55 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: lhy1024 --- server/statistics/hot_peer_cache_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index 71e91d6878f..45fc384fbdf 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -135,8 +135,6 @@ func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64) { } } -const interval = uint64(60) - func schedule(operator operator, region *core.RegionInfo, kind FlowKind) (srcStore uint64, _ *core.RegionInfo) { switch operator { case transferLeader: @@ -174,6 +172,7 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { } func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind FlowKind) *core.RegionInfo { + const interval = uint64(60) if meta == nil { peer1 := &metapb.Peer{Id: 1, StoreId: 1} peer2 := &metapb.Peer{Id: 2, StoreId: 2}