-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
schedulers: fix cache item in hot peer cache and add more test #2463
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,6 +14,8 @@ | |||||
package statistics | ||||||
|
||||||
import ( | ||||||
"math/rand" | ||||||
|
||||||
. "github.com/pingcap/check" | ||||||
"github.com/pingcap/kvproto/pkg/metapb" | ||||||
"github.com/pingcap/pd/v4/server/core" | ||||||
|
@@ -43,12 +45,7 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { | |||||
core.SetReportInterval(interval), | ||||||
core.SetWrittenBytes(interval*100*1024)) | ||||||
|
||||||
res := cache.CheckRegionFlow(region, stats) | ||||||
c.Assert(res, HasLen, 3) | ||||||
|
||||||
for _, p := range res { | ||||||
cache.Update(p) | ||||||
} | ||||||
checkAndUpdate(c, cache, region, stats, 3) | ||||||
{ | ||||||
stats := cache.RegionStats() | ||||||
c.Assert(stats, HasLen, 3) | ||||||
|
@@ -59,6 +56,150 @@ func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { | |||||
} | ||||||
} | ||||||
|
||||||
type operator int | ||||||
|
||||||
const ( | ||||||
transferLeader operator = iota | ||||||
movePeer | ||||||
addReplica | ||||||
) | ||||||
|
||||||
type testCacheCase struct { | ||||||
kind FlowKind | ||||||
operator operator | ||||||
expect int | ||||||
} | ||||||
|
||||||
func (t *testHotPeerCache) TestCache(c *C) { | ||||||
tests := []*testCacheCase{ | ||||||
{ReadFlow, transferLeader, 2}, | ||||||
{ReadFlow, movePeer, 1}, | ||||||
{ReadFlow, addReplica, 1}, | ||||||
{WriteFlow, transferLeader, 3}, | ||||||
{WriteFlow, movePeer, 4}, | ||||||
{WriteFlow, addReplica, 4}, | ||||||
} | ||||||
for _, t := range tests { | ||||||
testCache(c, t) | ||||||
} | ||||||
} | ||||||
|
||||||
func testCache(c *C, t *testCacheCase) { | ||||||
defaultSize := map[FlowKind]int{ | ||||||
ReadFlow: 1, // only leader | ||||||
WriteFlow: 3, // all peers | ||||||
} | ||||||
cache := NewHotStoresStats(t.kind) | ||||||
stats := NewStoresStats() | ||||||
region := buildRegion(nil, nil, t.kind) | ||||||
checkAndUpdate(c, cache, region, stats, defaultSize[t.kind]) | ||||||
checkHit(c, cache, region, t.kind, false) // all peers are new | ||||||
|
||||||
srcStore, region := schedule(t.operator, region, t.kind) | ||||||
res := checkAndUpdate(c, cache, region, stats, t.expect) | ||||||
checkHit(c, cache, region, t.kind, true) // hit cache | ||||||
if t.expect != defaultSize[t.kind] { | ||||||
checkNeedDelete(c, res, srcStore) | ||||||
} | ||||||
} | ||||||
|
||||||
func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *StoresStats, expect int) []*HotPeerStat { | ||||||
res := cache.CheckRegionFlow(region, stats) | ||||||
c.Assert(res, HasLen, expect) | ||||||
for _, p := range res { | ||||||
cache.Update(p) | ||||||
} | ||||||
return res | ||||||
} | ||||||
|
||||||
func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) { | ||||||
var peers []*metapb.Peer | ||||||
if kind == ReadFlow { | ||||||
peers = []*metapb.Peer{region.GetLeader()} | ||||||
} else { | ||||||
peers = region.GetPeers() | ||||||
} | ||||||
for _, peer := range peers { | ||||||
item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) | ||||||
c.Assert(item, NotNil) | ||||||
c.Assert(item.isNew, Equals, !isHit) | ||||||
} | ||||||
} | ||||||
|
||||||
func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64) { | ||||||
for _, item := range ret { | ||||||
if item.StoreID == storeID { | ||||||
c.Assert(item.needDelete, IsTrue) | ||||||
return | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func schedule(operator operator, region *core.RegionInfo, kind FlowKind) (srcStore uint64, _ *core.RegionInfo) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to indicate the meaning of this return var, I think it is better than putting it in a comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree |
||||||
switch operator { | ||||||
case transferLeader: | ||||||
_, newLeader := pickFollower(region) | ||||||
return region.GetLeader().StoreId, buildRegion(region.GetMeta(), newLeader, kind) | ||||||
case movePeer: | ||||||
index, _ := pickFollower(region) | ||||||
meta := region.GetMeta() | ||||||
srcStore := meta.Peers[index].StoreId | ||||||
meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} | ||||||
return srcStore, buildRegion(meta, region.GetLeader(), kind) | ||||||
case addReplica: | ||||||
meta := region.GetMeta() | ||||||
meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) | ||||||
return 0, buildRegion(meta, region.GetLeader(), kind) | ||||||
default: | ||||||
return 0, nil | ||||||
} | ||||||
} | ||||||
|
||||||
func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { | ||||||
var dst int | ||||||
meta := region.GetMeta() | ||||||
|
||||||
for index, peer := range meta.Peers { | ||||||
if peer.StoreId == region.GetLeader().StoreId { | ||||||
continue | ||||||
} | ||||||
dst = index | ||||||
if rand.Intn(2) == 0 { | ||||||
break | ||||||
} | ||||||
} | ||||||
return dst, meta.Peers[dst] | ||||||
} | ||||||
|
||||||
func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind FlowKind) *core.RegionInfo { | ||||||
const interval = uint64(60) | ||||||
if meta == nil { | ||||||
peer1 := &metapb.Peer{Id: 1, StoreId: 1} | ||||||
peer2 := &metapb.Peer{Id: 2, StoreId: 2} | ||||||
peer3 := &metapb.Peer{Id: 3, StoreId: 3} | ||||||
|
||||||
meta = &metapb.Region{ | ||||||
Id: 1000, | ||||||
Peers: []*metapb.Peer{peer1, peer2, peer3}, | ||||||
StartKey: []byte(""), | ||||||
EndKey: []byte(""), | ||||||
RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, | ||||||
} | ||||||
leader = meta.Peers[rand.Intn(3)] | ||||||
} | ||||||
|
||||||
switch kind { | ||||||
case ReadFlow: | ||||||
return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), | ||||||
core.SetReadBytes(interval*100*1024)) | ||||||
case WriteFlow: | ||||||
return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), | ||||||
core.SetWrittenBytes(interval*100*1024)) | ||||||
default: | ||||||
return nil | ||||||
} | ||||||
} | ||||||
|
||||||
type genID func(i int) uint64 | ||||||
|
||||||
func newPeers(n int, pid genID, sid genID) []*metapb.Peer { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized that it seems we can directly update the cache instead of returning the item which needs to be updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will open another PR to merge them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why directly update the cache? we need to check it first for reducing heartbeat lock overload.