*: optimize memory usage (#8164)
ref #7897

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Jun 13, 2024
1 parent 14c68f6 commit 3e34b80
Showing 19 changed files with 322 additions and 393 deletions.
22 changes: 19 additions & 3 deletions pkg/cluster/cluster.go
@@ -33,9 +33,25 @@ type Cluster interface {
 
 // HandleStatsAsync handles the flow asynchronously.
 func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
-    c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
-    c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
-    c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region))
+    checkWritePeerTask := func(cache *statistics.HotPeerCache) {
+        reportInterval := region.GetInterval()
+        interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+        stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval)
+        for _, stat := range stats {
+            cache.UpdateStat(stat)
+        }
+    }
+
+    checkExpiredTask := func(cache *statistics.HotPeerCache) {
+        expiredStats := cache.CollectExpiredItems(region)
+        for _, stat := range expiredStats {
+            cache.UpdateStat(stat)
+        }
+    }
+
+    c.GetHotStat().CheckWriteAsync(checkExpiredTask)
+    c.GetHotStat().CheckReadAsync(checkExpiredTask)
+    c.GetHotStat().CheckWriteAsync(checkWritePeerTask)
     c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
 }
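
The hunk above swaps the statistics.NewCheck*Task constructors, which allocated a task struct per heartbeat, for plain closures handed to CheckWriteAsync/CheckReadAsync. A minimal sketch of that pattern follows; the shapes are assumed for illustration and are not PD's exact implementation:

    package main

    import "fmt"

    type HotPeerCache struct{ updates int }

    func (c *HotPeerCache) UpdateStat(stat int) { c.updates++ }

    // checker drains func-valued tasks from a buffered channel on one worker,
    // so enqueueing a check costs no dedicated task-struct allocation.
    type checker struct{ tasks chan func(*HotPeerCache) }

    func (c *checker) CheckWriteAsync(task func(*HotPeerCache)) bool {
        select {
        case c.tasks <- task:
            return true
        default:
            return false // queue full: drop rather than block the heartbeat path
        }
    }

    func (c *checker) run(cache *HotPeerCache, done chan struct{}) {
        for task := range c.tasks {
            task(cache)
        }
        close(done)
    }

    func main() {
        c := &checker{tasks: make(chan func(*HotPeerCache), 16)}
        cache := &HotPeerCache{}
        done := make(chan struct{})
        go c.run(cache, done)
        c.CheckWriteAsync(func(cache *HotPeerCache) { cache.UpdateStat(1) })
        close(c.tasks)
        <-done
        fmt.Println("updates:", cache.updates)
    }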

91 changes: 37 additions & 54 deletions pkg/core/region.go
@@ -55,14 +55,14 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
 // the properties are Read-Only once created except buckets.
 // the `buckets` could be modified by the request `report buckets` with greater version.
 type RegionInfo struct {
-    term         uint64
     meta         *metapb.Region
     learners     []*metapb.Peer
     witnesses    []*metapb.Peer
     voters       []*metapb.Peer
     leader       *metapb.Peer
     downPeers    []*pdpb.PeerStats
     pendingPeers []*metapb.Peer
+    term         uint64
     cpuUsage     uint64
     writtenBytes uint64
     writtenKeys  uint64
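
The reorder above groups the uint64 counters after the pointer and slice fields. On 64-bit platforms all of these fields are already 8-byte aligned, so this particular move appears to be organizational; in general, though, field order changes a struct's size through alignment padding. A general illustration, not specific to RegionInfo:

    package main

    import (
        "fmt"
        "unsafe"
    )

    // bool fields sandwiched around a uint64 force 7 bytes of padding twice.
    type padded struct {
        a bool
        b uint64
        c bool
    }

    // grouping the small fields lets them share one padded slot.
    type packed struct {
        b uint64
        a bool
        c bool
    }

    func main() {
        fmt.Println(unsafe.Sizeof(padded{}), unsafe.Sizeof(packed{})) // 24 16
    }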
@@ -136,26 +136,22 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre
 
 // classifyVoterAndLearner sorts out voter and learner from peers into different slice.
 func classifyVoterAndLearner(region *RegionInfo) {
-    learners := make([]*metapb.Peer, 0, 1)
-    voters := make([]*metapb.Peer, 0, len(region.meta.Peers))
-    witnesses := make([]*metapb.Peer, 0, 1)
+    region.learners = make([]*metapb.Peer, 0, 1)
+    region.voters = make([]*metapb.Peer, 0, len(region.meta.Peers))
+    region.witnesses = make([]*metapb.Peer, 0, 1)
     for _, p := range region.meta.Peers {
         if IsLearner(p) {
-            learners = append(learners, p)
+            region.learners = append(region.learners, p)
         } else {
-            voters = append(voters, p)
+            region.voters = append(region.voters, p)
         }
         // Whichever peer role can be a witness
         if IsWitness(p) {
-            witnesses = append(witnesses, p)
+            region.witnesses = append(region.witnesses, p)
         }
     }
-    sort.Sort(peerSlice(learners))
-    sort.Sort(peerSlice(voters))
-    sort.Sort(peerSlice(witnesses))
-    region.learners = learners
-    region.voters = voters
-    region.witnesses = witnesses
+    sort.Sort(peerSlice(region.learners))
+    sort.Sort(peerSlice(region.voters))
+    sort.Sort(peerSlice(region.witnesses))
 }
 
 // peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed,
@@ -213,7 +209,7 @@ type RegionHeartbeatRequest interface {
 }
 
 // RegionFromHeartbeat constructs a Region from region heartbeat.
-func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
+func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRoundDivisor int) *RegionInfo {
     // Convert unit to MB.
     // If region isn't empty and less than 1MB, use 1MB instead.
     // The size of empty region will be correct by the previous RegionInfo.
@@ -223,20 +219,21 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
     }
 
     region := &RegionInfo{
-        term:            heartbeat.GetTerm(),
-        meta:            heartbeat.GetRegion(),
-        leader:          heartbeat.GetLeader(),
-        downPeers:       heartbeat.GetDownPeers(),
-        pendingPeers:    heartbeat.GetPendingPeers(),
-        writtenBytes:    heartbeat.GetBytesWritten(),
-        writtenKeys:     heartbeat.GetKeysWritten(),
-        readBytes:       heartbeat.GetBytesRead(),
-        readKeys:        heartbeat.GetKeysRead(),
-        approximateSize: int64(regionSize),
-        approximateKeys: int64(heartbeat.GetApproximateKeys()),
-        interval:        heartbeat.GetInterval(),
-        queryStats:      heartbeat.GetQueryStats(),
-        source:          Heartbeat,
+        term:             heartbeat.GetTerm(),
+        meta:             heartbeat.GetRegion(),
+        leader:           heartbeat.GetLeader(),
+        downPeers:        heartbeat.GetDownPeers(),
+        pendingPeers:     heartbeat.GetPendingPeers(),
+        writtenBytes:     heartbeat.GetBytesWritten(),
+        writtenKeys:      heartbeat.GetKeysWritten(),
+        readBytes:        heartbeat.GetBytesRead(),
+        readKeys:         heartbeat.GetKeysRead(),
+        approximateSize:  int64(regionSize),
+        approximateKeys:  int64(heartbeat.GetApproximateKeys()),
+        interval:         heartbeat.GetInterval(),
+        queryStats:       heartbeat.GetQueryStats(),
+        source:           Heartbeat,
+        flowRoundDivisor: uint64(flowRoundDivisor),
     }
 
     // scheduling service doesn't need the following fields.
@@ -246,10 +243,6 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
         region.cpuUsage = h.GetCpuUsage()
     }
 
-    for _, opt := range opts {
-        opt(region)
-    }
-
     if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
         region.writtenKeys = 0
         region.writtenBytes = 0
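
Dropping the variadic opts ...RegionCreateOption parameter is the point of the two hunks above: each call typically allocates an options slice, and any option that captures an argument escapes to the heap as a closure, on a path that runs once per region heartbeat. A before/after sketch with illustrative names, not PD's full types:

    package main

    import "fmt"

    type region struct{ flowRoundDivisor uint64 }

    type option func(*region)

    func withFlowRoundDivisor(d uint64) option {
        return func(r *region) { r.flowRoundDivisor = d } // closure allocation
    }

    // option-style constructor: opts slice + closures allocated per call.
    func fromHeartbeatOpts(opts ...option) *region {
        r := &region{}
        for _, o := range opts {
            o(r)
        }
        return r
    }

    // direct-parameter constructor: one allocation for the struct, nothing else.
    func fromHeartbeatDirect(flowRoundDivisor int) *region {
        return &region{flowRoundDivisor: uint64(flowRoundDivisor)}
    }

    func main() {
        a := fromHeartbeatOpts(withFlowRoundDivisor(3))
        b := fromHeartbeatDirect(3)
        fmt.Println(a.flowRoundDivisor == b.flowRoundDivisor) // true
    }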
@@ -957,11 +950,11 @@ func (r *RegionsInfo) getRegionLocked(regionID uint64) *RegionInfo {
 func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
     r.t.Lock()
     origin := r.getRegionLocked(region.GetID())
-    var ols []*regionItem
+    var ols []*RegionInfo
     if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
         ols = r.tree.overlaps(&regionItem{RegionInfo: region})
     }
-    err := check(region, origin, convertItemsToRegions(ols))
+    err := check(region, origin, ols)
     if err != nil {
         log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
         // return the state region to delete.
@@ -988,25 +981,17 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*Reg
     return origin, overlaps, err
 }
 
-func convertItemsToRegions(items []*regionItem) []*RegionInfo {
-    regions := make([]*RegionInfo, 0, len(items))
-    for _, item := range items {
-        regions = append(regions, item.RegionInfo)
-    }
-    return regions
-}
-
 // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
 func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
     tracer := ctx.Tracer
     r.t.Lock()
-    var ols []*regionItem
+    var ols []*RegionInfo
     origin := r.getRegionLocked(region.GetID())
     if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
         ols = r.tree.overlaps(&regionItem{RegionInfo: region})
     }
     tracer.OnCheckOverlapsFinished()
-    err := check(region, origin, convertItemsToRegions(ols))
+    err := check(region, origin, ols)
     if err != nil {
         r.t.Unlock()
         tracer.OnValidateRegionFinished()
@@ -1026,13 +1011,13 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
 func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
     tracer := ctx.Tracer
     r.t.Lock()
-    var ols []*regionItem
+    var ols []*RegionInfo
     origin := r.getRegionLocked(region.GetID())
     if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
         ols = r.tree.overlaps(&regionItem{RegionInfo: region})
     }
     tracer.OnCheckOverlapsFinished()
-    err := check(region, origin, convertItemsToRegions(ols))
+    err := check(region, origin, ols)
     if err != nil {
         r.t.Unlock()
         tracer.OnValidateRegionFinished()
@@ -1123,7 +1108,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
     if len(overlaps) == 0 {
         // If the range has changed but the overlapped regions are not provided, collect them by `[]*regionItem`.
         for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) {
-            r.removeRegionFromSubTreeLocked(item.RegionInfo)
+            r.removeRegionFromSubTreeLocked(item)
         }
     } else {
         // Remove all provided overlapped regions from the subtrees.
@@ -1164,7 +1149,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
     setPeers(r.pendingPeers, region.GetPendingPeers())
 }
 
-func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem {
+func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*RegionInfo {
     return r.overlapTree.overlaps(&regionItem{RegionInfo: region})
 }

@@ -1174,9 +1159,7 @@ func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo
     defer r.t.RUnlock()
     origin = r.getRegionLocked(region.GetID())
     if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
-        for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
-            overlaps = append(overlaps, item.RegionInfo)
-        }
+        return origin, r.tree.overlaps(&regionItem{RegionInfo: region})
     }
     return
 }
@@ -1211,7 +1194,7 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo,
     return r.setRegionLocked(region, false)
 }
 
-func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*regionItem) (*RegionInfo, []*RegionInfo, bool) {
+func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*RegionInfo) (*RegionInfo, []*RegionInfo, bool) {
     var (
         item   *regionItem // Pointer to the *RegionInfo of this ID.
         origin *RegionInfo
@@ -1311,7 +1294,7 @@ func (r *RegionsInfo) TreeLen() int {
 }
 
 // GetOverlaps returns the regions which are overlapped with the specified region range.
-func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*regionItem {
+func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo {
     r.t.RLock()
     defer r.t.RUnlock()
     return r.tree.overlaps(&regionItem{RegionInfo: region})
8 changes: 5 additions & 3 deletions pkg/core/region_test.go
@@ -156,18 +156,19 @@ func TestSortedEqual(t *testing.T) {
         re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
     }
 
+    flowRoundDivisor := 3
     // test RegionFromHeartbeat
     for _, testCase := range testCases {
         regionA := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
             Region:       &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsA)},
             DownPeers:    pickPeerStats(testCase.idsA),
             PendingPeers: pickPeers(testCase.idsA),
-        })
+        }, flowRoundDivisor)
         regionB := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
             Region:       &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsB)},
             DownPeers:    pickPeerStats(testCase.idsB),
             PendingPeers: pickPeers(testCase.idsB),
-        })
+        }, flowRoundDivisor)
         re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
         re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
         re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers()))
@@ -950,9 +951,10 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
         PendingPeers: []*metapb.Peer{peers[1]},
         DownPeers:    []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}},
     }
+    flowRoundDivisor := 3
     b.ResetTimer()
    for i := 0; i < b.N; i++ {
-        RegionFromHeartbeat(regionReq)
+        RegionFromHeartbeat(regionReq, flowRoundDivisor)
     }
 }
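
One way to verify this kind of change is an allocation benchmark: with b.ReportAllocs (or go test -benchmem), allocs/op before and after the signature change can be compared directly. A hypothetical example, not part of the commit:

    package core

    import "testing"

    type simpleRegion struct{ flowRoundDivisor uint64 }

    var sink *simpleRegion // package-level sink forces the heap allocation

    func BenchmarkDirectParam(b *testing.B) {
        b.ReportAllocs() // report allocs/op alongside ns/op
        for i := 0; i < b.N; i++ {
            sink = &simpleRegion{flowRoundDivisor: 3}
        }
    }

Running something like `go test -bench=RegionFromHeartbeat -benchmem ./pkg/core/` on this commit and on its parent 14c68f6 would show the difference per heartbeat.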

14 changes: 7 additions & 7 deletions pkg/core/region_tree.go
@@ -104,7 +104,7 @@ func (t *regionTree) notFromStorageRegionsCount() int {
 }
 
 // GetOverlaps returns the range items that has some intersections with the given items.
-func (t *regionTree) overlaps(item *regionItem) []*regionItem {
+func (t *regionTree) overlaps(item *regionItem) []*RegionInfo {
     // note that Find() gets the last item that is less or equal than the item.
     // in the case: |_______a_______|_____b_____|___c___|
     // new item is  |______d______|
@@ -116,12 +116,12 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
         result = item
     }
     endKey := item.GetEndKey()
-    var overlaps []*regionItem
+    var overlaps []*RegionInfo
     t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool {
         if len(endKey) > 0 && bytes.Compare(endKey, i.GetStartKey()) <= 0 {
             return false
         }
-        overlaps = append(overlaps, i)
+        overlaps = append(overlaps, i.RegionInfo)
         return true
     })
     return overlaps
@@ -130,7 +130,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
 // update updates the tree with the region.
 // It finds and deletes all the overlapped regions first, and then
 // insert the region.
-func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo {
+func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*RegionInfo) []*RegionInfo {
     region := item.RegionInfo
     t.totalSize += region.approximateSize
     regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
@@ -145,15 +145,15 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
     }
 
     for _, old := range overlaps {
-        t.tree.Delete(old)
+        t.tree.Delete(&regionItem{RegionInfo: old})
     }
     t.tree.ReplaceOrInsert(item)
     if t.countRef {
         item.RegionInfo.IncRef()
     }
     result := make([]*RegionInfo, len(overlaps))
     for i, overlap := range overlaps {
-        old := overlap.RegionInfo
+        old := overlap
         result[i] = old
         log.Debug("overlapping region",
             zap.Uint64("region-id", old.GetID()),
@@ -174,7 +174,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
     return result
 }
 
-// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
+// updateStat is used to update statistics when RegionInfo is directly replaced.
 func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
     t.totalSize += region.approximateSize
     regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
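
The region_tree.go hunks above are the other half of the convertItemsToRegions removal in region.go: overlaps now unwraps regionItem during the tree walk and returns []*RegionInfo directly, so callers no longer build a second slice just to unwrap the items. A simplified sketch with assumed shapes:

    package main

    import "fmt"

    type RegionInfo struct{ id uint64 }
    type regionItem struct{ *RegionInfo }

    // old: the walk produced []*regionItem, then a second slice unwrapped it.
    func convertItemsToRegions(items []*regionItem) []*RegionInfo {
        regions := make([]*RegionInfo, 0, len(items)) // extra allocation per lookup
        for _, item := range items {
            regions = append(regions, item.RegionInfo)
        }
        return regions
    }

    // new: unwrap while walking, producing one slice.
    func overlaps(items []*regionItem) []*RegionInfo {
        var out []*RegionInfo
        for _, it := range items {
            out = append(out, it.RegionInfo)
        }
        return out
    }

    func main() {
        items := []*regionItem{{&RegionInfo{id: 1}}, {&RegionInfo{id: 2}}}
        fmt.Println(len(convertItemsToRegions(items)), len(overlaps(items))) // 2 2
    }
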
4 changes: 2 additions & 2 deletions pkg/core/region_tree_test.go
@@ -159,8 +159,8 @@ func TestRegionTree(t *testing.T) {
     updateNewItem(tree, regionA)
     updateNewItem(tree, regionC)
     re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
-    re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
-    re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
+    re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0])
+    re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1])
     re.Nil(tree.search([]byte{}))
     re.Equal(regionA, tree.search([]byte("a")))
     re.Nil(tree.search([]byte("b")))
16 changes: 14 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -443,11 +443,23 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
             utils.RegionWriteKeys:     0,
             utils.RegionWriteQueryNum: 0,
         }
-        c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval))
+        checkReadPeerTask := func(cache *statistics.HotPeerCache) {
+            stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval)
+            for _, stat := range stats {
+                cache.UpdateStat(stat)
+            }
+        }
+        c.hotStat.CheckReadAsync(checkReadPeerTask)
     }
 
     // Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
-    c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
+    collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) {
+        stats := cache.CheckColdPeer(storeID, regions, interval)
+        for _, stat := range stats {
+            cache.UpdateStat(stat)
+        }
+    }
+    c.hotStat.CheckReadAsync(collectUnReportedPeerTask)
     return nil
 }
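
One caveat with queueing closures like checkReadPeerTask from inside a loop: each task must capture that iteration's values (peer, loads, and so on). Go 1.22 gives every range iteration a fresh variable; on older toolchains the variable had to be re-declared inside the loop. An illustrative snippet, not from the commit:

    package main

    import "fmt"

    func main() {
        tasks := make([]func(), 0, 3)
        for _, id := range []int{1, 2, 3} {
            id := id // re-declare per iteration (needed before Go 1.22)
            tasks = append(tasks, func() { fmt.Println("check peer", id) })
        }
        for _, t := range tasks {
            t() // prints 1, 2, 3 as intended
        }
    }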

3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -158,7 +158,8 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
             s.hbStreams.BindStream(storeID, server)
             lastBind = time.Now()
         }
-        region := core.RegionFromHeartbeat(request)
+        // scheduling service doesn't sync the pd server config, so we use 0 here
+        region := core.RegionFromHeartbeat(request, 0)
         err = c.HandleRegionHeartbeat(region)
         if err != nil {
             // TODO: if we need to send the error back to API server.
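
The hard-coded 0 presumably relies on the scheduling service not taking the code path that divides by flowRoundDivisor, since it cannot read the pd server config. A defensive version of such flow rounding might guard the zero case explicitly; this is an assumption-laden sketch, not PD's implementation:

    package main

    import "fmt"

    // roundFlow rounds v to the nearest multiple of divisor; a divisor of
    // 0 or 1 leaves v unchanged. Hypothetical helper for illustration only.
    func roundFlow(v, divisor uint64) uint64 {
        if divisor <= 1 {
            return v
        }
        return (v + divisor/2) / divisor * divisor
    }

    func main() {
        fmt.Println(roundFlow(1023, 0), roundFlow(1023, 100)) // 1023 1000
    }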