Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: check stats healthy by reference #8132

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type RegionInfo struct {
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
// ref is used to indicate the reference count of the region in root-tree and sub-tree.
ref atomic.Int32
}

// RegionSource is the source of region.
Expand All @@ -106,6 +108,21 @@ func (r *RegionInfo) LoadedFromSync() bool {
return r.source == Sync
}

// IncRef atomically adds one to the region's reference count.
func (r *RegionInfo) IncRef() {
	const delta = 1
	r.ref.Add(delta)
}

// DecRef atomically subtracts one from the region's reference count.
func (r *RegionInfo) DecRef() {
	const delta = -1
	r.ref.Add(delta)
}

// GetRef atomically loads and returns the region's current reference count.
func (r *RegionInfo) GetRef() int32 {
	current := r.ref.Load()
	return current
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
Expand Down Expand Up @@ -928,7 +945,7 @@ type RegionsInfo struct {
// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
tree: newRegionTree(),
tree: newRegionTreeWithCountRef(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
Expand Down Expand Up @@ -1117,10 +1134,14 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, countRef bool) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
if !countRef {
store = newRegionTree()
} else {
store = newRegionTreeWithCountRef()
}
peersMap[storeID] = store
}
store.update(item, false)
Expand All @@ -1131,17 +1152,17 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
setPeer(r.leaders, storeID, item, true)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
setPeer(r.followers, storeID, item, false)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
setPeer(peersMap, storeID, item, false)
}
}
// Add to learners.
Expand Down Expand Up @@ -1309,10 +1330,14 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, countRef bool) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
if !countRef {
store = newRegionTree()
} else {
store = newRegionTreeWithCountRef()
}
peersMap[storeID] = store
}
store.update(item, false)
Expand All @@ -1323,17 +1348,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
setPeer(r.leaders, storeID, item, true)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
setPeer(r.followers, storeID, item, false)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
setPeer(peersMap, storeID, item, false)
}
}
// Add to learners.
Expand Down
87 changes: 85 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,16 +874,24 @@ func TestUpdateRegionEquivalence(t *testing.T) {
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, item)
// new way
newItem := item.Clone()
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, item)
regionsNew.CheckAndPutSubTree(item)
regionsNew.CheckAndPutRootTree(ctx, newItem)
regionsNew.CheckAndPutSubTree(newItem)
}
checksEquivalence := func() {
re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
checkRegions(re, regionsOld)
checkRegions(re, regionsNew)

for _, r := range regionsOld.GetRegions() {
re.Equal(int32(2), r.GetRef(), r.GetID())
}
for _, r := range regionsNew.GetRegions() {
re.Equal(int32(2), r.GetRef())
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}

for i := 1; i <= storeNums; i++ {
re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
Expand Down Expand Up @@ -938,3 +946,78 @@ func generateTestRegions(count int, storeNum int) []*RegionInfo {
}
return items
}

// TestUpdateRegionEventualConsistency checks the reference counts reported by
// RegionInfo.GetRef while the same region is written through two paths:
//   - the "old way": a single AtomicCheckAndPutRegion call per update;
//   - the "new way": the root tree (CheckAndPutRootTree) and the sub-tree
//     (CheckAndPutSubTree / UpdateSubTreeOrderInsensitive) are updated separately,
//     with the sub-tree updates applied in a different order than the root-tree ones.
//
// At the end of both paths the latest region is expected to hold two references.
func TestUpdateRegionEventualConsistency(t *testing.T) {
re := require.New(t)
regionsOld := NewRegionsInfo()
regionsNew := NewRegionsInfo()
// With i = 1 and storeNum = 5 the three peers land on stores 2, 3 and 4
// with peer IDs 6, 7 and 8, and the region ID is 2.
i := 1
storeNum := 5
peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
// peer1 is passed as the leader peer of the region.
item := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer1, peer2, peer3},
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
},
peer1,
SetApproximateKeys(10),
SetApproximateSize(10),
)
// The A items feed the old path: regionItemA is the final state and
// regionPendingItemA is an earlier state in which peer3 is still pending.
regionItemA := item
regionPendingItemA := regionItemA.Clone(WithPendingPeers([]*metapb.Peer{peer3}))

// The B items are independent clones with the same contents, used for the new path.
regionItemB := regionItemA.Clone()
regionPendingItemB := regionItemB.Clone(WithPendingPeers([]*metapb.Peer{peer3}))
// NOTE(review): regionGuide is called below as (ctx, region, origin) and returns
// (saveKV, saveCache, needSync); the meaning of the `true` argument is not visible here.
regionGuide := GenerateRegionGuideFunc(true)

// Old way: every update goes through AtomicCheckAndPutRegion.
{
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
// The freshly stored region is expected to hold two references
// (presumably one from the root tree and one from the leader sub-tree).
re.Equal(int32(2), regionPendingItemA.GetRef())
// Compare the new item against the stored pending one: the cache must be
// updated and synced, but nothing needs to be saved to KV.
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
// Update the cache with the new item; it must end up with two references as well.
regionsOld.AtomicCheckAndPutRegion(ctx, regionItemA)
re.Equal(int32(2), regionItemA.GetRef())
}

// New way: the root tree and the sub-tree are updated by separate calls.
{
// The root tree is updated in order: regionPendingItemB first, then regionItemB.
ctx := ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, regionPendingItemB)
re.Equal(int32(1), regionPendingItemB.GetRef())
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, regionItemB)
// Replacing the pending item in the root tree moves the single reference to regionItemB.
re.Equal(int32(1), regionItemB.GetRef())
re.Equal(int32(0), regionPendingItemB.GetRef())

// The sub-tree is updated out of order: regionItemB first, then the stale regionPendingItemB.
regionsNew.CheckAndPutSubTree(regionItemB)
re.Equal(int32(2), regionItemB.GetRef())
re.Equal(int32(0), regionPendingItemB.GetRef())
regionsNew.UpdateSubTreeOrderInsensitive(regionPendingItemB)
// After the stale update, one reference is expected to have moved from
// regionItemB to regionPendingItemB, leaving regionItemB with fewer than two.
re.Equal(int32(1), regionItemB.GetRef())
re.Equal(int32(1), regionPendingItemB.GetRef())

// Heartbeat again with an unchanged region: the root tree needs no update.
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)

// But the sub-tree has to be updated again: the region stored in the root tree
// holds only one reference until CheckAndPutSubTree restores the second one.
// Note that this `item` shadows the outer `item` declared above.
item := regionsNew.GetRegion(regionItemB.GetID())
re.Equal(int32(1), item.GetRef())
regionsNew.CheckAndPutSubTree(item)
re.Equal(int32(2), item.GetRef())
}
}
26 changes: 26 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type regionTree struct {
totalWriteKeysRate float64
// count the number of regions that are not loaded from storage.
notFromStorageRegionsCnt int
// countRef indicates whether the tree maintains the reference count of the RegionInfo it holds.
countRef bool
}

func newRegionTree() *regionTree {
Expand All @@ -81,6 +83,17 @@ func newRegionTree() *regionTree {
}
}

// newRegionTreeWithCountRef returns an empty regionTree whose countRef flag is
// set, so the tree adjusts the reference count of the RegionInfo it stores.
// All statistic counters start at their zero values.
func newRegionTreeWithCountRef() *regionTree {
	t := &regionTree{
		tree: btree.NewG[*regionItem](defaultBTreeDegree),
	}
	t.countRef = true
	return t
}

func (t *regionTree) length() int {
if t == nil {
return 0
Expand Down Expand Up @@ -140,6 +153,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
t.tree.Delete(old)
}
t.tree.ReplaceOrInsert(item)
if t.countRef {
item.RegionInfo.IncRef()
}
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap.RegionInfo
Expand All @@ -155,6 +171,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
if !old.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
if t.countRef {
old.DecRef()
}
}

return result
Expand All @@ -180,6 +199,10 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
if t.countRef {
origin.DecRef()
region.IncRef()
}
}

// remove removes a region if the region is in the tree.
Expand All @@ -199,6 +222,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if t.countRef {
result.RegionInfo.DecRef()
}
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
Expand Down
1 change: 1 addition & 0 deletions pkg/core/region_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestRegionTree(t *testing.T) {
updateNewItem(tree, regionA)
updateNewItem(tree, regionC)
re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
re.Nil(tree.search([]byte{}))
re.Equal(regionA, tree.search([]byte("a")))
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,16 @@
},
)
}
// The origin region holds fewer than two references, i.e. it has not been updated to the subtree yet.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},

Check warning on line 625 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L620-L625

Added lines #L620 - L625 were not covered by tests
)
}
return nil
}
tracer.OnSaveCacheBegin()
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,16 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
},
)
}
// The origin region holds fewer than two references, i.e. it has not been updated to the subtree yet.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
}
return nil
}
failpoint.Inject("concurrentRegionHeartbeat", func() {
Expand Down