batch process the peer task

Signed-off-by: Ryan Leung <rleungx@gmail.com>

rleungx committed May 24, 2024
1 parent 52e1681 commit 9d88e55
Showing 10 changed files with 56 additions and 45 deletions.
4 changes: 3 additions & 1 deletion pkg/cluster/cluster.go
@@ -35,7 +35,9 @@ type Cluster interface {
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads()))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), region.GetWriteLoads(), interval))
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -9,6 +9,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
@@ -442,7 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval))

Codecov / codecov/patch: added line pkg/mcs/scheduling/server/cluster.go#L446 was not covered by tests.
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
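For orientation, here is a hedged sketch of the store-heartbeat read path that feeds the new four-argument task in this file. Only the two write-load fields and the final call appear in the hunk above; the read-load fields, the peerStat and readQueryNum names, and their accessors are assumptions for illustration.

// Sketch (not part of the diff): build a per-peer read load vector from a
// store heartbeat and queue it with the reported peer and interval.
loads := []float64{
	utils.RegionReadBytes:     float64(peerStat.GetReadBytes()), // assumed accessor
	utils.RegionReadKeys:      float64(peerStat.GetReadKeys()),  // assumed accessor
	utils.RegionReadQueryNum:  float64(readQueryNum),            // placeholder value
	utils.RegionWriteBytes:    0,
	utils.RegionWriteKeys:     0,
	utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval))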
12 changes: 9 additions & 3 deletions pkg/mock/mockcluster/mockcluster.go
@@ -894,23 +894,29 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
return append(items, mc.HotCache.CheckReadPeerSync(region, region.GetPeers(), region.GetLoads(), interval)...)
}

// CheckRegionWrite checks region write info with all peers
func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredWriteItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
return append(items, mc.HotCache.CheckWritePeerSync(region, region.GetPeers(), region.GetLoads(), interval)...)
}

// CheckRegionLeaderRead checks region read info with leader peer
func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
return append(items, mc.HotCache.CheckReadPeerSync(region, []*metapb.Peer{region.GetLeader()}, region.GetLoads(), interval)...)
}

// ObserveRegionsStats records the current stores stats from region stats.
9 changes: 5 additions & 4 deletions pkg/statistics/hot_cache.go
@@ -17,6 +17,7 @@ package statistics
import (
"context"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/smallnest/chanx"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics/utils"
@@ -172,14 +173,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) {

// CheckWritePeerSync checks the write status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads)
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, peers, loads, interval)
}

// CheckReadPeerSync checks the read status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, region.GetPeers(), loads)
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, peers, loads, interval)
}

// ExpiredReadItems returns the read items which are already expired.
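A minimal usage sketch for the updated sync helpers, assuming hotCache is a *HotCache and region is a *core.RegionInfo whose report interval has been set:

// Callers now derive the interval themselves and pass peers and loads
// explicitly instead of letting the cache read them off the region.
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
readItems := hotCache.CheckReadPeerSync(region, region.GetPeers(), region.GetLoads(), interval)
writeItems := hotCache.CheckWritePeerSync(region, region.GetPeers(), region.GetLoads(), interval)
_ = append(readItems, writeItems...) // inspect or assert on the returned stats as needed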
9 changes: 7 additions & 2 deletions pkg/statistics/hot_cache_task.go
@@ -17,6 +17,7 @@ package statistics
import (
"context"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
)

@@ -27,19 +28,23 @@ type FlowItemTask interface {

type checkPeerTask struct {
regionInfo *core.RegionInfo
peers []*metapb.Peer
loads []float64
interval uint64
}

// NewCheckPeerTask creates task to update peerInfo
func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask {
func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask {
return &checkPeerTask{
regionInfo: regionInfo,
peers: peers,
loads: loads,
interval: interval,
}
}

func (t *checkPeerTask) runTask(cache *hotPeerCache) {
stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads)
stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval)
for _, stat := range stats {
cache.updateStat(stat)
}
14 changes: 5 additions & 9 deletions pkg/statistics/hot_peer_cache.go
@@ -174,19 +174,15 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt
// checkPeerFlow checks the flow information of a peer.
// Notice: checkPeerFlow couldn't be used concurrently.
// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat {
if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose
return nil
}

if deltaLoads == nil {
deltaLoads = region.GetLoads()
}
f.collectPeerMetrics(deltaLoads, interval) // update metrics
regionID := region.GetID()

regionPeers := region.GetPeers()
var stats []*HotPeerStat
for _, peer := range peers {
storeID := peer.GetStoreId()
@@ -218,12 +214,12 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval),
Loads: f.kind.GetLoadRates(deltaLoads, interval),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: utils.Update,
stores: make([]uint64, len(peers)),
stores: make([]uint64, len(regionPeers)),
}
for i, peer := range peers {
for i, peer := range regionPeers {
newItem.stores[i] = peer.GetStoreId()
}
if oldItem == nil {
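Since checkPeerFlow now receives the interval from its caller, the denoising guard acts on that supplied value rather than on the region's own report interval; a short test-style sketch (cache and region setup elided):

// With Denoising enabled, a zero interval is treated as noise and
// checkPeerFlow returns nil, as the interval=0 test case below exercises.
stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 0)
// stats == nil is expected here.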
38 changes: 21 additions & 17 deletions pkg/statistics/hot_peer_cache_test.go
@@ -106,8 +106,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer
}

func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
res = append(res, cache.collectExpiredItems(region)...)
return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...)
return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), interval)...)
}

func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat {
@@ -309,70 +311,69 @@ func TestUpdateHotPeerStat(t *testing.T) {
}()

// skip interval=0
region = region.Clone(core.SetReportInterval(0, 0))
interval := uint64(0)
deltaLoads := []float64{0.0, 0.0, 0.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 0.0
utils.MinHotThresholds[utils.RegionReadKeys] = 0.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0

newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Nil(newItem)

// new peer, interval is larger than report interval, but no hot
region = region.Clone(core.SetReportInterval(10, 0))
interval = 10
deltaLoads = []float64{0.0, 0.0, 0.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 1.0
utils.MinHotThresholds[utils.RegionReadKeys] = 1.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Nil(newItem)

// new peer, interval is less than report interval
region = region.Clone(core.SetReportInterval(4, 0))
interval = 4
deltaLoads = []float64{60.0, 60.0, 60.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 0.0
utils.MinHotThresholds[utils.RegionReadKeys] = 0.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.NotNil(newItem)
re.Equal(0, newItem[0].HotDegree)
re.Equal(0, newItem[0].AntiCount)
// sum of interval is less than report interval
region = region.Clone(core.SetReportInterval(4, 0))
deltaLoads = []float64{60.0, 60.0, 60.0}
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(0, newItem[0].HotDegree)
re.Equal(0, newItem[0].AntiCount)
// sum of interval is larger than report interval, and hot
newItem[0].AntiCount = utils.Read.DefaultAntiCount()
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is less than report interval
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is larger than report interval, and hot
region = region.Clone(core.SetReportInterval(10, 0))
interval = 10
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(2, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is larger than report interval, and cold
utils.MinHotThresholds[utils.RegionReadBytes] = 10.0
utils.MinHotThresholds[utils.RegionReadKeys] = 10.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m-1, newItem[0].AntiCount)
// sum of interval is larger than report interval, and cold
for i := 0; i < 2*m-1; i++ {
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
}
re.Less(newItem[0].HotDegree, 0)
re.Equal(0, newItem[0].AntiCount)
@@ -679,7 +680,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) {
StartTimestamp: start,
EndTimestamp: end,
}))
stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil)
stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start)
for _, stat := range stats {
cache.updateStat(stat)
}
@@ -709,6 +710,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) {
region := buildRegion(utils.Read, 3, 10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.checkPeerFlow(region, region.GetPeers(), nil)
stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 10)
for _, stat := range stats {
cache.updateStat(stat)
}
}
}
8 changes: 2 additions & 6 deletions pkg/statistics/utils/kind.go
@@ -14,10 +14,6 @@

package utils

import (
"github.com/pingcap/kvproto/pkg/metapb"
)

const (
// BytePriority indicates hot-region-scheduler prefer byte dim
BytePriority = "byte"
@@ -230,8 +226,8 @@ func (rw RWType) DefaultAntiCount() int {
}
}

// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo.
func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 {
// GetLoadRates gets the load rates of the read or write type.
func (rw RWType) GetLoadRates(deltaLoads []float64, interval uint64) []float64 {
loads := make([]float64, DimLen)
for dim, k := range rw.RegionStats() {
loads[dim] = deltaLoads[k] / float64(interval)
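For reference, a sketch of the renamed helper assembled from the hunk above; only the trailing return statement is assumed, since it is not shown:

// GetLoadRates gets the load rates of the read or write type.
func (rw RWType) GetLoadRates(deltaLoads []float64, interval uint64) []float64 {
	loads := make([]float64, DimLen)
	for dim, k := range rw.RegionStats() {
		loads[dim] = deltaLoads[k] / float64(interval)
	}
	return loads // assumed: the hunk ends before the return
}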
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval))
}
}
for _, stat := range stats.GetSnapshotStats() {
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/hot/hot_test.go
@@ -191,7 +191,7 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) {
region := core.NewRegionInfo(&metapb.Region{
Id: hotRegionID,
}, leader)
hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval))
testutil.Eventually(re, func() bool {
hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID)
return hotPeerStat != nil
