Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: new hotspot scheduler basic part #1870

Merged
merged 14 commits into from
Nov 4, 2019
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
mc.PutRegion(r)
}

// GetStoresStats gets stores statistics.
func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
return mc.StoresStats
}

// GetStoreRegionCount gets region count with a given store.
func (mc *Cluster) GetStoreRegionCount(storeID uint64) int {
return mc.Regions.GetStoreRegionCount(storeID)
Expand Down Expand Up @@ -356,6 +361,7 @@ func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

Expand All @@ -368,6 +374,7 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

Expand Down
6 changes: 3 additions & 3 deletions server/api/trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
var readStats, writeStats statistics.StoreHotRegionsStat
var readStats, writeStats statistics.StoreHotPeersStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
}
Expand Down Expand Up @@ -140,13 +140,13 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
return trendStores, nil
}

func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeID uint64) (storeFlow float64, regionFlows []float64) {
func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID uint64) (storeFlow float64, regionFlows []float64) {
if stats == nil {
return
}
if stat, ok := stats[storeID]; ok {
storeFlow = stat.TotalBytesRate
for _, flow := range stat.RegionsStat {
for _, flow := range stat.Stats {
regionFlows = append(regionFlows, flow.GetBytesRate())
}
}
Expand Down
14 changes: 7 additions & 7 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ func (c *coordinator) stop() {
// Hack to retrieve info from scheduler.
// TODO: remove it.
type hasHotStatus interface {
GetHotReadStatus() *statistics.StoreHotRegionInfos
GetHotWriteStatus() *statistics.StoreHotRegionInfos
GetHotReadStatus() *statistics.StoreHotPeersInfos
GetHotWriteStatus() *statistics.StoreHotPeersInfos
}

func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos {
func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
c.RLock()
defer c.RUnlock()
s, ok := c.schedulers[hotRegionScheduleName]
Expand All @@ -277,7 +277,7 @@ func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos {
return nil
}

func (c *coordinator) getHotReadRegions() *statistics.StoreHotRegionInfos {
func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos {
c.RLock()
defer c.RUnlock()
s, ok := c.schedulers[hotRegionScheduleName]
Expand Down Expand Up @@ -336,7 +336,7 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.RegionsCount))
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
Expand All @@ -345,7 +345,7 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.RegionsCount))
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
Expand All @@ -361,7 +361,7 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.RegionsCount))
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
Expand Down
4 changes: 2 additions & 2 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) {
}

// GetHotWriteRegions gets all hot write regions stats.
func (h *Handler) GetHotWriteRegions() *statistics.StoreHotRegionInfos {
func (h *Handler) GetHotWriteRegions() *statistics.StoreHotPeersInfos {
c, err := h.getCoordinator()
if err != nil {
return nil
Expand All @@ -135,7 +135,7 @@ func (h *Handler) GetHotWriteRegions() *statistics.StoreHotRegionInfos {
}

// GetHotReadRegions gets all hot read regions stats.
func (h *Handler) GetHotReadRegions() *statistics.StoreHotRegionInfos {
func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos {
c, err := h.getCoordinator()
if err != nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions server/schedule/opt/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Cluster interface {
core.StoreSetController

statistics.RegionStatInformer
statistics.StoreStatInformer
Options

// TODO: it should be removed. Schedulers don't need to know anything
Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *testBalanceSpeedSuite) TestTolerantRatio(c *C) {
tc := mockcluster.NewCluster(opt)
// create a region to control average region size.
tc.AddLeaderRegion(1, 1, 2)
regionSize := int64(96 * 1024)
regionSize := int64(96 * KB)
region := tc.GetRegion(1).Clone(core.SetApproximateSize(regionSize))

tc.TolerantSizeRatio = 0
Expand Down Expand Up @@ -931,10 +931,10 @@ func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) {

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(1, 0.5, 0.5)
tc.UpdateStoreRegionSize(1, 500*1024*1024)
tc.UpdateStoreRegionSize(1, 500*MB)
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(2, 0.1, 0.9)
tc.UpdateStoreRegionSize(2, 100*1024*1024)
tc.UpdateStoreRegionSize(2, 100*MB)
tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
tc.AddLabelsStore(4, 0, map[string]string{"zone": "z3"})

Expand Down
Loading