diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index edfccedd54d..b51287edc3e 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -34,7 +34,7 @@ type Cluster struct { *core.BasicCluster *mockid.IDAllocator *mockoption.ScheduleOptions - *statistics.HotSpotCache + *statistics.HotCache *statistics.StoresStats ID uint64 } @@ -45,7 +45,7 @@ func NewCluster(opt *mockoption.ScheduleOptions) *Cluster { BasicCluster: core.NewBasicCluster(), IDAllocator: mockid.NewIDAllocator(), ScheduleOptions: opt, - HotSpotCache: statistics.NewHotSpotCache(), + HotCache: statistics.NewHotCache(), StoresStats: statistics.NewStoresStats(), } } @@ -78,22 +78,22 @@ func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo { // IsRegionHot checks if the region is hot. func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool { - return mc.HotSpotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold()) + return mc.HotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold()) } // RegionReadStats returns hot region's read stats. -func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat { - return mc.HotSpotCache.RegionStats(statistics.ReadFlow) +func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + return mc.HotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { - return mc.HotSpotCache.RegionStats(statistics.WriteFlow) +func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + return mc.HotCache.RegionStats(statistics.WriteFlow) } // RandHotRegionFromStore random picks a hot region in specify store. func (mc *Cluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo { - r := mc.HotSpotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) + r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) if r == nil { return nil } @@ -244,9 +244,9 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r = r.Clone(core.SetReadBytes(readBytes)) r = r.Clone(core.SetReportInterval(reportInterval)) - items := mc.HotSpotCache.CheckRead(r, mc.StoresStats) + items := mc.HotCache.CheckRead(r, mc.StoresStats) for _, item := range items { - mc.HotSpotCache.Update(item) + mc.HotCache.Update(item) } mc.PutRegion(r) } @@ -256,9 +256,9 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64 r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) 
r = r.Clone(core.SetWrittenBytes(writtenBytes)) r = r.Clone(core.SetReportInterval(reportInterval)) - items := mc.HotSpotCache.CheckWrite(r, mc.StoresStats) + items := mc.HotCache.CheckWrite(r, mc.StoresStats) for _, item := range items { - mc.HotSpotCache.Update(item) + mc.HotCache.Update(item) } mc.PutRegion(r) } diff --git a/server/api/trend.go b/server/api/trend.go index f98a4892bec..f33dcd1ef90 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -145,9 +145,9 @@ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeI return } if stat, ok := stats[storeID]; ok { - storeFlow = stat.TotalFlowBytes + storeFlow = stat.TotalBytesRate for _, flow := range stat.RegionsStat { - regionFlows = append(regionFlows, flow.FlowBytes) + regionFlows = append(regionFlows, flow.BytesRate) } } return diff --git a/server/cluster.go b/server/cluster.go index 2764702b8dc..dd44c26db45 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -74,7 +74,7 @@ type RaftCluster struct { labelLevelStats *statistics.LabelStatistics regionStats *statistics.RegionStatistics storesStats *statistics.StoresStats - hotSpotCache *statistics.HotSpotCache + hotSpotCache *statistics.HotCache coordinator *coordinator @@ -148,7 +148,7 @@ func (c *RaftCluster) initCluster(id id.Allocator, opt *config.ScheduleOption, s c.storesStats = statistics.NewStoresStats() c.prepareChecker = newPrepareChecker() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) - c.hotSpotCache = statistics.NewHotSpotCache() + c.hotSpotCache = statistics.NewHotCache() } func (c *RaftCluster) start() error { @@ -1351,24 +1351,24 @@ func (c *RaftCluster) getStoresKeysReadStat() map[uint64]uint64 { } // RegionReadStats returns hot region's read stats. -func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat { +func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { +func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.WriteFlow) } // CheckWriteStatus checks the write status, returns whether need update statistics and item. -func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat { +func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat { return c.hotSpotCache.CheckWrite(region, c.storesStats) } // CheckReadStatus checks the read status, returns whether need update statistics and item. 
-func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat { +func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat { return c.hotSpotCache.CheckRead(region, c.storesStats) } diff --git a/server/coordinator.go b/server/coordinator.go index 24a85754ca2..34c91afbf26 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -327,7 +327,7 @@ func (c *coordinator) collectHotSpotMetrics() { storeLabel := fmt.Sprintf("%d", storeID) stat, ok := status.AsPeer[storeID] if ok { - totalWriteBytes := float64(stat.TotalFlowBytes) + totalWriteBytes := float64(stat.TotalBytesRate) hotWriteRegionCount := float64(stat.RegionsCount) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(totalWriteBytes) @@ -339,7 +339,7 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok = status.AsLeader[storeID] if ok { - totalWriteBytes := float64(stat.TotalFlowBytes) + totalWriteBytes := float64(stat.TotalBytesRate) hotWriteRegionCount := float64(stat.RegionsCount) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(totalWriteBytes) @@ -358,7 +358,7 @@ func (c *coordinator) collectHotSpotMetrics() { storeLabel := fmt.Sprintf("%d", storeID) stat, ok := status.AsLeader[storeID] if ok { - totalReadBytes := float64(stat.TotalFlowBytes) + totalReadBytes := float64(stat.TotalBytesRate) hotReadRegionCount := float64(stat.RegionsCount) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(totalReadBytes) diff --git a/server/namespace_cluster.go b/server/namespace_cluster.go index 3b21816dfef..66a2b2a3713 100644 --- a/server/namespace_cluster.go +++ b/server/namespace_cluster.go @@ -126,7 +126,7 @@ func (c *namespaceCluster) GetRegion(id uint64) *core.RegionInfo { } // RegionWriteStats returns hot region's write stats. 
-func (c *namespaceCluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { +func (c *namespaceCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { return c.Cluster.RegionWriteStats() } diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 94f94ea3008..0da798b720c 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -1170,11 +1170,11 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, uint64(2)) // check hot items - stats := tc.HotSpotCache.RegionStats(statistics.ReadFlow) + stats := tc.HotCache.RegionStats(statistics.ReadFlow) c.Assert(len(stats), Equals, 2) for _, ss := range stats { for _, s := range ss { - c.Assert(s.FlowBytes, Equals, uint64(512*1024)) + c.Assert(s.BytesRate, Equals, uint64(512*1024)) } } // Will transfer a hot region leader from store 1 to store 3, because the total count of peers diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 449890622a9..a3531b676a4 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -223,7 +223,7 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu return nil } -func calcScore(storeItems map[uint64][]*statistics.HotSpotPeerStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { +func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { stats := make(statistics.StoreHotRegionsStat) for storeID, items := range storeItems { // HotDegree is the update times on the hot cache. If the heartbeat report @@ -246,21 +246,21 @@ func calcScore(storeItems map[uint64][]*statistics.HotSpotPeerStat, cluster sche storeStat, ok := stats[storeID] if !ok { storeStat = &statistics.HotRegionsStat{ - RegionsStat: make(statistics.RegionsStat, 0, storeHotRegionsDefaultLen), + RegionsStat: make([]statistics.HotPeerStat, 0, storeHotRegionsDefaultLen), } stats[storeID] = storeStat } - s := statistics.HotSpotPeerStat{ + s := statistics.HotPeerStat{ + StoreID: storeID, RegionID: r.RegionID, - FlowBytes: uint64(r.Stats.Median()), HotDegree: r.HotDegree, - LastUpdateTime: r.LastUpdateTime, - StoreID: storeID, AntiCount: r.AntiCount, + BytesRate: uint64(r.RollingBytesRate.Median()), + LastUpdateTime: r.LastUpdateTime, Version: r.Version, } - storeStat.TotalFlowBytes += r.FlowBytes + storeStat.TotalBytesRate += r.BytesRate storeStat.RegionsCount++ storeStat.RegionsStat = append(storeStat.RegionsStat, s) } @@ -284,7 +284,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto // If we can find a target store, then return from this method. 
stores := cluster.GetStores() var destStoreID uint64 - for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) { + for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) { rs := storesStat[srcStoreID].RegionsStat[i] srcRegion := cluster.GetRegion(rs.RegionID) if srcRegion == nil { @@ -320,7 +320,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto candidateStoreIDs = append(candidateStoreIDs, store.GetID()) } - destStoreID = h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat) + destStoreID = h.selectDestStore(candidateStoreIDs, rs.BytesRate, srcStoreID, storesStat) if destStoreID != 0 { h.peerLimit = h.adjustBalanceLimit(srcStoreID, storesStat) @@ -356,7 +356,7 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s } // select destPeer - for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) { + for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) { rs := storesStat[srcStoreID].RegionsStat[i] srcRegion := cluster.GetRegion(rs.RegionID) if srcRegion == nil { @@ -379,7 +379,7 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s if len(candidateStoreIDs) == 0 { continue } - destStoreID := h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat) + destStoreID := h.selectDestStore(candidateStoreIDs, rs.BytesRate, srcStoreID, storesStat) if destStoreID == 0 { continue } @@ -404,7 +404,7 @@ func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotReg ) for storeID, statistics := range stats { - count, flowBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes + count, flowBytes := len(statistics.RegionsStat), statistics.TotalBytesRate if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) { maxHotStoreRegionCount = count maxFlowBytes = flowBytes @@ -418,8 +418,8 @@ func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotReg // We choose a target store based on the hot region number and flow bytes of this store. 
func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) { sr := storesStat[srcStoreID] - srcFlowBytes := sr.TotalFlowBytes - srcHotRegionsCount := sr.RegionsStat.Len() + srcFlowBytes := sr.TotalBytesRate + srcHotRegionsCount := len(sr.RegionsStat) var ( minFlowBytes uint64 = math.MaxUint64 @@ -427,15 +427,15 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, ) for _, storeID := range candidateStoreIDs { if s, ok := storesStat[storeID]; ok { - if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() { + if srcHotRegionsCount-len(s.RegionsStat) > 1 && minRegionsCount > len(s.RegionsStat) { destStoreID = storeID - minFlowBytes = s.TotalFlowBytes - minRegionsCount = s.RegionsStat.Len() + minFlowBytes = s.TotalBytesRate + minRegionsCount = len(s.RegionsStat) continue } - if minRegionsCount == s.RegionsStat.Len() && minFlowBytes > s.TotalFlowBytes && - uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*regionFlowBytes { - minFlowBytes = s.TotalFlowBytes + if minRegionsCount == len(s.RegionsStat) && minFlowBytes > s.TotalBytesRate && + uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalBytesRate+2*regionFlowBytes { + minFlowBytes = s.TotalBytesRate destStoreID = storeID } } else { @@ -451,12 +451,12 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt var hotRegionTotalCount float64 for _, m := range storesStat { - hotRegionTotalCount += float64(m.RegionsStat.Len()) + hotRegionTotalCount += float64(len(m.RegionsStat)) } avgRegionCount := hotRegionTotalCount / float64(len(storesStat)) // Multiplied by hotRegionLimitFactor to avoid transfer back and forth - limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor) + limit := uint64((float64(len(srcStoreStatistics.RegionsStat)) - avgRegionCount) * hotRegionLimitFactor) return maxUint64(limit, 1) } diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 28176768838..089bf9de189 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -100,7 +100,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster schedule.C func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats statistics.StoreHotRegionsStat) []*operator.Operator { for _, stats := range storeStats { - i := s.r.Intn(stats.RegionsStat.Len()) + i := s.r.Intn(len(stats.RegionsStat)) r := stats.RegionsStat[i] // select src region srcRegion := cluster.GetRegion(r.RegionID) diff --git a/server/statistics/flowkind.go b/server/statistics/flowkind.go new file mode 100644 index 00000000000..7be44d5cc90 --- /dev/null +++ b/server/statistics/flowkind.go @@ -0,0 +1,33 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +// FlowKind is a identify Flow types. 
+type FlowKind uint32 + +// Flags for flow. +const ( + WriteFlow FlowKind = iota + ReadFlow +) + +func (k FlowKind) String() string { + switch k { + case WriteFlow: + return "write" + case ReadFlow: + return "read" + } + return "unimplemented" +} diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 3c2e8de7e45..5d7fd19b8ca 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -14,11 +14,8 @@ package statistics import ( - "fmt" "math/rand" - "time" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" ) @@ -27,345 +24,39 @@ import ( // only turned off by the simulator and the test. var Denoising = true -const ( - // RegionHeartBeatReportInterval is the heartbeat report interval of a region. - RegionHeartBeatReportInterval = 60 - // StoreHeartBeatReportInterval is the heartbeat report interval of a store. - StoreHeartBeatReportInterval = 10 - - statCacheMaxLen = 1000 - storeStatCacheMaxLen = 200 - hotWriteRegionMinFlowRate = 16 * 1024 - hotReadRegionMinFlowRate = 128 * 1024 - minHotRegionReportInterval = 3 - hotRegionAntiCount = 1 -) - -// FlowKind is a identify Flow types. -type FlowKind uint32 - -// Flags for flow. -const ( - WriteFlow FlowKind = iota - ReadFlow -) - -func (k FlowKind) String() string { - switch k { - case WriteFlow: - return "write" - case ReadFlow: - return "read" - } - return "unimplemented" -} - -// HotStoresStats saves the hotspot peer's statistics. -type HotStoresStats struct { - hotStoreStats map[uint64]cache.Cache // storeID -> hot regions - storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs -} - -// NewHotStoresStats creates a HotStoresStats -func NewHotStoresStats() *HotStoresStats { - return &HotStoresStats{ - hotStoreStats: make(map[uint64]cache.Cache), - storesOfRegion: make(map[uint64]map[uint64]struct{}), - } -} - -// CheckRegionFlow checks the flow information of region. -func (f *HotStoresStats) CheckRegionFlow(region *core.RegionInfo, kind FlowKind) []HotSpotPeerStatGenerator { - var ( - generators []HotSpotPeerStatGenerator - getBytesFlow func() uint64 - getKeysFlow func() uint64 - bytesPerSec uint64 - keysPerSec uint64 - reportInterval uint64 - - isExpiredInStore func(region *core.RegionInfo, storeID uint64) bool - ) - - storeIDs := make(map[uint64]struct{}) - // gets the storeIDs, including old region and new region - ids, ok := f.storesOfRegion[region.GetID()] - if ok { - for storeID := range ids { - storeIDs[storeID] = struct{}{} - } - } - - for _, peer := range region.GetPeers() { - // ReadFlow no need consider the followers. - if kind == ReadFlow && peer.GetStoreId() != region.GetLeader().GetStoreId() { - continue - } - if _, ok := storeIDs[peer.GetStoreId()]; !ok { - storeIDs[peer.GetStoreId()] = struct{}{} - } - } - - switch kind { - case WriteFlow: - getBytesFlow = region.GetBytesWritten - getKeysFlow = region.GetKeysWritten - isExpiredInStore = func(region *core.RegionInfo, storeID uint64) bool { - return region.GetStorePeer(storeID) == nil - } - case ReadFlow: - getBytesFlow = region.GetBytesRead - getKeysFlow = region.GetKeysRead - isExpiredInStore = func(region *core.RegionInfo, storeID uint64) bool { - return region.GetLeader().GetStoreId() != storeID - } - } - - reportInterval = region.GetInterval().GetEndTimestamp() - region.GetInterval().GetStartTimestamp() - - // ignores this region flow information if the report time interval is too short or too long. 
- if reportInterval < minHotRegionReportInterval || reportInterval > 3*RegionHeartBeatReportInterval { - for storeID := range storeIDs { - if isExpiredInStore(region, storeID) { - generator := &hotSpotPeerStatGenerator{ - Region: region, - StoreID: storeID, - Kind: kind, - Expired: true, - } - generators = append(generators, generator) - } - } - return generators - } - - bytesPerSec = uint64(float64(getBytesFlow()) / float64(reportInterval)) - keysPerSec = uint64(float64(getKeysFlow()) / float64(reportInterval)) - for storeID := range storeIDs { - var oldRegionStat *HotSpotPeerStat - - hotStoreStats, ok := f.hotStoreStats[storeID] - if ok { - if v, isExist := hotStoreStats.Peek(region.GetID()); isExist { - oldRegionStat = v.(*HotSpotPeerStat) - // This is used for the simulator. - if Denoising { - interval := time.Since(oldRegionStat.LastUpdateTime).Seconds() - if interval < minHotRegionReportInterval && !isExpiredInStore(region, storeID) { - continue - } - } - } - } - - generator := &hotSpotPeerStatGenerator{ - Region: region, - StoreID: storeID, - FlowBytes: bytesPerSec, - FlowKeys: keysPerSec, - Kind: kind, - - lastHotSpotPeerStats: oldRegionStat, - } - - if isExpiredInStore(region, storeID) { - generator.Expired = true - } - generators = append(generators, generator) - } - return generators -} - -// Update updates the items in statistics. -func (f *HotStoresStats) Update(item *HotSpotPeerStat) { - if item.IsNeedDelete() { - if hotStoreStat, ok := f.hotStoreStats[item.StoreID]; ok { - hotStoreStat.Remove(item.RegionID) - } - if index, ok := f.storesOfRegion[item.RegionID]; ok { - delete(index, item.StoreID) - } - } else { - hotStoreStat, ok := f.hotStoreStats[item.StoreID] - if !ok { - hotStoreStat = cache.NewCache(statCacheMaxLen, cache.TwoQueueCache) - f.hotStoreStats[item.StoreID] = hotStoreStat - } - hotStoreStat.Put(item.RegionID, item) - index, ok := f.storesOfRegion[item.RegionID] - if !ok { - index = make(map[uint64]struct{}) - } - index[item.StoreID] = struct{}{} - f.storesOfRegion[item.RegionID] = index - } -} - -func (f *HotStoresStats) isRegionHotWithAnyPeers(region *core.RegionInfo, hotThreshold int) bool { - for _, peer := range region.GetPeers() { - if f.isRegionHotWithPeer(region, peer, hotThreshold) { - return true - } - } - return false - -} - -func (f *HotStoresStats) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotThreshold int) bool { - if peer == nil { - return false - } - storeID := peer.GetStoreId() - stats, ok := f.hotStoreStats[storeID] - if !ok { - return false - } - if stat, ok := stats.Peek(region.GetID()); ok { - return stat.(*HotSpotPeerStat).HotDegree >= hotThreshold - } - return false +// HotCache is a cache hold hot regions. +type HotCache struct { + writeFlow *hotPeerCache + readFlow *hotPeerCache } -// HotSpotPeerStatGenerator used to produce new hotspot statistics. -type HotSpotPeerStatGenerator interface { - GenHotSpotPeerStats(stats *StoresStats) *HotSpotPeerStat -} - -// hotSpotPeerStatBuilder used to produce new hotspot statistics. -type hotSpotPeerStatGenerator struct { - Region *core.RegionInfo - StoreID uint64 - FlowKeys uint64 - FlowBytes uint64 - Expired bool - Kind FlowKind - - lastHotSpotPeerStats *HotSpotPeerStat -} - -const rollingWindowsSize = 5 - -// GenHotSpotPeerStats implements HotSpotPeerStatsGenerator. 
-func (flowStats *hotSpotPeerStatGenerator) GenHotSpotPeerStats(stats *StoresStats) *HotSpotPeerStat { - var hotRegionThreshold uint64 - switch flowStats.Kind { - case WriteFlow: - hotRegionThreshold = calculateWriteHotThresholdWithStore(stats, flowStats.StoreID) - case ReadFlow: - hotRegionThreshold = calculateReadHotThresholdWithStore(stats, flowStats.StoreID) - } - flowBytes := flowStats.FlowBytes - oldItem := flowStats.lastHotSpotPeerStats - region := flowStats.Region - newItem := &HotSpotPeerStat{ - RegionID: region.GetID(), - FlowBytes: flowStats.FlowBytes, - FlowKeys: flowStats.FlowKeys, - LastUpdateTime: time.Now(), - StoreID: flowStats.StoreID, - Version: region.GetMeta().GetRegionEpoch().GetVersion(), - AntiCount: hotRegionAntiCount, - Kind: flowStats.Kind, - needDelete: flowStats.Expired, - } - - if region.GetLeader().GetStoreId() == flowStats.StoreID { - newItem.isLeader = true - } - - if newItem.IsNeedDelete() { - return newItem - } - - if oldItem != nil { - newItem.HotDegree = oldItem.HotDegree + 1 - newItem.Stats = oldItem.Stats - } - - if flowBytes >= hotRegionThreshold { - if oldItem == nil { - newItem.Stats = NewRollingStats(rollingWindowsSize) - } - newItem.isNew = true - newItem.Stats.Add(float64(flowBytes)) - return newItem - } - - // smaller than hotRegionThreshold - if oldItem == nil { - return nil - } - if oldItem.AntiCount <= 0 { - newItem.needDelete = true - return newItem - } - // eliminate some noise - newItem.HotDegree = oldItem.HotDegree - 1 - newItem.AntiCount = oldItem.AntiCount - 1 - newItem.Stats.Add(float64(flowBytes)) - return newItem -} - -// HotSpotCache is a cache hold hot regions. -type HotSpotCache struct { - writeFlow *HotStoresStats - readFlow *HotStoresStats -} - -// NewHotSpotCache creates a new hot spot cache. -func NewHotSpotCache() *HotSpotCache { - return &HotSpotCache{ - writeFlow: NewHotStoresStats(), - readFlow: NewHotStoresStats(), +// NewHotCache creates a new hot spot cache. +func NewHotCache() *HotCache { + return &HotCache{ + writeFlow: NewHotStoresStats(WriteFlow), + readFlow: NewHotStoresStats(ReadFlow), } } // CheckWrite checks the write status, returns update items. -func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) []*HotSpotPeerStat { - var updateItems []*HotSpotPeerStat - hotStatGenerators := w.writeFlow.CheckRegionFlow(region, WriteFlow) - for _, hotGen := range hotStatGenerators { - item := hotGen.GenHotSpotPeerStats(stats) - if item != nil { - updateItems = append(updateItems, item) - } - } - return updateItems +func (w *HotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat { + return w.writeFlow.CheckRegionFlow(region, stats) } // CheckRead checks the read status, returns update items. 
-func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) []*HotSpotPeerStat { - var updateItems []*HotSpotPeerStat - hotStatGenerators := w.readFlow.CheckRegionFlow(region, ReadFlow) - for _, hotGen := range hotStatGenerators { - item := hotGen.GenHotSpotPeerStats(stats) - if item != nil { - updateItems = append(updateItems, item) - } - } - return updateItems -} - -func (w *HotSpotCache) incMetrics(name string, storeID uint64, kind FlowKind) { - storeTag := fmt.Sprintf("store-%d", storeID) - switch kind { - case WriteFlow: - hotCacheStatusGauge.WithLabelValues(name, storeTag, "write").Inc() - case ReadFlow: - hotCacheStatusGauge.WithLabelValues(name, storeTag, "read").Inc() - } +func (w *HotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat { + return w.readFlow.CheckRegionFlow(region, stats) } // Update updates the cache. -func (w *HotSpotCache) Update(item *HotSpotPeerStat) { - var stats *HotStoresStats +func (w *HotCache) Update(item *HotPeerStat) { switch item.Kind { case WriteFlow: - stats = w.writeFlow + w.writeFlow.Update(item) case ReadFlow: - stats = w.readFlow + w.readFlow.Update(item) } - stats.Update(item) + if item.IsNeedDelete() { w.incMetrics("remove_item", item.StoreID, item.Kind) } else if item.IsNew() { @@ -376,124 +67,57 @@ func (w *HotSpotCache) Update(item *HotSpotPeerStat) { } // RegionStats returns hot items according to kind -func (w *HotSpotCache) RegionStats(kind FlowKind) map[uint64][]*HotSpotPeerStat { - var flowMap map[uint64]cache.Cache +func (w *HotCache) RegionStats(kind FlowKind) map[uint64][]*HotPeerStat { + var peersOfStore map[uint64]cache.Cache switch kind { case WriteFlow: - flowMap = w.writeFlow.hotStoreStats + peersOfStore = w.writeFlow.peersOfStore case ReadFlow: - flowMap = w.readFlow.hotStoreStats + peersOfStore = w.readFlow.peersOfStore } - res := make(map[uint64][]*HotSpotPeerStat) - for storeID, elements := range flowMap { - values := elements.Elems() - stat, ok := res[storeID] - if !ok { - stat = make([]*HotSpotPeerStat, len(values)) - res[storeID] = stat - } + + res := make(map[uint64][]*HotPeerStat) + for storeID, peers := range peersOfStore { + values := peers.Elems() + stat := make([]*HotPeerStat, len(values)) + res[storeID] = stat for i := range values { - stat[i] = values[i].Value.(*HotSpotPeerStat) + stat[i] = values[i].Value.(*HotPeerStat) } } return res } // RandHotRegionFromStore random picks a hot region in specify store. 
-func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotThreshold int) *HotSpotPeerStat { - stats, ok := w.RegionStats(kind)[storeID] - if !ok { - return nil - } - for _, i := range rand.Perm(len(stats)) { - if stats[i].HotDegree >= hotThreshold { - return stats[i] +func (w *HotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotDegree int) *HotPeerStat { + if stats, ok := w.RegionStats(kind)[storeID]; ok { + for _, i := range rand.Perm(len(stats)) { + if stats[i].HotDegree >= hotDegree { + return stats[i] + } } } return nil } -// CollectMetrics collect the hot cache metrics -func (w *HotSpotCache) CollectMetrics(stats *StoresStats) { - for storeID, flowStats := range w.writeFlow.hotStoreStats { - storeTag := fmt.Sprintf("store-%d", storeID) - threshold := calculateWriteHotThresholdWithStore(stats, storeID) - hotCacheStatusGauge.WithLabelValues("total_length", storeTag, "write").Set(float64(flowStats.Len())) - hotCacheStatusGauge.WithLabelValues("hotThreshold", storeTag, "write").Set(float64(threshold)) - } - - for storeID, flowStats := range w.readFlow.hotStoreStats { - storeTag := fmt.Sprintf("store-%d", storeID) - threshold := calculateReadHotThresholdWithStore(stats, storeID) - hotCacheStatusGauge.WithLabelValues("total_length", storeTag, "read").Set(float64(flowStats.Len())) - hotCacheStatusGauge.WithLabelValues("hotThreshold", storeTag, "read").Set(float64(threshold)) - } -} - // IsRegionHot checks if the region is hot. -func (w *HotSpotCache) IsRegionHot(region *core.RegionInfo, hotThreshold int) bool { - stats := w.writeFlow - if stats.isRegionHotWithAnyPeers(region, hotThreshold) { - return true - } - stats = w.readFlow - return stats.isRegionHotWithPeer(region, region.GetLeader(), hotThreshold) -} - -// Utils -func calculateWriteHotThreshold(stats *StoresStats) uint64 { - // hotRegionThreshold is used to pick hot region - // suppose the number of the hot Regions is statCacheMaxLen - // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - // divide 2 because the store reports data about two times than the region record write to rocksdb - divisor := float64(statCacheMaxLen) * 2 - hotRegionThreshold := uint64(stats.TotalBytesWriteRate() / divisor) - - if hotRegionThreshold < hotWriteRegionMinFlowRate { - hotRegionThreshold = hotWriteRegionMinFlowRate - } - return hotRegionThreshold -} - -func calculateWriteHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { - writeBytes, _ := stats.GetStoreBytesRate(storeID) - divisor := float64(storeStatCacheMaxLen) * 2 - hotRegionThreshold := uint64(float64(writeBytes) / divisor) - - if hotRegionThreshold < hotWriteRegionMinFlowRate { - hotRegionThreshold = hotWriteRegionMinFlowRate - } - return hotRegionThreshold +func (w *HotCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool { + return w.writeFlow.IsRegionHot(region, hotDegree) || + w.readFlow.IsRegionHot(region, hotDegree) } -func calculateReadHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { - _, readBytes := stats.GetStoreBytesRate(storeID) - divisor := float64(storeStatCacheMaxLen) * 2 - hotRegionThreshold := uint64(float64(readBytes) / divisor) - - if hotRegionThreshold < hotReadRegionMinFlowRate { - hotRegionThreshold = hotReadRegionMinFlowRate - } - return hotRegionThreshold +// CollectMetrics collect the hot cache metrics +func (w *HotCache) CollectMetrics(stats *StoresStats) { + w.writeFlow.CollectMetrics(stats, "write") + 
w.readFlow.CollectMetrics(stats, "read") } -func calculateReadHotThreshold(stats *StoresStats) uint64 { - // hotRegionThreshold is used to pick hot region - // suppose the number of the hot Regions is statCacheMaxLen - // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - divisor := float64(statCacheMaxLen) - hotRegionThreshold := uint64(stats.TotalBytesReadRate() / divisor) - - if hotRegionThreshold < hotReadRegionMinFlowRate { - hotRegionThreshold = hotReadRegionMinFlowRate +func (w *HotCache) incMetrics(name string, storeID uint64, kind FlowKind) { + store := storeTag(storeID) + switch kind { + case WriteFlow: + hotCacheStatusGauge.WithLabelValues(name, store, "write").Inc() + case ReadFlow: + hotCacheStatusGauge.WithLabelValues(name, store, "read").Inc() } - return hotRegionThreshold -} - -// RegionStatInformer provides access to a shared informer of statistics. -type RegionStatInformer interface { - IsRegionHot(region *core.RegionInfo) bool - RegionWriteStats() map[uint64][]*HotSpotPeerStat - RegionReadStats() map[uint64][]*HotSpotPeerStat - RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo } diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go new file mode 100644 index 00000000000..83d6ebb64f3 --- /dev/null +++ b/server/statistics/hot_peer.go @@ -0,0 +1,57 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import "time" + +// HotPeerStat records each hot peer's statistics +type HotPeerStat struct { + StoreID uint64 `json:"store_id"` + RegionID uint64 `json:"region_id"` + + // HotDegree records the hot region update times + HotDegree int `json:"hot_degree"` + // AntiCount used to eliminate some noise when remove region in cache + AntiCount int + + Kind FlowKind `json:"kind"` + BytesRate uint64 `json:"flow_bytes"` + KeysRate uint64 `json:"flow_keys"` + // RollingBytesRate is a rolling statistics, recording some recently added records. + RollingBytesRate *RollingStats + + // LastUpdateTime used to calculate average write + LastUpdateTime time.Time `json:"last_update_time"` + // Version used to check the region split times + Version uint64 + + needDelete bool + isLeader bool + isNew bool +} + +// IsNeedDelete to delete the item in cache. +func (stat HotPeerStat) IsNeedDelete() bool { + return stat.needDelete +} + +// IsLeader indicaes the item belong to the leader. +func (stat HotPeerStat) IsLeader() bool { + return stat.isLeader +} + +// IsNew indicaes the item is first update in the cache of the region. +func (stat HotPeerStat) IsNew() bool { + return stat.isNew +} diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go new file mode 100644 index 00000000000..0cbfa0b7830 --- /dev/null +++ b/server/statistics/hot_peer_cache.go @@ -0,0 +1,296 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/pkg/cache" + "github.com/pingcap/pd/server/core" +) + +const ( + cacheMaxLen = 1000 + hotPeerMaxCount = 400 + + rollingWindowsSize = 5 + + hotWriteRegionMinBytesRate = 16 * 1024 + hotReadRegionMinBytesRate = 128 * 1024 + + hotRegionReportMinInterval = 3 + + hotRegionAntiCount = 1 +) + +// hotPeerCache saves the hotspot peer's statistics. +type hotPeerCache struct { + kind FlowKind + peersOfStore map[uint64]cache.Cache // storeID -> hot peers + storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs +} + +// NewHotStoresStats creates a HotStoresStats +func NewHotStoresStats(kind FlowKind) *hotPeerCache { + return &hotPeerCache{ + kind: kind, + peersOfStore: make(map[uint64]cache.Cache), + storesOfRegion: make(map[uint64]map[uint64]struct{}), + } +} + +// Update updates the items in statistics. +func (f *hotPeerCache) Update(item *HotPeerStat) { + if item.IsNeedDelete() { + if peers, ok := f.peersOfStore[item.StoreID]; ok { + peers.Remove(item.RegionID) + } + + if stores, ok := f.storesOfRegion[item.RegionID]; ok { + delete(stores, item.StoreID) + } + } else { + peers, ok := f.peersOfStore[item.StoreID] + if !ok { + peers = cache.NewCache(cacheMaxLen, cache.TwoQueueCache) + f.peersOfStore[item.StoreID] = peers + } + peers.Put(item.RegionID, item) + + stores, ok := f.storesOfRegion[item.RegionID] + if !ok { + stores = make(map[uint64]struct{}) + f.storesOfRegion[item.RegionID] = stores + } + stores[item.StoreID] = struct{}{} + } +} + +// CheckRegionFlow checks the flow information of region. +func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresStats) (ret []*HotPeerStat) { + storeIDs := f.getAllStoreIDs(region) + + bytesFlow := f.getBytesFlow(region) + keysFlow := f.getKeysFlow(region) + + bytesPerSecInit := uint64(float64(bytesFlow) / float64(RegionHeartBeatReportInterval)) + keysPerSecInit := uint64(float64(keysFlow) / float64(RegionHeartBeatReportInterval)) + + for storeID := range storeIDs { + bytesPerSec := bytesPerSecInit + keysPerSec := keysPerSecInit + isExpired := f.isRegionExpired(region, storeID) + oldItem := f.getOldHotPeerStat(region.GetID(), storeID) + + // This is used for the simulator. 
+ if oldItem != nil && Denoising { + interval := time.Since(oldItem.LastUpdateTime).Seconds() + // ignore if report too fast + if interval < hotRegionReportMinInterval && !isExpired { + continue + } + bytesPerSec = uint64(float64(bytesFlow) / interval) + keysPerSec = uint64(float64(keysFlow) / interval) + } + + newItem := &HotPeerStat{ + StoreID: storeID, + RegionID: region.GetID(), + Kind: f.kind, + BytesRate: bytesPerSec, + KeysRate: keysPerSec, + LastUpdateTime: time.Now(), + Version: region.GetMeta().GetRegionEpoch().GetVersion(), + needDelete: isExpired, + isLeader: region.GetLeader().GetStoreId() == storeID, + } + + hotThreshold := f.calcHotThreshold(stats, storeID) + newItem = updateHotPeerStat(newItem, oldItem, bytesPerSec, hotThreshold) + if newItem != nil { + ret = append(ret, newItem) + } + } + + return ret +} + +func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool { + switch f.kind { + case WriteFlow: + return f.isRegionHotWithAnyPeers(region, hotDegree) + case ReadFlow: + return f.isRegionHotWithPeer(region, region.GetLeader(), hotDegree) + } + return false +} + +func (f *hotPeerCache) CollectMetrics(stats *StoresStats, typ string) { + for storeID, peers := range f.peersOfStore { + store := storeTag(storeID) + threshold := f.calcHotThreshold(stats, storeID) + hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len())) + hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(float64(threshold)) + } +} + +func (f *hotPeerCache) getBytesFlow(region *core.RegionInfo) uint64 { + switch f.kind { + case WriteFlow: + return region.GetBytesWritten() + case ReadFlow: + return region.GetBytesRead() + } + return 0 +} + +func (f *hotPeerCache) getKeysFlow(region *core.RegionInfo) uint64 { + switch f.kind { + case WriteFlow: + return region.GetKeysWritten() + case ReadFlow: + return region.GetKeysRead() + } + return 0 +} + +func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat { + if hotPeers, ok := f.peersOfStore[storeID]; ok { + if v, ok := hotPeers.Peek(regionID); ok { + return v.(*HotPeerStat) + } + } + return nil +} + +func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64) bool { + switch f.kind { + case WriteFlow: + return region.GetStorePeer(storeID) == nil + case ReadFlow: + return region.GetLeader().GetStoreId() != storeID + } + return false +} + +func (f *hotPeerCache) calcHotThreshold(stats *StoresStats, storeID uint64) uint64 { + switch f.kind { + case WriteFlow: + return calculateWriteHotThresholdWithStore(stats, storeID) + case ReadFlow: + return calculateReadHotThresholdWithStore(stats, storeID) + } + return 0 +} + +// gets the storeIDs, including old region and new region +func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) map[uint64]struct{} { + storeIDs := make(map[uint64]struct{}) + // old stores + ids, ok := f.storesOfRegion[region.GetID()] + if ok { + for storeID := range ids { + storeIDs[storeID] = struct{}{} + } + } + + // new stores + for _, peer := range region.GetPeers() { + // ReadFlow no need consider the followers. 
+ if f.kind == ReadFlow && peer.GetStoreId() != region.GetLeader().GetStoreId() { + continue + } + if _, ok := storeIDs[peer.GetStoreId()]; !ok { + storeIDs[peer.GetStoreId()] = struct{}{} + } + } + + return storeIDs +} + +func (f *hotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegree int) bool { + for _, peer := range region.GetPeers() { + if f.isRegionHotWithPeer(region, peer, hotDegree) { + return true + } + } + return false +} + +func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotDegree int) bool { + if peer == nil { + return false + } + storeID := peer.GetStoreId() + if peers, ok := f.peersOfStore[storeID]; ok { + if stat, ok := peers.Peek(region.GetID()); ok { + return stat.(*HotPeerStat).HotDegree >= hotDegree + } + } + return false +} + +func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate uint64, hotThreshold uint64) *HotPeerStat { + isHot := bytesRate >= hotThreshold + if newItem.needDelete { + return newItem + } + if oldItem != nil { + newItem.RollingBytesRate = oldItem.RollingBytesRate + if isHot { + newItem.HotDegree = oldItem.HotDegree + 1 + newItem.AntiCount = hotRegionAntiCount + } else { + newItem.HotDegree = oldItem.HotDegree - 1 + newItem.AntiCount = oldItem.AntiCount - 1 + if newItem.AntiCount < 0 { + newItem.needDelete = true + } + } + } else { + if !isHot { + return nil + } + newItem.RollingBytesRate = NewRollingStats(rollingWindowsSize) + newItem.AntiCount = hotRegionAntiCount + newItem.isNew = true + } + newItem.RollingBytesRate.Add(float64(bytesRate)) + + return newItem +} + +// Utils +func calculateWriteHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { + writeBytes, _ := stats.GetStoreBytesRate(storeID) + divisor := float64(hotPeerMaxCount) + hotRegionThreshold := uint64(float64(writeBytes) / divisor) + + if hotRegionThreshold < hotWriteRegionMinBytesRate { + hotRegionThreshold = hotWriteRegionMinBytesRate + } + return hotRegionThreshold +} + +func calculateReadHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { + _, readBytes := stats.GetStoreBytesRate(storeID) + divisor := float64(hotPeerMaxCount) + hotRegionThreshold := uint64(float64(readBytes) / divisor) + + if hotRegionThreshold < hotReadRegionMinBytesRate { + hotRegionThreshold = hotReadRegionMinBytesRate + } + return hotRegionThreshold +} diff --git a/server/statistics/hot_regions_stat.go b/server/statistics/hot_regions_stat.go new file mode 100644 index 00000000000..3a93bd0de62 --- /dev/null +++ b/server/statistics/hot_regions_stat.go @@ -0,0 +1,21 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package statistics + +// HotRegionsStat records all hot regions statistics +type HotRegionsStat struct { + TotalBytesRate uint64 `json:"total_flow_bytes"` + RegionsCount int `json:"regions_count"` + RegionsStat []HotPeerStat `json:"statistics"` +} diff --git a/server/statistics/region.go b/server/statistics/region.go index d27af1167ca..5dcd9e39fa8 100644 --- a/server/statistics/region.go +++ b/server/statistics/region.go @@ -14,63 +14,13 @@ package statistics import ( - "time" - "github.com/pingcap/pd/server/core" ) -// HotSpotPeerStat records each hot region's statistics -type HotSpotPeerStat struct { - RegionID uint64 `json:"region_id"` - FlowBytes uint64 `json:"flow_bytes"` - FlowKeys uint64 `json:"flow_keys"` - // HotDegree records the hot region update times - HotDegree int `json:"hot_degree"` - // LastUpdateTime used to calculate average write - LastUpdateTime time.Time `json:"last_update_time"` - // StoreID is the store id of the region peer - StoreID uint64 `json:"store_id"` - Kind FlowKind `json:"kind"` - // AntiCount used to eliminate some noise when remove region in cache - AntiCount int - // Version used to check the region split times - Version uint64 - // Stats is a rolling statistics, recording some recently added records. - Stats *RollingStats - - needDelete bool - isLeader bool - isNew bool -} - -// IsNeedDelete to delete the item in cache. -func (stat HotSpotPeerStat) IsNeedDelete() bool { - return stat.needDelete -} - -// IsLeader indicaes the item belong to the leader. -func (stat HotSpotPeerStat) IsLeader() bool { - return stat.isLeader -} - -// IsNew indicaes the item is first update in the cache of the region. -func (stat HotSpotPeerStat) IsNew() bool { - return stat.isNew -} - -// RegionsStat is a list of a group region state type -type RegionsStat []HotSpotPeerStat - -func (m RegionsStat) Len() int { return len(m) } -func (m RegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m RegionsStat) Less(i, j int) bool { return m[i].FlowBytes < m[j].FlowBytes } - -// HotRegionsStat records all hot regions statistics -type HotRegionsStat struct { - TotalFlowBytes uint64 `json:"total_flow_bytes"` - RegionsCount int `json:"regions_count"` - RegionsStat RegionsStat `json:"statistics"` -} +const ( + // RegionHeartBeatReportInterval is the heartbeat report interval of a region. + RegionHeartBeatReportInterval = 60 +) // RegionStats records a list of regions' statistics and distribution status. type RegionStats struct { @@ -86,6 +36,15 @@ type RegionStats struct { StorePeerKeys map[uint64]int64 `json:"store_peer_keys"` } +// GetRegionStats sums regions' statistics. +func GetRegionStats(regions []*core.RegionInfo) *RegionStats { + stats := newRegionStats() + for _, region := range regions { + stats.Observe(region) + } + return stats +} + func newRegionStats() *RegionStats { return &RegionStats{ StoreLeaderCount: make(map[uint64]int), @@ -122,12 +81,3 @@ func (s *RegionStats) Observe(r *core.RegionInfo) { s.StorePeerKeys[storeID] += approximateKeys } } - -// GetRegionStats sums regions' statistics. -func GetRegionStats(regions []*core.RegionInfo) *RegionStats { - stats := newRegionStats() - for _, region := range regions { - stats.Observe(region) - } - return stats -} diff --git a/server/statistics/region_stat_informer.go b/server/statistics/region_stat_informer.go new file mode 100644 index 00000000000..efe490d886a --- /dev/null +++ b/server/statistics/region_stat_informer.go @@ -0,0 +1,24 @@ +// Copyright 2019 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import "github.com/pingcap/pd/server/core" + +// RegionStatInformer provides access to a shared informer of statistics. +type RegionStatInformer interface { + IsRegionHot(region *core.RegionInfo) bool + RegionWriteStats() map[uint64][]*HotPeerStat + RegionReadStats() map[uint64][]*HotPeerStat + RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo +} diff --git a/server/statistics/schedule_options.go b/server/statistics/schedule_options.go new file mode 100644 index 00000000000..9d0ab700176 --- /dev/null +++ b/server/statistics/schedule_options.go @@ -0,0 +1,47 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import "time" + +// ScheduleOptions is an interface to access configurations. +// TODO: merge the Options to schedule.Options +type ScheduleOptions interface { + GetLocationLabels() []string + + GetLowSpaceRatio() float64 + GetHighSpaceRatio() float64 + GetTolerantSizeRatio() float64 + GetStoreBalanceRate() float64 + + GetSchedulerMaxWaitingOperator() uint64 + GetLeaderScheduleLimit(name string) uint64 + GetRegionScheduleLimit(name string) uint64 + GetReplicaScheduleLimit(name string) uint64 + GetMergeScheduleLimit(name string) uint64 + GetHotRegionScheduleLimit(name string) uint64 + GetMaxReplicas(name string) int + GetHotRegionCacheHitsThreshold() int + GetMaxSnapshotCount() uint64 + GetMaxPendingPeerCount() uint64 + GetMaxMergeRegionSize() uint64 + GetMaxMergeRegionKeys() uint64 + + IsMakeUpReplicaEnabled() bool + IsRemoveExtraReplicaEnabled() bool + IsRemoveDownReplicaEnabled() bool + IsReplaceOfflineReplicaEnabled() bool + + GetMaxStoreDownTime() time.Duration +} diff --git a/server/statistics/store.go b/server/statistics/store.go index 7f3dfad61d2..217920894db 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -20,6 +20,11 @@ import ( "github.com/pingcap/pd/server/core" ) +const ( + // StoreHeartBeatReportInterval is the heartbeat report interval of a store. + StoreHeartBeatReportInterval = 10 +) + // StoresStats is a cache hold hot regions. type StoresStats struct { sync.RWMutex @@ -148,15 +153,6 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 { return res } -// StoreHotRegionInfos : used to get human readable description for hot regions. 
-type StoreHotRegionInfos struct { - AsPeer StoreHotRegionsStat `json:"as_peer"` - AsLeader StoreHotRegionsStat `json:"as_leader"` -} - -// StoreHotRegionsStat used to record the hot region statistics group by store -type StoreHotRegionsStat map[uint64]*HotRegionsStat - // RollingStoreStats are multiple sets of recent historical records with specified windows size. type RollingStoreStats struct { sync.RWMutex diff --git a/server/statistics/store_collection.go b/server/statistics/store_collection.go index c0dffcbc036..80225ae37c8 100644 --- a/server/statistics/store_collection.go +++ b/server/statistics/store_collection.go @@ -16,7 +16,6 @@ package statistics import ( "fmt" "strconv" - "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" @@ -28,38 +27,6 @@ const ( labelType = "label" ) -// ScheduleOptions is an interface to access configurations. -// TODO: merge the Options to schedule.Options -type ScheduleOptions interface { - GetLocationLabels() []string - - GetLowSpaceRatio() float64 - GetHighSpaceRatio() float64 - GetTolerantSizeRatio() float64 - GetStoreBalanceRate() float64 - - GetSchedulerMaxWaitingOperator() uint64 - GetLeaderScheduleLimit(name string) uint64 - GetRegionScheduleLimit(name string) uint64 - GetReplicaScheduleLimit(name string) uint64 - GetMergeScheduleLimit(name string) uint64 - GetHotRegionScheduleLimit(name string) uint64 - GetMaxReplicas(name string) int - GetHotRegionCacheHitsThreshold() int - GetMaxSnapshotCount() uint64 - GetMaxPendingPeerCount() uint64 - GetMaxMergeRegionSize() uint64 - GetMaxMergeRegionKeys() uint64 - - IsMakeUpReplicaEnabled() bool - IsRemoveExtraReplicaEnabled() bool - IsRemoveDownReplicaEnabled() bool - IsReplaceOfflineReplicaEnabled() bool - GetLeaderScheduleKind() core.LeaderScheduleKind - - GetMaxStoreDownTime() time.Duration -} - type storeStatistics struct { opt ScheduleOptions namespace string diff --git a/server/statistics/store_hot_region_infos.go b/server/statistics/store_hot_region_infos.go new file mode 100644 index 00000000000..a5f22a2ce35 --- /dev/null +++ b/server/statistics/store_hot_region_infos.go @@ -0,0 +1,23 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +// StoreHotRegionInfos : used to get human readable description for hot regions. +type StoreHotRegionInfos struct { + AsPeer StoreHotRegionsStat `json:"as_peer"` + AsLeader StoreHotRegionsStat `json:"as_leader"` +} + +// StoreHotRegionsStat used to record the hot region statistics group by store +type StoreHotRegionsStat map[uint64]*HotRegionsStat diff --git a/server/statistics/util.go b/server/statistics/util.go index d4aa1ced880..07010c4f60c 100644 --- a/server/statistics/util.go +++ b/server/statistics/util.go @@ -14,9 +14,15 @@ package statistics import ( + "fmt" + "github.com/montanaflynn/stats" ) +func storeTag(id uint64) string { + return fmt.Sprintf("store-%d", id) +} + // RollingStats provides rolling statistics with specified window size. // There are window size records for calculating. type RollingStats struct {
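
Usage sketch (illustrative, not taken from the patch): the write path through the refactored cache is the CheckWrite -> Update sequence already visible in mockcluster.AddLeaderRegionWithWriteInfo and RaftCluster, and RegionStats exposes the per-store HotPeerStat view that the schedulers' calcScore builds on. A minimal sketch in Go, assuming the import paths used elsewhere in this diff and a caller that already has a *core.RegionInfo and a *statistics.StoresStats:

package statistics_test

import (
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/statistics"
)

// recordWriteFlow feeds one region heartbeat into the hot cache with the same
// CheckWrite -> Update sequence used by mockcluster and RaftCluster above.
func recordWriteFlow(hotCache *statistics.HotCache, storesStats *statistics.StoresStats, region *core.RegionInfo) {
	// CheckWrite returns one HotPeerStat per store that holds (or recently
	// held) a peer of the region; items flagged for deletion are evicted
	// when they are passed back through Update.
	for _, item := range hotCache.CheckWrite(region, storesStats) {
		hotCache.Update(item)
	}
}

// hotWriteBytesByStore aggregates the per-store write view kept by the cache.
func hotWriteBytesByStore(hotCache *statistics.HotCache) map[uint64]uint64 {
	totals := make(map[uint64]uint64)
	for storeID, peers := range hotCache.RegionStats(statistics.WriteFlow) {
		for _, peer := range peers {
			totals[storeID] += peer.BytesRate
		}
	}
	return totals
}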
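
The hot/cold lifecycle encoded in updateHotPeerStat can also be exercised directly. A rough sketch, assuming it sits in package statistics so the unexported helper and constants are reachable (the function name is made up for illustration):

package statistics

// exampleHotPeerLifecycle walks one peer through the states handled by
// updateHotPeerStat: created as new when its byte rate first reaches the
// threshold, promoted on later hot reports, then decayed and eventually
// marked for deletion after enough cold reports.
func exampleHotPeerLifecycle() {
	const threshold = hotWriteRegionMinBytesRate // 16 KiB/s floor for the write flow

	// First report at or above the threshold: the item is created, IsNew()
	// reports true, HotDegree is 0 and AntiCount is hotRegionAntiCount.
	item := updateHotPeerStat(&HotPeerStat{Kind: WriteFlow}, nil, threshold, threshold)

	// A later hot report carries the rolling stats over, increments
	// HotDegree and refreshes AntiCount.
	item = updateHotPeerStat(&HotPeerStat{Kind: WriteFlow}, item, threshold, threshold)

	// Cold reports decrement HotDegree and AntiCount; once AntiCount drops
	// below zero, needDelete is set and Update evicts the peer.
	item = updateHotPeerStat(&HotPeerStat{Kind: WriteFlow}, item, threshold-1, threshold)
	_ = item.IsNeedDelete() // still false here: AntiCount has only fallen to 0
}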