scheduler: new hotspot scheduler basic part #1870

Merged · 14 commits · Nov 4, 2019
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -65,6 +65,11 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
mc.PutRegion(r)
}

// GetStoresStats gets stores statistics.
func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
return mc.StoresStats
}

// GetStoreRegionCount gets region count with a given store.
func (mc *Cluster) GetStoreRegionCount(storeID uint64) int {
return mc.Regions.GetStoreRegionCount(storeID)
@@ -356,6 +361,7 @@ func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

@@ -368,6 +374,7 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}
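
The two mc.Set(storeID, newStats) calls above are what feed the mock's rolling store statistics, so the new GetStoresStats accessor hands schedulers live byte rates. A minimal sketch of how a test helper might rely on that, assuming the pkg/mock/mockcluster import path and that the rate maps returned by GetStoresBytesWriteStat and GetStoresBytesReadStat are keyed by store ID:

```go
// Sketch only: exercising the new mock plumbing.
package schedulers_test

import (
	"fmt"

	"github.com/pingcap/pd/pkg/mock/mockcluster" // assumed import path
)

func printStoreFlow(mc *mockcluster.Cluster) {
	// Report 512 MiB written and 256 MiB read on store 1 for one heartbeat interval.
	mc.UpdateStorageWrittenBytes(1, 512*1024*1024)
	mc.UpdateStorageReadBytes(1, 256*1024*1024)

	// Because of the mc.Set calls, the rolling stats now track store 1.
	stats := mc.GetStoresStats()
	writeRates := stats.GetStoresBytesWriteStat() // assumed to be map[uint64]float64
	readRates := stats.GetStoresBytesReadStat()
	fmt.Printf("store 1: write %.0f B/s, read %.0f B/s\n", writeRates[1], readRates[1])
}
```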

1 change: 1 addition & 0 deletions server/schedule/opt/opts.go
@@ -77,6 +77,7 @@ type Cluster interface {
core.StoreSetController

statistics.RegionStatInformer
statistics.StoreStatInformer
Options

// TODO: it should be removed. Schedulers don't need to know anything
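
For orientation, the StoreStatInformer pulled into the Cluster interface here is what lets the hot region scheduler call cluster.GetStoresStats() below. Its declaration is not part of this diff; judging from that call and the mock method above, it presumably looks roughly like this (a guess, the real interface may declare more):

```go
package statistics

// StoreStatInformer provides access to the stores' flow statistics.
// (Sketch reconstructed from this PR's usage, not copied from the repository.)
type StoreStatInformer interface {
	GetStoresStats() *StoresStats
}
```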
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
@@ -233,7 +233,7 @@ func (s *testBalanceSpeedSuite) TestTolerantRatio(c *C) {
tc := mockcluster.NewCluster(opt)
// create a region to control average region size.
tc.AddLeaderRegion(1, 1, 2)
regionSize := int64(96 * 1024)
regionSize := int64(96 * KB)
region := tc.GetRegion(1).Clone(core.SetApproximateSize(regionSize))

tc.TolerantSizeRatio = 0
@@ -931,10 +931,10 @@ func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) {

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(1, 0.5, 0.5)
tc.UpdateStoreRegionSize(1, 500*1024*1024)
tc.UpdateStoreRegionSize(1, 500*MB)
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(2, 0.1, 0.9)
tc.UpdateStoreRegionSize(2, 100*1024*1024)
tc.UpdateStoreRegionSize(2, 100*MB)
tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
tc.AddLabelsStore(4, 0, map[string]string{"zone": "z3"})

80 changes: 40 additions & 40 deletions server/schedulers/hot_region.go
@@ -157,13 +157,14 @@ func (h *balanceHotRegionsScheduler) Schedule(cluster opt.Cluster) []*operator.O
func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster opt.Cluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
storesStat := cluster.GetStoresStats()
switch typ {
case hotReadRegionBalance:
h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind)
h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), storesStat.GetStoresBytesReadStat(), cluster, core.LeaderKind)
return h.balanceHotReadRegions(cluster)
case hotWriteRegionBalance:
h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind)
h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind)
h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), storesStat.GetStoresBytesWriteStat(), cluster, core.LeaderKind)
h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), storesStat.GetStoresBytesWriteStat(), cluster, core.RegionKind)
return h.balanceHotWriteRegions(cluster)
}
return nil
@@ -230,17 +231,24 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster opt.Cluster)
return nil
}

func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat {
func calcScore(storeHotPeers map[uint64][]*statistics.HotPeerStat, storeBytesStat map[uint64]float64, cluster opt.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat {
stats := make(statistics.StoreHotRegionsStat)
for storeID, items := range storeItems {
// HotDegree is the number of times the region has been updated in the hot cache. If a heartbeat
// reports that the region's flow exceeds the threshold, the scheduler updates the region in
// the hot cache and the region's HotDegree increases.
for storeID, items := range storeHotPeers {
storeHots, ok := stats[storeID]

(Review thread on this line)

Member: storeHots sounds not good enough?

Contributor (author): What about hotRegions? Or hotPeers?

Member: Both are better than storeHots and I prefer to unify the naming of HotPeerStat and storeHotRegionsDefaultLen

Contributor (author): Unified similar names.

if !ok {
storeHots = &statistics.HotRegionsStat{
RegionsStat: make([]statistics.HotPeerStat, 0, storeHotRegionsDefaultLen),
}
stats[storeID] = storeHots
}

for _, r := range items {
if kind == core.LeaderKind && !r.IsLeader() {
continue
}
// HotDegree is the number of times the region has been updated in the hot cache. If a heartbeat
// reports that the region's flow exceeds the threshold, the scheduler updates the region in
// the hot cache and the region's HotDegree increases.
if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() {
continue
}
@@ -250,14 +258,6 @@ func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Clus
continue
}

storeStat, ok := stats[storeID]
if !ok {
storeStat = &statistics.HotRegionsStat{
RegionsStat: make([]statistics.HotPeerStat, 0, storeHotRegionsDefaultLen),
}
stats[storeID] = storeStat
}

s := statistics.HotPeerStat{
StoreID: storeID,
RegionID: r.RegionID,
@@ -267,9 +267,13 @@ func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Clus
LastUpdateTime: r.LastUpdateTime,
Version: r.Version,
}
storeStat.TotalBytesRate += r.GetBytesRate()
storeStat.RegionsCount++
storeStat.RegionsStat = append(storeStat.RegionsStat, s)
storeHots.TotalBytesRate += r.GetBytesRate()
storeHots.RegionsCount++
storeHots.RegionsStat = append(storeHots.RegionsStat, s)
}

if rate, ok := storeBytesStat[storeID]; ok {
storeHots.StoreBytesRate = rate
}
}
return stats
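
The net effect of the calcScore changes is that each per-store entry now carries two rates: TotalBytesRate, summed over the store's qualifying hot peers, and the new StoreBytesRate, the store's overall read or write flow copied from storeBytesStat. A small illustration of the distinction, assuming the server/statistics import path and that RegionsCount is a plain int:

```go
// Illustration only: the two rates a per-store hot-region entry carries.
package main

import (
	"fmt"

	"github.com/pingcap/pd/server/statistics" // assumed import path
)

func main() {
	stat := statistics.HotRegionsStat{
		TotalBytesRate: 30 * 1024 * 1024,  // sum over this store's hot peers only
		StoreBytesRate: 120 * 1024 * 1024, // the whole store's reported flow
		RegionsCount:   3,
	}
	// The rewritten selection logic below keys off StoreBytesRate, not TotalBytesRate.
	fmt.Printf("hot-peer flow %.0f B/s, store flow %.0f B/s, %d hot peers\n",
		stat.TotalBytesRate, stat.StoreBytesRate, stat.RegionsCount)
}
```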
@@ -406,14 +410,17 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster opt.Cluster, stores
// Inside these stores, we choose the one with maximum flow bytes.
func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotRegionsStat) (srcStoreID uint64) {
var (
maxFlowBytes float64
maxHotStoreRegionCount int
maxFlowBytes float64
maxCount int
)

for storeID, statistics := range stats {
count, flowBytes := len(statistics.RegionsStat), statistics.TotalBytesRate
if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) {
maxHotStoreRegionCount = count
for storeID, stat := range stats {
count, flowBytes := len(stat.RegionsStat), stat.StoreBytesRate
if count <= 1 {
continue
}
if flowBytes > maxFlowBytes || (flowBytes == maxFlowBytes && count > maxCount) {
maxCount = count
maxFlowBytes = flowBytes
srcStoreID = storeID
}
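
Restated with plain types, and only as a sketch of the rule rather than the scheduler's actual code: the rewritten selectSrcStore skips stores holding at most one hot region, then prefers the highest store byte rate and uses the hot-region count only as a tie-breaker, whereas the old version ranked by count first.

```go
package main

import "fmt"

type storeHotStat struct {
	hotRegionCount int
	storeBytesRate float64 // the store's overall flow, not just its hot peers
}

// pickSrcStore mirrors the new selection rule: highest byte rate wins,
// count breaks ties, stores with fewer than two hot regions are ignored.
func pickSrcStore(stats map[uint64]storeHotStat) (srcStoreID uint64) {
	var maxFlowBytes float64
	var maxCount int
	for storeID, s := range stats {
		if s.hotRegionCount <= 1 {
			continue // nothing worth moving away from this store
		}
		if s.storeBytesRate > maxFlowBytes ||
			(s.storeBytesRate == maxFlowBytes && s.hotRegionCount > maxCount) {
			maxFlowBytes = s.storeBytesRate
			maxCount = s.hotRegionCount
			srcStoreID = storeID
		}
	}
	return srcStoreID
}

func main() {
	stats := map[uint64]storeHotStat{
		1: {hotRegionCount: 5, storeBytesRate: 80e6},
		2: {hotRegionCount: 3, storeBytesRate: 120e6},
		3: {hotRegionCount: 1, storeBytesRate: 200e6}, // skipped: only one hot region
	}
	fmt.Println("source store:", pickSrcStore(stats)) // prints 2
}
```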
@@ -423,26 +430,19 @@

// selectDestStore selects a target store to hold the region moved from the source store.
// We choose the target store based on its number of hot regions and its flow bytes.
func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes float64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) {
sr := storesStat[srcStoreID]
srcFlowBytes := sr.TotalBytesRate
srcHotRegionsCount := len(sr.RegionsStat)
func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionBytesRate float64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) {
srcBytesRate := storesStat[srcStoreID].StoreBytesRate

var (
minFlowBytes float64 = math.MaxFloat64
minRegionsCount = int(math.MaxInt32)
minBytesRate float64 = srcBytesRate*hotRegionScheduleFactor - regionBytesRate
minCount = int(math.MaxInt32)
)
for _, storeID := range candidateStoreIDs {
if s, ok := storesStat[storeID]; ok {
if srcHotRegionsCount-len(s.RegionsStat) > 1 && minRegionsCount > len(s.RegionsStat) {
destStoreID = storeID
minFlowBytes = s.TotalBytesRate
minRegionsCount = len(s.RegionsStat)
continue
}
if minRegionsCount == len(s.RegionsStat) && minFlowBytes > s.TotalBytesRate &&
srcFlowBytes*hotRegionScheduleFactor > s.TotalBytesRate+2*regionFlowBytes {
minFlowBytes = s.TotalBytesRate
count, dstBytesRate := len(s.RegionsStat), s.StoreBytesRate
if minBytesRate > dstBytesRate || (minBytesRate == dstBytesRate && minCount > count) {
minCount = count
minBytesRate = dstBytesRate
destStoreID = storeID
}
} else {
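
The destination side mirrors this, again sketched with plain types and an assumed value for hotRegionScheduleFactor: a candidate qualifies only if its byte rate is below srcBytesRate*hotRegionScheduleFactor minus the region's own rate, and among qualifying candidates the lowest byte rate wins, with the hot-region count again only breaking ties. The diff is truncated before the branch for candidates that have no hot-region stats; the sketch simply treats such a store as having zero flow.

```go
package main

import (
	"fmt"
	"math"
)

// Assumed value; the real constant is defined elsewhere in the scheduler package.
const hotRegionScheduleFactor = 0.9

type storeHotStat struct {
	hotRegionCount int
	storeBytesRate float64
}

// pickDestStore mirrors the new rule: the admission threshold is baked into the
// initial minBytesRate, then the lowest-rate candidate wins, count breaks ties.
func pickDestStore(candidates []uint64, regionBytesRate, srcBytesRate float64,
	stats map[uint64]storeHotStat) (destStoreID uint64) {
	minBytesRate := srcBytesRate*hotRegionScheduleFactor - regionBytesRate
	minCount := math.MaxInt32
	for _, id := range candidates {
		// A candidate missing from stats counts here as zero flow and zero hot regions.
		s := stats[id]
		if s.storeBytesRate < minBytesRate ||
			(s.storeBytesRate == minBytesRate && s.hotRegionCount < minCount) {
			minBytesRate = s.storeBytesRate
			minCount = s.hotRegionCount
			destStoreID = id
		}
	}
	return destStoreID
}

func main() {
	stats := map[uint64]storeHotStat{
		2: {hotRegionCount: 2, storeBytesRate: 40e6},
		3: {hotRegionCount: 4, storeBytesRate: 30e6},
	}
	// Source store pushes 100 MB/s and the hot region itself accounts for 10 MB/s.
	fmt.Println("dest store:", pickDestStore([]uint64{2, 3}, 10e6, 100e6, stats)) // prints 3
}
```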