statistics: refactor (tikv#1750)
(cherry picked from commit 2550168)
Signed-off-by: lhy1024 <admin@liudos.us>
Luffbee authored and lhy1024 committed Oct 11, 2019
1 parent 3699220 commit b4d8575
Showing 20 changed files with 622 additions and 578 deletions.
24 changes: 12 additions & 12 deletions pkg/mock/mockcluster/mockcluster.go
@@ -34,7 +34,7 @@ type Cluster struct {
     *core.BasicCluster
     *mockid.IDAllocator
     *mockoption.ScheduleOptions
-    *statistics.HotSpotCache
+    *statistics.HotCache
     *statistics.StoresStats
     ID uint64
 }
@@ -45,7 +45,7 @@ func NewCluster(opt *mockoption.ScheduleOptions) *Cluster {
         BasicCluster:    core.NewBasicCluster(),
         IDAllocator:     mockid.NewIDAllocator(),
         ScheduleOptions: opt,
-        HotSpotCache:    statistics.NewHotSpotCache(),
+        HotCache:        statistics.NewHotCache(),
         StoresStats:     statistics.NewStoresStats(),
     }
 }
@@ -78,22 +78,22 @@ func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo {

 // IsRegionHot checks if the region is hot.
 func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool {
-    return mc.HotSpotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold())
+    return mc.HotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold())
 }

 // RegionReadStats returns hot region's read stats.
-func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat {
-    return mc.HotSpotCache.RegionStats(statistics.ReadFlow)
+func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
+    return mc.HotCache.RegionStats(statistics.ReadFlow)
 }

 // RegionWriteStats returns hot region's write stats.
-func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat {
-    return mc.HotSpotCache.RegionStats(statistics.WriteFlow)
+func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
+    return mc.HotCache.RegionStats(statistics.WriteFlow)
 }

 // RandHotRegionFromStore random picks a hot region in specify store.
 func (mc *Cluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo {
-    r := mc.HotSpotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
+    r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
     if r == nil {
         return nil
     }
@@ -244,9 +244,9 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64,
     r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
     r = r.Clone(core.SetReadBytes(readBytes))
     r = r.Clone(core.SetReportInterval(reportInterval))
-    items := mc.HotSpotCache.CheckRead(r, mc.StoresStats)
+    items := mc.HotCache.CheckRead(r, mc.StoresStats)
     for _, item := range items {
-        mc.HotSpotCache.Update(item)
+        mc.HotCache.Update(item)
     }
     mc.PutRegion(r)
 }
@@ -256,9 +256,9 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64
     r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
     r = r.Clone(core.SetWrittenBytes(writtenBytes))
     r = r.Clone(core.SetReportInterval(reportInterval))
-    items := mc.HotSpotCache.CheckWrite(r, mc.StoresStats)
+    items := mc.HotCache.CheckWrite(r, mc.StoresStats)
     for _, item := range items {
-        mc.HotSpotCache.Update(item)
+        mc.HotCache.Update(item)
     }
     mc.PutRegion(r)
 }
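To see the renamed surface in one place, here is a minimal, hypothetical sketch of driving the mock cluster after this refactor. The import paths and the mockoption.NewScheduleOptions constructor follow this repository's layout, and the AddLeaderRegionWithReadInfo parameter order (regionID, leaderID, readBytes, reportInterval, followers...) is an assumption read off the hunk bodies above, not something this diff guarantees:

package main

import (
    "fmt"

    "github.com/pingcap/pd/pkg/mock/mockcluster"
    "github.com/pingcap/pd/pkg/mock/mockoption"
)

func main() {
    tc := mockcluster.NewCluster(mockoption.NewScheduleOptions())
    // Region 1, leader on store 1, follower on store 2; parameter order
    // is assumed as noted above.
    tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024, 10, 2)
    // Read stats now come from HotCache and are HotPeerStat values.
    for storeID, peers := range tc.RegionReadStats() {
        for _, p := range peers {
            fmt.Printf("store %d: region %d at rate %d\n", storeID, p.RegionID, p.BytesRate)
        }
    }
}

RegionReadStats returning HotPeerStat (rather than HotSpotPeerStat) is the only API-visible change a caller like this sees.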
4 changes: 2 additions & 2 deletions server/api/trend.go
@@ -145,9 +145,9 @@ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeI
         return
     }
     if stat, ok := stats[storeID]; ok {
-        storeFlow = stat.TotalFlowBytes
+        storeFlow = stat.TotalBytesRate
         for _, flow := range stat.RegionsStat {
-            regionFlows = append(regionFlows, flow.FlowBytes)
+            regionFlows = append(regionFlows, flow.BytesRate)
         }
     }
     return
12 changes: 6 additions & 6 deletions server/cluster.go
@@ -74,7 +74,7 @@ type RaftCluster struct {
     labelLevelStats *statistics.LabelStatistics
     regionStats     *statistics.RegionStatistics
     storesStats     *statistics.StoresStats
-    hotSpotCache    *statistics.HotSpotCache
+    hotSpotCache    *statistics.HotCache

     coordinator *coordinator

@@ -148,7 +148,7 @@ func (c *RaftCluster) initCluster(id id.Allocator, opt *config.ScheduleOption, s
     c.storesStats = statistics.NewStoresStats()
     c.prepareChecker = newPrepareChecker()
     c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
-    c.hotSpotCache = statistics.NewHotSpotCache()
+    c.hotSpotCache = statistics.NewHotCache()
 }

 func (c *RaftCluster) start() error {
@@ -1351,24 +1351,24 @@ func (c *RaftCluster) getStoresKeysReadStat() map[uint64]uint64 {
 }

 // RegionReadStats returns hot region's read stats.
-func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat {
+func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
     // RegionStats is a thread-safe method
     return c.hotSpotCache.RegionStats(statistics.ReadFlow)
 }

 // RegionWriteStats returns hot region's write stats.
-func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat {
+func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
     // RegionStats is a thread-safe method
     return c.hotSpotCache.RegionStats(statistics.WriteFlow)
 }

 // CheckWriteStatus checks the write status, returns whether need update statistics and item.
-func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat {
+func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
     return c.hotSpotCache.CheckWrite(region, c.storesStats)
 }

 // CheckReadStatus checks the read status, returns whether need update statistics and item.
-func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat {
+func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
     return c.hotSpotCache.CheckRead(region, c.storesStats)
 }

6 changes: 3 additions & 3 deletions server/coordinator.go
@@ -327,7 +327,7 @@ func (c *coordinator) collectHotSpotMetrics() {
         storeLabel := fmt.Sprintf("%d", storeID)
         stat, ok := status.AsPeer[storeID]
         if ok {
-            totalWriteBytes := float64(stat.TotalFlowBytes)
+            totalWriteBytes := float64(stat.TotalBytesRate)
             hotWriteRegionCount := float64(stat.RegionsCount)

             hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(totalWriteBytes)
@@ -339,7 +339,7 @@

         stat, ok = status.AsLeader[storeID]
         if ok {
-            totalWriteBytes := float64(stat.TotalFlowBytes)
+            totalWriteBytes := float64(stat.TotalBytesRate)
             hotWriteRegionCount := float64(stat.RegionsCount)

             hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(totalWriteBytes)
@@ -358,7 +358,7 @@
         storeLabel := fmt.Sprintf("%d", storeID)
         stat, ok := status.AsLeader[storeID]
         if ok {
-            totalReadBytes := float64(stat.TotalFlowBytes)
+            totalReadBytes := float64(stat.TotalBytesRate)
             hotReadRegionCount := float64(stat.RegionsCount)

             hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(totalReadBytes)
2 changes: 1 addition & 1 deletion server/namespace_cluster.go
@@ -126,7 +126,7 @@ func (c *namespaceCluster) GetRegion(id uint64) *core.RegionInfo {
 }

 // RegionWriteStats returns hot region's write stats.
-func (c *namespaceCluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat {
+func (c *namespaceCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
     return c.Cluster.RegionWriteStats()
 }

4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
@@ -1170,11 +1170,11 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
     c.Assert(r, NotNil)
     c.Assert(r.GetID(), Equals, uint64(2))
     // check hot items
-    stats := tc.HotSpotCache.RegionStats(statistics.ReadFlow)
+    stats := tc.HotCache.RegionStats(statistics.ReadFlow)
     c.Assert(len(stats), Equals, 2)
     for _, ss := range stats {
         for _, s := range ss {
-            c.Assert(s.FlowBytes, Equals, uint64(512*1024))
+            c.Assert(s.BytesRate, Equals, uint64(512*1024))
         }
     }
     // Will transfer a hot region leader from store 1 to store 3, because the total count of peers
44 changes: 22 additions & 22 deletions server/schedulers/hot_region.go
@@ -223,7 +223,7 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
     return nil
 }

-func calcScore(storeItems map[uint64][]*statistics.HotSpotPeerStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat {
+func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat {
     stats := make(statistics.StoreHotRegionsStat)
     for storeID, items := range storeItems {
         // HotDegree is the update times on the hot cache. If the heartbeat report
@@ -246,21 +246,21 @@ func calcScore(storeItems map[uint64][]*statistics.HotSpotPeerStat, cluster sche
             storeStat, ok := stats[storeID]
             if !ok {
                 storeStat = &statistics.HotRegionsStat{
-                    RegionsStat: make(statistics.RegionsStat, 0, storeHotRegionsDefaultLen),
+                    RegionsStat: make([]statistics.HotPeerStat, 0, storeHotRegionsDefaultLen),
                 }
                 stats[storeID] = storeStat
             }

-            s := statistics.HotSpotPeerStat{
+            s := statistics.HotPeerStat{
+                StoreID:        storeID,
                 RegionID:       r.RegionID,
-                FlowBytes:      uint64(r.Stats.Median()),
                 HotDegree:      r.HotDegree,
-                LastUpdateTime: r.LastUpdateTime,
-                StoreID:        storeID,
                 AntiCount:      r.AntiCount,
+                BytesRate:      uint64(r.RollingBytesRate.Median()),
+                LastUpdateTime: r.LastUpdateTime,
                 Version:        r.Version,
             }
-            storeStat.TotalFlowBytes += r.FlowBytes
+            storeStat.TotalBytesRate += r.BytesRate
             storeStat.RegionsCount++
             storeStat.RegionsStat = append(storeStat.RegionsStat, s)
         }
@@ -284,7 +284,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
     // If we can find a target store, then return from this method.
     stores := cluster.GetStores()
     var destStoreID uint64
-    for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
+    for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) {
         rs := storesStat[srcStoreID].RegionsStat[i]
         srcRegion := cluster.GetRegion(rs.RegionID)
         if srcRegion == nil {
@@ -320,7 +320,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
             candidateStoreIDs = append(candidateStoreIDs, store.GetID())
         }

-        destStoreID = h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
+        destStoreID = h.selectDestStore(candidateStoreIDs, rs.BytesRate, srcStoreID, storesStat)
         if destStoreID != 0 {
             h.peerLimit = h.adjustBalanceLimit(srcStoreID, storesStat)

@@ -356,7 +356,7 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
     }

     // select destPeer
-    for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
+    for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) {
         rs := storesStat[srcStoreID].RegionsStat[i]
         srcRegion := cluster.GetRegion(rs.RegionID)
         if srcRegion == nil {
@@ -379,7 +379,7 @@
         if len(candidateStoreIDs) == 0 {
             continue
         }
-        destStoreID := h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
+        destStoreID := h.selectDestStore(candidateStoreIDs, rs.BytesRate, srcStoreID, storesStat)
         if destStoreID == 0 {
             continue
         }
@@ -404,7 +404,7 @@ func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotReg
     )

     for storeID, statistics := range stats {
-        count, flowBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
+        count, flowBytes := len(statistics.RegionsStat), statistics.TotalBytesRate
         if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) {
             maxHotStoreRegionCount = count
             maxFlowBytes = flowBytes
@@ -418,24 +418,24 @@
 // We choose a target store based on the hot region number and flow bytes of this store.
 func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) {
     sr := storesStat[srcStoreID]
-    srcFlowBytes := sr.TotalFlowBytes
-    srcHotRegionsCount := sr.RegionsStat.Len()
+    srcFlowBytes := sr.TotalBytesRate
+    srcHotRegionsCount := len(sr.RegionsStat)

     var (
         minFlowBytes    uint64 = math.MaxUint64
         minRegionsCount        = int(math.MaxInt32)
     )
     for _, storeID := range candidateStoreIDs {
         if s, ok := storesStat[storeID]; ok {
-            if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
+            if srcHotRegionsCount-len(s.RegionsStat) > 1 && minRegionsCount > len(s.RegionsStat) {
                 destStoreID = storeID
-                minFlowBytes = s.TotalFlowBytes
-                minRegionsCount = s.RegionsStat.Len()
+                minFlowBytes = s.TotalBytesRate
+                minRegionsCount = len(s.RegionsStat)
                 continue
             }
-            if minRegionsCount == s.RegionsStat.Len() && minFlowBytes > s.TotalFlowBytes &&
-                uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*regionFlowBytes {
-                minFlowBytes = s.TotalFlowBytes
+            if minRegionsCount == len(s.RegionsStat) && minFlowBytes > s.TotalBytesRate &&
+                uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalBytesRate+2*regionFlowBytes {
+                minFlowBytes = s.TotalBytesRate
                 destStoreID = storeID
             }
         } else {
@@ -451,12 +451,12 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt

     var hotRegionTotalCount float64
     for _, m := range storesStat {
-        hotRegionTotalCount += float64(m.RegionsStat.Len())
+        hotRegionTotalCount += float64(len(m.RegionsStat))
     }

     avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
     // Multiplied by hotRegionLimitFactor to avoid transfer back and forth
-    limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
+    limit := uint64((float64(len(srcStoreStatistics.RegionsStat)) - avgRegionCount) * hotRegionLimitFactor)
     return maxUint64(limit, 1)
 }

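The renames make selectDestStore's rule easy to lose, so here is a standalone restatement of what the method computes after this change. It is a sketch with simplified types: hotStoreStat and pickDestStore are hypothetical stand-ins, the factor value is illustrative, and the behavior of the else branch (truncated to "} else {" in the hunk above) is assumed to accept a store with no hot stats outright:

package main

import (
    "fmt"
    "math"
)

// hotStoreStat is a pared-down stand-in for statistics.HotRegionsStat.
type hotStoreStat struct {
    totalBytesRate uint64
    regionsCount   int
}

// Illustrative value; the real constant is defined by the scheduler.
const hotRegionScheduleFactor = 0.9

// pickDestStore restates the selection loop: prefer the candidate with the
// fewest hot regions, provided it trails the source by more than one region;
// break ties toward a lower total bytes rate, and only when the destination
// would stay well below the source's rate after receiving the region.
func pickDestStore(candidates []uint64, stats map[uint64]hotStoreStat, srcID, regionBytesRate uint64) (dest uint64) {
    src := stats[srcID]
    minBytesRate := uint64(math.MaxUint64)
    minRegions := int(math.MaxInt32)
    for _, id := range candidates {
        s, ok := stats[id]
        if !ok {
            // Assumed from the truncated else branch: a store with no
            // hot stats is always an acceptable destination.
            return id
        }
        if src.regionsCount-s.regionsCount > 1 && minRegions > s.regionsCount {
            dest, minBytesRate, minRegions = id, s.totalBytesRate, s.regionsCount
            continue
        }
        if minRegions == s.regionsCount && minBytesRate > s.totalBytesRate &&
            uint64(float64(src.totalBytesRate)*hotRegionScheduleFactor) > s.totalBytesRate+2*regionBytesRate {
            dest, minBytesRate = id, s.totalBytesRate
        }
    }
    return dest
}

func main() {
    stats := map[uint64]hotStoreStat{
        1: {totalBytesRate: 100 << 20, regionsCount: 5}, // hot source
        2: {totalBytesRate: 10 << 20, regionsCount: 1},
        3: {totalBytesRate: 20 << 20, regionsCount: 3},
    }
    fmt.Println(pickDestStore([]uint64{2, 3}, stats, 1, 1<<20)) // picks store 2
}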
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_hot_region.go
@@ -100,7 +100,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster schedule.C

 func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats statistics.StoreHotRegionsStat) []*operator.Operator {
     for _, stats := range storeStats {
-        i := s.r.Intn(stats.RegionsStat.Len())
+        i := s.r.Intn(len(stats.RegionsStat))
         r := stats.RegionsStat[i]
         // select src region
         srcRegion := cluster.GetRegion(r.RegionID)
33 changes: 33 additions & 0 deletions server/statistics/flowkind.go
@@ -0,0 +1,33 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+// FlowKind identifies the type of a flow.
+type FlowKind uint32
+
+// Flags for flow.
+const (
+    WriteFlow FlowKind = iota
+    ReadFlow
+)
+
+func (k FlowKind) String() string {
+    switch k {
+    case WriteFlow:
+        return "write"
+    case ReadFlow:
+        return "read"
+    }
+    return "unimplemented"
+}
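flowkind.go is the one genuinely new file here: FlowKind replaces ad-hoc read/write distinctions with a typed constant that both HotCache.RegionStats and the mock cluster's RandHotRegionFromStore accept. A small sketch of a caller, assuming only the API shown in this diff plus this repository's import layout:

package main

import (
    "fmt"

    "github.com/pingcap/pd/server/statistics"
)

// printHotPeers dumps hot peers for one flow direction. FlowKind's String
// method labels the output "read" or "write" via fmt's Stringer support.
func printHotPeers(cache *statistics.HotCache, kind statistics.FlowKind) {
    for storeID, peers := range cache.RegionStats(kind) {
        fmt.Printf("store %d has %d hot %s peers\n", storeID, len(peers), kind)
    }
}

func main() {
    cache := statistics.NewHotCache()
    printHotPeers(cache, statistics.ReadFlow)
    printHotPeers(cache, statistics.WriteFlow)
}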