From dadede47de392048e74702e749cc979849f86050 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 11 Jun 2019 11:59:04 +0800 Subject: [PATCH] *: split statistics from core package (#1565) * split statistics from core package Signed-off-by: Ryan Leung --- server/api/stats_test.go | 9 +- server/api/trend.go | 6 +- server/cluster.go | 12 +- server/cluster_info.go | 36 ++-- server/cluster_info_test.go | 14 -- server/coordinator.go | 9 +- server/core/region.go | 120 ++--------- server/core/store.go | 195 +++--------------- server/handler.go | 5 +- server/mock/cluster.go | 10 +- server/namespace_cluster.go | 5 +- server/schedule/scheduler.go | 4 +- server/schedulers/hot_region.go | 47 ++--- server/schedulers/shuffle_hot_region.go | 3 +- server/statistics/hot_cache.go | 50 ++--- server/statistics/region.go | 125 +++++++++++ server/statistics/store.go | 186 +++++++++++++++++ .../statistics.go => statistics/util.go} | 2 +- .../util_test.go} | 8 +- server/store_statistics.go | 9 +- server/store_statistics_test.go | 5 +- tests/pdctl/hot/hot_test.go | 5 +- 22 files changed, 482 insertions(+), 383 deletions(-) create mode 100644 server/statistics/region.go create mode 100644 server/statistics/store.go rename server/{core/statistics.go => statistics/util.go} (98%) rename server/{core/statistics_test.go => statistics/util_test.go} (92%) diff --git a/server/api/stats_test.go b/server/api/stats_test.go index c59717b0ffb..f4fdd3d963d 100644 --- a/server/api/stats_test.go +++ b/server/api/stats_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/pd/pkg/apiutil" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/statistics" ) var _ = Suite(&testStatsSuite{}) @@ -127,7 +128,7 @@ func (s *testStatsSuite) TestRegionStats(c *C) { // 3 ["t", "x") 1 1 F L // 4 ["x", "") 50 20 L - statsAll := &core.RegionStats{ + statsAll := &statistics.RegionStats{ Count: 4, EmptyCount: 1, StorageSize: 351, @@ -141,12 +142,12 @@ func (s *testStatsSuite) TestRegionStats(c *C) { } res, err := http.Get(statsURL) c.Assert(err, IsNil) - stats := &core.RegionStats{} + stats := &statistics.RegionStats{} err = apiutil.ReadJSON(res.Body, stats) c.Assert(err, IsNil) c.Assert(stats, DeepEquals, statsAll) - stats23 := &core.RegionStats{ + stats23 := &statistics.RegionStats{ Count: 2, EmptyCount: 1, StorageSize: 201, @@ -161,7 +162,7 @@ func (s *testStatsSuite) TestRegionStats(c *C) { args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00")) res, err = http.Get(statsURL + args) c.Assert(err, IsNil) - stats = &core.RegionStats{} + stats = &statistics.RegionStats{} err = apiutil.ReadJSON(res.Body, stats) c.Assert(err, IsNil) c.Assert(stats, DeepEquals, stats23) diff --git a/server/api/trend.go b/server/api/trend.go index 1939484346e..f98a4892bec 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/pd/pkg/typeutil" "github.com/pingcap/pd/server" - "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/statistics" "github.com/unrolled/render" ) @@ -106,7 +106,7 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) { } func (h *trendHandler) getTrendStores() ([]trendStore, error) { - var readStats, writeStats core.StoreHotRegionsStat + var readStats, writeStats statistics.StoreHotRegionsStat if hotRead := h.GetHotReadRegions(); hotRead != nil { readStats = hotRead.AsLeader } @@ -140,7 +140,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { return 
trendStores, nil } -func (h *trendHandler) getStoreFlow(stats core.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) { +func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) { if stats == nil { return } diff --git a/server/cluster.go b/server/cluster.go index c0b02d82380..3f3339380e8 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" syncer "github.com/pingcap/pd/server/region_syncer" + "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -326,12 +327,19 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo { } // GetRegionStats returns region statistics from cluster. -func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *core.RegionStats { +func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats { c.RLock() defer c.RUnlock() return c.cachedCluster.getRegionStats(startKey, endKey) } +// GetStoresStats returns stores' statistics from cluster. +func (c *RaftCluster) GetStoresStats() *statistics.StoresStats { + c.RLock() + defer c.RUnlock() + return c.cachedCluster.storesStats +} + // DropCacheRegion removes a region from the cache. func (c *RaftCluster) DropCacheRegion(id uint64) { c.RLock() @@ -652,7 +660,7 @@ func (c *RaftCluster) collectMetrics() { cluster := c.cachedCluster statsMap := newStoreStatisticsMap(c.cachedCluster.opt, c.GetNamespaceClassifier()) for _, s := range cluster.GetStores() { - statsMap.Observe(s) + statsMap.Observe(s, cluster.storesStats) } statsMap.Collect() diff --git a/server/cluster_info.go b/server/cluster_info.go index a52fa1c9e08..74f900acbb7 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -37,6 +37,7 @@ type clusterInfo struct { opt *scheduleOption regionStats *regionStatistics labelLevelStats *labelLevelStatistics + storesStats *statistics.StoresStats prepareChecker *prepareChecker changedRegions chan *core.RegionInfo hotSpotCache *statistics.HotSpotCache @@ -51,6 +52,7 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus opt: opt, kv: kv, labelLevelStats: newLabelLevelStatistics(), + storesStats: statistics.NewStoresStats(), prepareChecker: newPrepareChecker(), changedRegions: make(chan *core.RegionInfo, defaultChangedRegionsLimit), hotSpotCache: statistics.NewHotSpotCache(), @@ -87,7 +89,9 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl zap.Int("count", c.core.Regions.GetRegionCount()), zap.Duration("cost", time.Since(start)), ) - + for _, store := range c.core.Stores.GetStores() { + c.storesStats.CreateRollingStoreStats(store.GetID()) + } return c, nil } @@ -201,6 +205,7 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) + c.storesStats.CreateRollingStoreStats(store.GetID()) return nil } @@ -217,6 +222,7 @@ func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error { } } c.core.DeleteStore(store) + c.storesStats.RemoveRollingStoreStats(store.GetID()) return nil } @@ -270,25 +276,25 @@ func (c *clusterInfo) getStoreCount() int { func (c *clusterInfo) getStoresBytesWriteStat() map[uint64]uint64 { c.RLock() defer c.RUnlock() - return c.core.Stores.GetStoresBytesWriteStat() + return c.storesStats.GetStoresBytesWriteStat() } func (c *clusterInfo) getStoresBytesReadStat() map[uint64]uint64 { c.RLock() defer 
c.RUnlock() - return c.core.Stores.GetStoresBytesReadStat() + return c.storesStats.GetStoresBytesReadStat() } func (c *clusterInfo) getStoresKeysWriteStat() map[uint64]uint64 { c.RLock() defer c.RUnlock() - return c.core.Stores.GetStoresKeysWriteStat() + return c.storesStats.GetStoresKeysWriteStat() } func (c *clusterInfo) getStoresKeysReadStat() map[uint64]uint64 { c.RLock() defer c.RUnlock() - return c.core.Stores.GetStoresKeysReadStat() + return c.storesStats.GetStoresKeysReadStat() } // ScanRegions scans region with start key, until number greater than limit. @@ -382,10 +388,10 @@ func (c *clusterInfo) getRegionCount() int { return c.core.Regions.GetRegionCount() } -func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *core.RegionStats { +func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *statistics.RegionStats { c.RLock() defer c.RUnlock() - return c.core.Regions.GetRegionStats(startKey, endKey) + return statistics.GetRegionStats(c.core.Regions, startKey, endKey) } func (c *clusterInfo) dropRegion(id uint64) { @@ -489,6 +495,8 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error { } newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(time.Now())) c.core.Stores.SetStore(newStore) + c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats()) + c.storesStats.UpdateTotalBytesRate(c.core.Stores) return nil } @@ -666,7 +674,7 @@ func (c *clusterInfo) collectMetrics() { c.regionStats.Collect() c.labelLevelStats.Collect() // collect hot cache metrics - c.hotSpotCache.CollectMetrics(c.core.Stores) + c.hotSpotCache.CollectMetrics(c.storesStats) } func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo { @@ -798,25 +806,25 @@ func (c *clusterInfo) CheckLabelProperty(typ string, labels []*metapb.StoreLabel } // RegionReadStats returns hot region's read stats. -func (c *clusterInfo) RegionReadStats() []*core.RegionStat { +func (c *clusterInfo) RegionReadStats() []*statistics.RegionStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (c *clusterInfo) RegionWriteStats() []*core.RegionStat { +func (c *clusterInfo) RegionWriteStats() []*statistics.RegionStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.WriteFlow) } // CheckWriteStatus checks the write status, returns whether need update statistics and item. -func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) (bool, *core.RegionStat) { - return c.hotSpotCache.CheckWrite(region, c.core.Stores) +func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) { + return c.hotSpotCache.CheckWrite(region, c.storesStats) } // CheckReadStatus checks the read status, returns whether need update statistics and item. -func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) (bool, *core.RegionStat) { - return c.hotSpotCache.CheckRead(region, c.core.Stores) +func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) { + return c.hotSpotCache.CheckRead(region, c.storesStats) } type prepareChecker struct { diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 07b0d685ab5..7f00a135f44 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -16,7 +16,6 @@ package server import ( "math/rand" - "github.com/gogo/protobuf/proto" . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -79,19 +78,6 @@ func (s *testStoresInfoSuite) TestStores(c *C) { } c.Assert(cache.GetStoreCount(), Equals, int(n)) - - bytesWritten := uint64(8 * 1024 * 1024) - bytesRead := uint64(128 * 1024 * 1024) - store := cache.GetStore(1) - - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesWritten = bytesWritten - newStats.BytesRead = bytesRead - newStats.Interval = &pdpb.TimeInterval{EndTimestamp: 10, StartTimestamp: 0} - newStore := store.Clone(core.SetStoreStats(newStats)) - cache.SetStore(newStore) - c.Assert(cache.TotalBytesWriteRate(), Equals, float64(bytesWritten/10)) - c.Assert(cache.TotalBytesReadRate(), Equals, float64(bytesRead/10)) } var _ = Suite(&testRegionsInfoSuite{}) diff --git a/server/coordinator.go b/server/coordinator.go index 450db9ae9e7..9a8cb512eea 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" + "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -256,11 +257,11 @@ func (c *coordinator) stop() { // Hack to retrieve info from scheduler. // TODO: remove it. type hasHotStatus interface { - GetHotReadStatus() *core.StoreHotRegionInfos - GetHotWriteStatus() *core.StoreHotRegionInfos + GetHotReadStatus() *statistics.StoreHotRegionInfos + GetHotWriteStatus() *statistics.StoreHotRegionInfos } -func (c *coordinator) getHotWriteRegions() *core.StoreHotRegionInfos { +func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos { c.RLock() defer c.RUnlock() s, ok := c.schedulers[hotRegionScheduleName] @@ -273,7 +274,7 @@ func (c *coordinator) getHotWriteRegions() *core.StoreHotRegionInfos { return nil } -func (c *coordinator) getHotReadRegions() *core.StoreHotRegionInfos { +func (c *coordinator) getHotReadRegions() *statistics.StoreHotRegionInfos { c.RLock() defer c.RUnlock() s, ok := c.schedulers[hotRegionScheduleName] diff --git a/server/core/region.go b/server/core/region.go index b0fcc78536f..4e7e213070e 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -20,7 +20,6 @@ import ( "math/rand" "reflect" "strings" - "time" "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" @@ -354,49 +353,6 @@ func (r *RegionInfo) GetRegionEpoch() *metapb.RegionEpoch { return r.meta.RegionEpoch } -// RegionStat records each hot region's statistics -type RegionStat struct { - RegionID uint64 `json:"region_id"` - FlowBytes uint64 `json:"flow_bytes"` - // HotDegree records the hot region update times - HotDegree int `json:"hot_degree"` - // LastUpdateTime used to calculate average write - LastUpdateTime time.Time `json:"last_update_time"` - StoreID uint64 `json:"-"` - // AntiCount used to eliminate some noise when remove region in cache - AntiCount int - // Version used to check the region split times - Version uint64 - // Stats is a rolling statistics, recording some recently added records. - Stats *RollingStats -} - -// NewRegionStat returns a RegionStat. 
-func NewRegionStat(region *RegionInfo, flowBytes uint64, antiCount int) *RegionStat { - return &RegionStat{ - RegionID: region.GetID(), - FlowBytes: flowBytes, - LastUpdateTime: time.Now(), - StoreID: region.leader.GetStoreId(), - Version: region.meta.GetRegionEpoch().GetVersion(), - AntiCount: antiCount, - } -} - -// RegionsStat is a list of a group region state type -type RegionsStat []RegionStat - -func (m RegionsStat) Len() int { return len(m) } -func (m RegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m RegionsStat) Less(i, j int) bool { return m[i].FlowBytes < m[j].FlowBytes } - -// HotRegionsStat records all hot regions statistics -type HotRegionsStat struct { - TotalFlowBytes uint64 `json:"total_flow_bytes"` - RegionsCount int `json:"regions_count"` - RegionsStat RegionsStat `json:"statistics"` -} - // regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region. type regionMap struct { m map[uint64]*regionEntry @@ -742,6 +698,21 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo { return res } +// ScanRangeWithEndKey scans region with start key and end key. +func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo { + var regions []*RegionInfo + r.tree.scanRange(startKey, func(meta *metapb.Region) bool { + if len(endKey) > 0 && (len(meta.EndKey) == 0 || bytes.Compare(meta.EndKey, endKey) >= 0) { + return false + } + if region := r.GetRegion(meta.GetId()); region != nil { + regions = append(regions, region) + } + return true + }) + return regions +} + // ScanRangeWithIterator scans region with start key, until iterator returns false. func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) { r.tree.scanRange(startKey, iterator) @@ -769,67 +740,6 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 { return r.regions.TotalSize() / int64(r.regions.Len()) } -// RegionStats records a list of regions' statistics and distribution status. -type RegionStats struct { - Count int `json:"count"` - EmptyCount int `json:"empty_count"` - StorageSize int64 `json:"storage_size"` - StorageKeys int64 `json:"storage_keys"` - StoreLeaderCount map[uint64]int `json:"store_leader_count"` - StorePeerCount map[uint64]int `json:"store_peer_count"` - StoreLeaderSize map[uint64]int64 `json:"store_leader_size"` - StoreLeaderKeys map[uint64]int64 `json:"store_leader_keys"` - StorePeerSize map[uint64]int64 `json:"store_peer_size"` - StorePeerKeys map[uint64]int64 `json:"store_peer_keys"` -} - -func newRegionStats() *RegionStats { - return &RegionStats{ - StoreLeaderCount: make(map[uint64]int), - StorePeerCount: make(map[uint64]int), - StoreLeaderSize: make(map[uint64]int64), - StoreLeaderKeys: make(map[uint64]int64), - StorePeerSize: make(map[uint64]int64), - StorePeerKeys: make(map[uint64]int64), - } -} - -// Observe adds a region's statistics into RegionStats. 
-func (s *RegionStats) Observe(r *RegionInfo) { - s.Count++ - if r.approximateSize <= EmptyRegionApproximateSize { - s.EmptyCount++ - } - s.StorageSize += r.approximateSize - s.StorageKeys += r.approximateKeys - if r.leader != nil { - s.StoreLeaderCount[r.leader.GetStoreId()]++ - s.StoreLeaderSize[r.leader.GetStoreId()] += r.approximateSize - s.StoreLeaderKeys[r.leader.GetStoreId()] += r.approximateKeys - } - for _, p := range r.meta.Peers { - s.StorePeerCount[p.GetStoreId()]++ - s.StorePeerSize[p.GetStoreId()] += r.approximateSize - s.StorePeerKeys[p.GetStoreId()] += r.approximateKeys - } -} - -// GetRegionStats scans regions that inside range [startKey, endKey) and sums up -// their statistics. -func (r *RegionsInfo) GetRegionStats(startKey, endKey []byte) *RegionStats { - stats := newRegionStats() - r.tree.scanRange(startKey, func(meta *metapb.Region) bool { - if len(endKey) > 0 && (len(meta.EndKey) == 0 || bytes.Compare(meta.EndKey, endKey) >= 0) { - return false - } - if region := r.GetRegion(meta.GetId()); region != nil { - stats.Observe(region) - } - return true - }) - return stats -} - const randomRegionMaxRetry = 10 func randRegion(regions *regionMap, opts ...RegionOption) *RegionInfo { diff --git a/server/core/store.go b/server/core/store.go index dd67b627e79..d852c3173cb 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -17,7 +17,6 @@ import ( "fmt" "math" "strings" - "sync" "time" "github.com/pingcap/errcode" @@ -32,27 +31,25 @@ type StoreInfo struct { meta *metapb.Store stats *pdpb.StoreStats // Blocked means that the store is blocked from balance. - blocked bool - leaderCount int - regionCount int - leaderSize int64 - regionSize int64 - pendingPeerCount int - lastHeartbeatTS time.Time - leaderWeight float64 - regionWeight float64 - rollingStoreStats *RollingStoreStats - overloaded bool + blocked bool + leaderCount int + regionCount int + leaderSize int64 + regionSize int64 + pendingPeerCount int + lastHeartbeatTS time.Time + leaderWeight float64 + regionWeight float64 + overloaded bool } // NewStoreInfo creates StoreInfo with meta data. func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { storeInfo := &StoreInfo{ - meta: store, - stats: &pdpb.StoreStats{}, - leaderWeight: 1.0, - regionWeight: 1.0, - rollingStoreStats: newRollingStoreStats(), + meta: store, + stats: &pdpb.StoreStats{}, + leaderWeight: 1.0, + regionWeight: 1.0, } for _, opt := range opts { opt(storeInfo) @@ -63,19 +60,18 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { // Clone creates a copy of current StoreInfo. 
func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { store := &StoreInfo{ - meta: s.meta, - stats: s.stats, - blocked: s.blocked, - leaderCount: s.leaderCount, - regionCount: s.regionCount, - leaderSize: s.leaderSize, - regionSize: s.regionSize, - pendingPeerCount: s.pendingPeerCount, - lastHeartbeatTS: s.lastHeartbeatTS, - leaderWeight: s.leaderWeight, - regionWeight: s.regionWeight, - rollingStoreStats: s.rollingStoreStats, - overloaded: s.overloaded, + meta: s.meta, + stats: s.stats, + blocked: s.blocked, + leaderCount: s.leaderCount, + regionCount: s.regionCount, + leaderSize: s.leaderSize, + regionSize: s.regionSize, + pendingPeerCount: s.pendingPeerCount, + lastHeartbeatTS: s.lastHeartbeatTS, + leaderWeight: s.leaderWeight, + regionWeight: s.regionWeight, + overloaded: s.overloaded, } for _, opt := range opts { @@ -249,11 +245,6 @@ func (s *StoreInfo) GetLastHeartbeatTS() time.Time { return s.lastHeartbeatTS } -// GetRollingStoreStats returns the rolling statistics of the store. -func (s *StoreInfo) GetRollingStoreStats() *RollingStoreStats { - return s.rollingStoreStats -} - const minWeight = 1e-6 const maxScore = 1024 * 1024 * 1024 @@ -457,15 +448,6 @@ L: return storeLabels } -// StoreHotRegionInfos : used to get human readable description for hot regions. -type StoreHotRegionInfos struct { - AsPeer StoreHotRegionsStat `json:"as_peer"` - AsLeader StoreHotRegionsStat `json:"as_leader"` -} - -// StoreHotRegionsStat used to record the hot region statistics group by store -type StoreHotRegionsStat map[uint64]*HotRegionsStat - type storeNotFoundErr struct { storeID uint64 } @@ -481,9 +463,7 @@ func NewStoreNotFoundErr(storeID uint64) errcode.ErrorCode { // StoresInfo contains information about all stores. type StoresInfo struct { - stores map[uint64]*StoreInfo - bytesReadRate float64 - bytesWriteRate float64 + stores map[uint64]*StoreInfo } // NewStoresInfo create a StoresInfo with map of storeID to StoreInfo @@ -514,8 +494,6 @@ func (s *StoresInfo) TakeStore(storeID uint64) *StoreInfo { // SetStore sets a StoreInfo with storeID. func (s *StoresInfo) SetStore(store *StoreInfo) { s.stores[store.GetID()] = store - store.GetRollingStoreStats().Observe(store.GetStoreStats()) - s.updateTotalBytesRate() } // BlockStore blocks a StoreInfo with storeID. @@ -636,122 +614,3 @@ func (s *StoresInfo) UpdateStoreStatusLocked(storeID uint64, leaderCount int, re s.SetStore(newStore) } } - -func (s *StoresInfo) updateTotalBytesRate() { - var totalBytesWirteRate float64 - var totalBytesReadRate float64 - var writeRate, readRate float64 - for _, s := range s.stores { - if s.IsUp() { - writeRate, readRate = s.GetRollingStoreStats().GetBytesRate() - totalBytesWirteRate += writeRate - totalBytesReadRate += readRate - } - } - s.bytesWriteRate = totalBytesWirteRate - s.bytesReadRate = totalBytesReadRate -} - -// TotalBytesWriteRate returns the total written bytes rate of all StoreInfo. -func (s *StoresInfo) TotalBytesWriteRate() float64 { - return s.bytesWriteRate -} - -// TotalBytesReadRate returns the total read bytes rate of all StoreInfo. -func (s *StoresInfo) TotalBytesReadRate() float64 { - return s.bytesReadRate -} - -// GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. 
-func (s *StoresInfo) GetStoresBytesWriteStat() map[uint64]uint64 { - res := make(map[uint64]uint64, len(s.stores)) - for _, s := range s.stores { - writeRate, _ := s.GetRollingStoreStats().GetBytesRate() - res[s.GetID()] = uint64(writeRate) - } - return res -} - -// GetStoresBytesReadStat returns the bytes read stat of all StoreInfo. -func (s *StoresInfo) GetStoresBytesReadStat() map[uint64]uint64 { - res := make(map[uint64]uint64, len(s.stores)) - for _, s := range s.stores { - _, readRate := s.GetRollingStoreStats().GetBytesRate() - res[s.GetID()] = uint64(readRate) - } - return res -} - -// GetStoresKeysWriteStat returns the keys write stat of all StoreInfo. -func (s *StoresInfo) GetStoresKeysWriteStat() map[uint64]uint64 { - res := make(map[uint64]uint64, len(s.stores)) - for _, s := range s.stores { - res[s.GetID()] = uint64(s.GetRollingStoreStats().GetKeysWriteRate()) - } - return res -} - -// GetStoresKeysReadStat returns the bytes read stat of all StoreInfo. -func (s *StoresInfo) GetStoresKeysReadStat() map[uint64]uint64 { - res := make(map[uint64]uint64, len(s.stores)) - for _, s := range s.stores { - res[s.GetID()] = uint64(s.GetRollingStoreStats().GetKeysReadRate()) - } - return res -} - -// RollingStoreStats are multiple sets of recent historical records with specified windows size. -type RollingStoreStats struct { - sync.RWMutex - bytesWriteRate *RollingStats - bytesReadRate *RollingStats - keysWriteRate *RollingStats - keysReadRate *RollingStats -} - -const storeStatsRollingWindows = 3 - -func newRollingStoreStats() *RollingStoreStats { - return &RollingStoreStats{ - bytesWriteRate: NewRollingStats(storeStatsRollingWindows), - bytesReadRate: NewRollingStats(storeStatsRollingWindows), - keysWriteRate: NewRollingStats(storeStatsRollingWindows), - keysReadRate: NewRollingStats(storeStatsRollingWindows), - } -} - -// Observe records current statistics. -func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) { - statInterval := stats.GetInterval() - interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp() - if interval == 0 { - return - } - r.Lock() - defer r.Unlock() - r.bytesWriteRate.Add(float64(stats.BytesWritten / interval)) - r.bytesReadRate.Add(float64(stats.BytesRead / interval)) - r.keysWriteRate.Add(float64(stats.KeysWritten / interval)) - r.keysReadRate.Add(float64(stats.KeysRead / interval)) -} - -// GetBytesRate returns the bytes write rate and the bytes read rate. -func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64) { - r.RLock() - defer r.RUnlock() - return r.bytesWriteRate.Median(), r.bytesReadRate.Median() -} - -// GetKeysWriteRate returns the keys write rate. -func (r *RollingStoreStats) GetKeysWriteRate() float64 { - r.RLock() - defer r.RUnlock() - return r.keysWriteRate.Median() -} - -// GetKeysReadRate returns the keys read rate. -func (r *RollingStoreStats) GetKeysReadRate() float64 { - r.RLock() - defer r.RUnlock() - return r.keysReadRate.Median() -} diff --git a/server/handler.go b/server/handler.go index 3394c59fca2..e4081dbdeb0 100644 --- a/server/handler.go +++ b/server/handler.go @@ -23,6 +23,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" + "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -108,7 +109,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) { } // GetHotWriteRegions gets all hot write regions stats. 
-func (h *Handler) GetHotWriteRegions() *core.StoreHotRegionInfos { +func (h *Handler) GetHotWriteRegions() *statistics.StoreHotRegionInfos { c, err := h.getCoordinator() if err != nil { return nil @@ -117,7 +118,7 @@ func (h *Handler) GetHotWriteRegions() *core.StoreHotRegionInfos { } // GetHotReadRegions gets all hot read regions stats. -func (h *Handler) GetHotReadRegions() *core.StoreHotRegionInfos { +func (h *Handler) GetHotReadRegions() *statistics.StoreHotRegionInfos { c, err := h.getCoordinator() if err != nil { return nil diff --git a/server/mock/cluster.go b/server/mock/cluster.go index 255506e205e..81983c8fc7b 100644 --- a/server/mock/cluster.go +++ b/server/mock/cluster.go @@ -33,6 +33,7 @@ type Cluster struct { *IDAllocator *ScheduleOptions *statistics.HotSpotCache + *statistics.StoresStats ID uint64 } @@ -43,6 +44,7 @@ func NewCluster(opt *ScheduleOptions) *Cluster { IDAllocator: NewIDAllocator(), ScheduleOptions: opt, HotSpotCache: statistics.NewHotSpotCache(), + StoresStats: statistics.NewStoresStats(), } } @@ -73,12 +75,12 @@ func (mc *Cluster) IsRegionHot(id uint64) bool { } // RegionReadStats returns hot region's read stats. -func (mc *Cluster) RegionReadStats() []*core.RegionStat { +func (mc *Cluster) RegionReadStats() []*statistics.RegionStat { return mc.HotSpotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (mc *Cluster) RegionWriteStats() []*core.RegionStat { +func (mc *Cluster) RegionWriteStats() []*statistics.RegionStat { return mc.HotSpotCache.RegionStats(statistics.WriteFlow) } @@ -227,7 +229,7 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r = r.Clone(core.SetReadBytes(readBytes)) - isUpdate, item := mc.HotSpotCache.CheckRead(r, mc.Stores) + isUpdate, item := mc.HotSpotCache.CheckRead(r, mc.StoresStats) if isUpdate { mc.HotSpotCache.Update(regionID, item, statistics.ReadFlow) } @@ -238,7 +240,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r = r.Clone(core.SetWrittenBytes(writtenBytes)) - isUpdate, item := mc.HotSpotCache.CheckWrite(r, mc.Stores) + isUpdate, item := mc.HotSpotCache.CheckWrite(r, mc.StoresStats) if isUpdate { mc.HotSpotCache.Update(regionID, item, statistics.WriteFlow) } diff --git a/server/namespace_cluster.go b/server/namespace_cluster.go index e9764165971..eb4e08d5ac8 100644 --- a/server/namespace_cluster.go +++ b/server/namespace_cluster.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" + "github.com/pingcap/pd/server/statistics" ) // namespaceCluster is part of a global cluster that contains stores and regions @@ -124,9 +125,9 @@ func (c *namespaceCluster) GetRegion(id uint64) *core.RegionInfo { } // RegionWriteStats returns hot region's write stats. 
-func (c *namespaceCluster) RegionWriteStats() []*core.RegionStat { +func (c *namespaceCluster) RegionWriteStats() []*statistics.RegionStat { allStats := c.Cluster.RegionWriteStats() - stats := make([]*core.RegionStat, 0, len(allStats)) + stats := make([]*statistics.RegionStat, 0, len(allStats)) for _, s := range allStats { if c.GetRegion(s.RegionID) != nil { stats = append(stats, s) diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index e1e11fce972..27281703243 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -48,8 +48,8 @@ type Cluster interface { ResetStoreOverload(id uint64) IsRegionHot(id uint64) bool - RegionWriteStats() []*core.RegionStat - RegionReadStats() []*core.RegionStat + RegionWriteStats() []*statistics.RegionStat + RegionReadStats() []*statistics.RegionStat RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo // get config methods diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index fd75f671338..2ad7002f8ff 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -23,6 +23,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" + "github.com/pingcap/pd/server/statistics" "go.uber.org/zap" ) @@ -54,16 +55,16 @@ const ( ) type storeStatistics struct { - readStatAsLeader core.StoreHotRegionsStat - writeStatAsPeer core.StoreHotRegionsStat - writeStatAsLeader core.StoreHotRegionsStat + readStatAsLeader statistics.StoreHotRegionsStat + writeStatAsPeer statistics.StoreHotRegionsStat + writeStatAsLeader statistics.StoreHotRegionsStat } func newStoreStaticstics() *storeStatistics { return &storeStatistics{ - readStatAsLeader: make(core.StoreHotRegionsStat), - writeStatAsLeader: make(core.StoreHotRegionsStat), - writeStatAsPeer: make(core.StoreHotRegionsStat), + readStatAsLeader: make(statistics.StoreHotRegionsStat), + writeStatAsLeader: make(statistics.StoreHotRegionsStat), + writeStatAsPeer: make(statistics.StoreHotRegionsStat), } } @@ -217,8 +218,8 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu return nil } -func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat { - stats := make(core.StoreHotRegionsStat) +func calcScore(items []*statistics.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { + stats := make(statistics.StoreHotRegionsStat) for _, r := range items { // HotDegree is the update times on the hot cache. If the heartbeat report // the flow of the region exceeds the threshold, the scheduler will update the region in @@ -245,13 +246,13 @@ func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.Res for _, storeID := range storeIDs { storeStat, ok := stats[storeID] if !ok { - storeStat = &core.HotRegionsStat{ - RegionsStat: make(core.RegionsStat, 0, storeHotRegionsDefaultLen), + storeStat = &statistics.HotRegionsStat{ + RegionsStat: make(statistics.RegionsStat, 0, storeHotRegionsDefaultLen), } stats[storeID] = storeStat } - s := core.RegionStat{ + s := statistics.RegionStat{ RegionID: r.RegionID, FlowBytes: uint64(r.Stats.Median()), HotDegree: r.HotDegree, @@ -269,7 +270,7 @@ func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.Res } // balanceByPeer balances the peer distribution of hot regions. 
-func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) { +func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, storesStat statistics.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) { if !h.allowBalanceRegion(cluster) { return nil, nil, nil } @@ -330,7 +331,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto } // balanceByLeader balances the leader distribution of hot regions. -func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) { +func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, storesStat statistics.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) { if !h.allowBalanceLeader(cluster) { return nil, nil } @@ -376,7 +377,7 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s // Select the store to move hot regions from. // We choose the store with the maximum number of hot region first. // Inside these stores, we choose the one with maximum flow bytes. -func (h *balanceHotRegionsScheduler) selectSrcStore(stats core.StoreHotRegionsStat) (srcStoreID uint64) { +func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotRegionsStat) (srcStoreID uint64) { var ( maxFlowBytes uint64 maxHotStoreRegionCount int @@ -395,7 +396,7 @@ func (h *balanceHotRegionsScheduler) selectSrcStore(stats core.StoreHotRegionsSt // selectDestStore selects a target store to hold the region of the source region. // We choose a target store based on the hot region number and flow bytes of this store. -func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat core.StoreHotRegionsStat) (destStoreID uint64) { +func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) { sr := storesStat[srcStoreID] srcFlowBytes := sr.TotalFlowBytes srcHotRegionsCount := sr.RegionsStat.Len() @@ -425,7 +426,7 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, return } -func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) uint64 { +func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat statistics.StoreHotRegionsStat) uint64 { srcStoreStatistics := storesStat[storeID] var hotRegionTotalCount float64 @@ -439,24 +440,24 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt return maxUint64(limit, 1) } -func (h *balanceHotRegionsScheduler) GetHotReadStatus() *core.StoreHotRegionInfos { +func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotRegionInfos { h.RLock() defer h.RUnlock() - asLeader := make(core.StoreHotRegionsStat, len(h.stats.readStatAsLeader)) + asLeader := make(statistics.StoreHotRegionsStat, len(h.stats.readStatAsLeader)) for id, stat := range h.stats.readStatAsLeader { clone := *stat asLeader[id] = &clone } - return &core.StoreHotRegionInfos{ + return &statistics.StoreHotRegionInfos{ AsLeader: asLeader, } } -func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *core.StoreHotRegionInfos { +func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *statistics.StoreHotRegionInfos { h.RLock() defer 
h.RUnlock() - asLeader := make(core.StoreHotRegionsStat, len(h.stats.writeStatAsLeader)) - asPeer := make(core.StoreHotRegionsStat, len(h.stats.writeStatAsPeer)) + asLeader := make(statistics.StoreHotRegionsStat, len(h.stats.writeStatAsLeader)) + asPeer := make(statistics.StoreHotRegionsStat, len(h.stats.writeStatAsPeer)) for id, stat := range h.stats.writeStatAsLeader { clone := *stat asLeader[id] = &clone @@ -465,7 +466,7 @@ func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *core.StoreHotRegionInf clone := *stat asPeer[id] = &clone } - return &core.StoreHotRegionInfos{ + return &statistics.StoreHotRegionInfos{ AsLeader: asLeader, AsPeer: asPeer, } diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index a5a77c01d29..f6a64e7a056 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -21,6 +21,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" + "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -95,7 +96,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster schedule.C return nil } -func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats core.StoreHotRegionsStat) []*schedule.Operator { +func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, storeStats statistics.StoreHotRegionsStat) []*schedule.Operator { for _, stats := range storeStats { i := s.r.Intn(stats.RegionsStat.Len()) r := stats.RegionsStat[i] diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index d3d8bf5d34a..d851cb1b145 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -62,17 +62,17 @@ func NewHotSpotCache() *HotSpotCache { } // CheckWrite checks the write status, returns whether need update statistics and item. -func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { +func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) (bool, *RegionStat) { var ( WrittenBytesPerSec uint64 - value *core.RegionStat + value *RegionStat ) WrittenBytesPerSec = uint64(float64(region.GetBytesWritten()) / float64(RegionHeartBeatReportInterval)) v, isExist := w.writeFlow.Peek(region.GetID()) if isExist { - value = v.(*core.RegionStat) + value = v.(*RegionStat) // This is used for the simulator. if !Simulating { interval := time.Since(value.LastUpdateTime).Seconds() @@ -83,22 +83,22 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn } } - hotRegionThreshold := calculateWriteHotThreshold(stores) + hotRegionThreshold := calculateWriteHotThreshold(stats) return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow) } // CheckRead checks the read status, returns whether need update statistics and item. -func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInfo) (bool, *core.RegionStat) { +func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) (bool, *RegionStat) { var ( ReadBytesPerSec uint64 - value *core.RegionStat + value *RegionStat ) ReadBytesPerSec = uint64(float64(region.GetBytesRead()) / float64(RegionHeartBeatReportInterval)) v, isExist := w.readFlow.Peek(region.GetID()) if isExist { - value = v.(*core.RegionStat) + value = v.(*RegionStat) // This is used for the simulator. 
if !Simulating { interval := time.Since(value.LastUpdateTime).Seconds() @@ -109,7 +109,7 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf } } - hotRegionThreshold := calculateReadHotThreshold(stores) + hotRegionThreshold := calculateReadHotThreshold(stats) return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) } @@ -122,13 +122,13 @@ func (w *HotSpotCache) incMetrics(name string, kind FlowKind) { } } -func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 { +func calculateWriteHotThreshold(stats *StoresStats) uint64 { // hotRegionThreshold is used to pick hot region // suppose the number of the hot Regions is statCacheMaxLen // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions // divide 2 because the store reports data about two times than the region record write to rocksdb divisor := float64(statCacheMaxLen) * 2 - hotRegionThreshold := uint64(stores.TotalBytesWriteRate() / divisor) + hotRegionThreshold := uint64(stats.TotalBytesWriteRate() / divisor) if hotRegionThreshold < hotWriteRegionMinFlowRate { hotRegionThreshold = hotWriteRegionMinFlowRate @@ -136,12 +136,12 @@ func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 { return hotRegionThreshold } -func calculateReadHotThreshold(stores *core.StoresInfo) uint64 { +func calculateReadHotThreshold(stats *StoresStats) uint64 { // hotRegionThreshold is used to pick hot region // suppose the number of the hot Regions is statCacheMaxLen // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions divisor := float64(statCacheMaxLen) - hotRegionThreshold := uint64(stores.TotalBytesReadRate() / divisor) + hotRegionThreshold := uint64(stats.TotalBytesReadRate() / divisor) if hotRegionThreshold < hotReadRegionMinFlowRate { hotRegionThreshold = hotReadRegionMinFlowRate @@ -151,8 +151,8 @@ func calculateReadHotThreshold(stores *core.StoresInfo) uint64 { const rollingWindowsSize = 5 -func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, oldItem *core.RegionStat, kind FlowKind) (bool, *core.RegionStat) { - newItem := core.NewRegionStat(region, flowBytes, hotRegionAntiCount) +func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, oldItem *RegionStat, kind FlowKind) (bool, *RegionStat) { + newItem := NewRegionStat(region, flowBytes, hotRegionAntiCount) if oldItem != nil { newItem.HotDegree = oldItem.HotDegree + 1 newItem.Stats = oldItem.Stats @@ -160,7 +160,7 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes if flowBytes >= hotRegionThreshold { if oldItem == nil { w.incMetrics("add_item", kind) - newItem.Stats = core.NewRollingStats(rollingWindowsSize) + newItem.Stats = NewRollingStats(rollingWindowsSize) } newItem.Stats.Add(float64(flowBytes)) return true, newItem @@ -181,7 +181,7 @@ func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes } // Update updates the cache. 
-func (w *HotSpotCache) Update(key uint64, item *core.RegionStat, kind FlowKind) { +func (w *HotSpotCache) Update(key uint64, item *RegionStat, kind FlowKind) { switch kind { case WriteFlow: if item == nil { @@ -201,7 +201,7 @@ func (w *HotSpotCache) Update(key uint64, item *core.RegionStat, kind FlowKind) } // RegionStats returns hot items according to kind -func (w *HotSpotCache) RegionStats(kind FlowKind) []*core.RegionStat { +func (w *HotSpotCache) RegionStats(kind FlowKind) []*RegionStat { var elements []*cache.Item switch kind { case WriteFlow: @@ -209,15 +209,15 @@ func (w *HotSpotCache) RegionStats(kind FlowKind) []*core.RegionStat { case ReadFlow: elements = w.readFlow.Elems() } - stats := make([]*core.RegionStat, len(elements)) + stats := make([]*RegionStat, len(elements)) for i := range elements { - stats[i] = elements[i].Value.(*core.RegionStat) + stats[i] = elements[i].Value.(*RegionStat) } return stats } // RandHotRegionFromStore random picks a hot region in specify store. -func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotThreshold int) *core.RegionStat { +func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotThreshold int) *RegionStat { stats := w.RegionStats(kind) for _, i := range rand.Perm(len(stats)) { if stats[i].HotDegree >= hotThreshold && stats[i].StoreID == storeID { @@ -228,24 +228,24 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot } // CollectMetrics collect the hot cache metrics -func (w *HotSpotCache) CollectMetrics(stores *core.StoresInfo) { +func (w *HotSpotCache) CollectMetrics(stats *StoresStats) { hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(float64(w.writeFlow.Len())) hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(float64(w.readFlow.Len())) - threshold := calculateWriteHotThreshold(stores) + threshold := calculateWriteHotThreshold(stats) hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(threshold)) - threshold = calculateReadHotThreshold(stores) + threshold = calculateReadHotThreshold(stats) hotCacheStatusGauge.WithLabelValues("hotThreshold", "read").Set(float64(threshold)) } // IsRegionHot checks if the region is hot. func (w *HotSpotCache) IsRegionHot(id uint64, hotThreshold int) bool { if stat, ok := w.writeFlow.Peek(id); ok { - if stat.(*core.RegionStat).HotDegree >= hotThreshold { + if stat.(*RegionStat).HotDegree >= hotThreshold { return true } } if stat, ok := w.readFlow.Peek(id); ok { - return stat.(*core.RegionStat).HotDegree >= hotThreshold + return stat.(*RegionStat).HotDegree >= hotThreshold } return false } diff --git a/server/statistics/region.go b/server/statistics/region.go new file mode 100644 index 00000000000..1eb63ced8b8 --- /dev/null +++ b/server/statistics/region.go @@ -0,0 +1,125 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package statistics
+
+import (
+	"time"
+
+	"github.com/pingcap/pd/server/core"
+)
+
+// RegionStat records each hot region's statistics
+type RegionStat struct {
+	RegionID  uint64 `json:"region_id"`
+	FlowBytes uint64 `json:"flow_bytes"`
+	// HotDegree records the hot region update times
+	HotDegree int `json:"hot_degree"`
+	// LastUpdateTime is used to calculate the average write rate
+	LastUpdateTime time.Time `json:"last_update_time"`
+	StoreID        uint64    `json:"-"`
+	// AntiCount is used to eliminate some noise when removing a region from the cache
+	AntiCount int
+	// Version is used to check the region split times
+	Version uint64
+	// Stats holds rolling statistics, recording some recently added records.
+	Stats *RollingStats
+}
+
+// NewRegionStat returns a RegionStat.
+func NewRegionStat(region *core.RegionInfo, flowBytes uint64, antiCount int) *RegionStat {
+	return &RegionStat{
+		RegionID:       region.GetID(),
+		FlowBytes:      flowBytes,
+		LastUpdateTime: time.Now(),
+		StoreID:        region.GetLeader().GetStoreId(),
+		Version:        region.GetMeta().GetRegionEpoch().GetVersion(),
+		AntiCount:      antiCount,
+	}
+}
+
+// RegionsStat is a sortable list of RegionStat, ordered by FlowBytes
+type RegionsStat []RegionStat
+
+func (m RegionsStat) Len() int           { return len(m) }
+func (m RegionsStat) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }
+func (m RegionsStat) Less(i, j int) bool { return m[i].FlowBytes < m[j].FlowBytes }
+
+// HotRegionsStat records all hot regions' statistics
+type HotRegionsStat struct {
+	TotalFlowBytes uint64      `json:"total_flow_bytes"`
+	RegionsCount   int         `json:"regions_count"`
+	RegionsStat    RegionsStat `json:"statistics"`
+}
+
+// RegionStats records a list of regions' statistics and distribution status.
+type RegionStats struct {
+	Count            int              `json:"count"`
+	EmptyCount       int              `json:"empty_count"`
+	StorageSize      int64            `json:"storage_size"`
+	StorageKeys      int64            `json:"storage_keys"`
+	StoreLeaderCount map[uint64]int   `json:"store_leader_count"`
+	StorePeerCount   map[uint64]int   `json:"store_peer_count"`
+	StoreLeaderSize  map[uint64]int64 `json:"store_leader_size"`
+	StoreLeaderKeys  map[uint64]int64 `json:"store_leader_keys"`
+	StorePeerSize    map[uint64]int64 `json:"store_peer_size"`
+	StorePeerKeys    map[uint64]int64 `json:"store_peer_keys"`
+}
+
+func newRegionStats() *RegionStats {
+	return &RegionStats{
+		StoreLeaderCount: make(map[uint64]int),
+		StorePeerCount:   make(map[uint64]int),
+		StoreLeaderSize:  make(map[uint64]int64),
+		StoreLeaderKeys:  make(map[uint64]int64),
+		StorePeerSize:    make(map[uint64]int64),
+		StorePeerKeys:    make(map[uint64]int64),
+	}
+}
+
+// Observe adds a region's statistics into RegionStats.
+func (s *RegionStats) Observe(r *core.RegionInfo) {
+	s.Count++
+	approximateKeys := r.GetApproximateKeys()
+	approximateSize := r.GetApproximateSize()
+	if approximateSize <= core.EmptyRegionApproximateSize {
+		s.EmptyCount++
+	}
+	s.StorageSize += approximateSize
+	s.StorageKeys += approximateKeys
+	leader := r.GetLeader()
+	if leader != nil {
+		storeID := leader.GetStoreId()
+		s.StoreLeaderCount[storeID]++
+		s.StoreLeaderSize[storeID] += approximateSize
+		s.StoreLeaderKeys[storeID] += approximateKeys
+	}
+	peers := r.GetMeta().GetPeers()
+	for _, p := range peers {
+		storeID := p.GetStoreId()
+		s.StorePeerCount[storeID]++
+		s.StorePeerSize[storeID] += approximateSize
+		s.StorePeerKeys[storeID] += approximateKeys
+	}
+}
+
+// GetRegionStats scans regions that are inside the range [startKey, endKey) and sums up
+// their statistics.
+func GetRegionStats(r *core.RegionsInfo, startKey, endKey []byte) *RegionStats {
+	stats := newRegionStats()
+	regions := r.ScanRangeWithEndKey(startKey, endKey)
+	for _, region := range regions {
+		stats.Observe(region)
+	}
+	return stats
+}
diff --git a/server/statistics/store.go b/server/statistics/store.go
new file mode 100644
index 00000000000..32b324081a1
--- /dev/null
+++ b/server/statistics/store.go
@@ -0,0 +1,186 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	"sync"
+
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/pd/server/core"
+)
+
+// StoresStats is a cache that holds the statistics of all stores.
+type StoresStats struct {
+	rollingStoresStats map[uint64]*RollingStoreStats
+	bytesReadRate      float64
+	bytesWriteRate     float64
+}
+
+// NewStoresStats creates a new StoresStats.
+func NewStoresStats() *StoresStats {
+	return &StoresStats{
+		rollingStoresStats: make(map[uint64]*RollingStoreStats),
+	}
+}
+
+// CreateRollingStoreStats creates RollingStoreStats with a given store ID.
+func (s *StoresStats) CreateRollingStoreStats(storeID uint64) {
+	s.rollingStoresStats[storeID] = newRollingStoreStats()
+}
+
+// RemoveRollingStoreStats removes RollingStoreStats with a given store ID.
+func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) {
+	delete(s.rollingStoresStats, storeID)
+}
+
+// GetRollingStoreStats gets RollingStoreStats with a given store ID.
+func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats {
+	return s.rollingStoresStats[storeID]
+}
+
+// Observe records the current store status for a given store.
+func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) {
+	s.rollingStoresStats[storeID].Observe(stats)
+}
+
+// UpdateTotalBytesRate updates the total bytes write rate and read rate.
+func (s *StoresStats) UpdateTotalBytesRate(stores *core.StoresInfo) {
+	var totalBytesWriteRate float64
+	var totalBytesReadRate float64
+	var writeRate, readRate float64
+	ss := stores.GetStores()
+	for _, store := range ss {
+		if store.IsUp() {
+			writeRate, readRate = s.rollingStoresStats[store.GetID()].GetBytesRate()
+			totalBytesWriteRate += writeRate
+			totalBytesReadRate += readRate
+		}
+	}
+	s.bytesWriteRate = totalBytesWriteRate
+	s.bytesReadRate = totalBytesReadRate
+}
+
+// TotalBytesWriteRate returns the total written bytes rate of all StoreInfo.
+func (s *StoresStats) TotalBytesWriteRate() float64 {
+	return s.bytesWriteRate
+}
+
+// TotalBytesReadRate returns the total read bytes rate of all StoreInfo.
+func (s *StoresStats) TotalBytesReadRate() float64 {
+	return s.bytesReadRate
+}
+
+// GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo.
+func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 {
+	res := make(map[uint64]uint64, len(s.rollingStoresStats))
+	for storeID, stats := range s.rollingStoresStats {
+		writeRate, _ := stats.GetBytesRate()
+		res[storeID] = uint64(writeRate)
+	}
+	return res
+}
+
+// GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
+func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 {
+	res := make(map[uint64]uint64, len(s.rollingStoresStats))
+	for storeID, stats := range s.rollingStoresStats {
+		_, readRate := stats.GetBytesRate()
+		res[storeID] = uint64(readRate)
+	}
+	return res
+}
+
+// GetStoresKeysWriteStat returns the keys write stat of all StoreInfo.
+func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 {
+	res := make(map[uint64]uint64, len(s.rollingStoresStats))
+	for storeID, stats := range s.rollingStoresStats {
+		res[storeID] = uint64(stats.GetKeysWriteRate())
+	}
+	return res
+}
+
+// GetStoresKeysReadStat returns the keys read stat of all StoreInfo.
+func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 {
+	res := make(map[uint64]uint64, len(s.rollingStoresStats))
+	for storeID, stats := range s.rollingStoresStats {
+		res[storeID] = uint64(stats.GetKeysReadRate())
+	}
+	return res
+}
+
+// StoreHotRegionInfos is used to get a human-readable description for hot regions.
+type StoreHotRegionInfos struct {
+	AsPeer   StoreHotRegionsStat `json:"as_peer"`
+	AsLeader StoreHotRegionsStat `json:"as_leader"`
+}
+
+// StoreHotRegionsStat is used to record the hot region statistics grouped by store
+type StoreHotRegionsStat map[uint64]*HotRegionsStat
+
+// RollingStoreStats are multiple sets of recent historical records with a specified window size.
+type RollingStoreStats struct {
+	sync.RWMutex
+	bytesWriteRate *RollingStats
+	bytesReadRate  *RollingStats
+	keysWriteRate  *RollingStats
+	keysReadRate   *RollingStats
+}
+
+const storeStatsRollingWindows = 3
+
+// newRollingStoreStats creates a RollingStoreStats.
+func newRollingStoreStats() *RollingStoreStats {
+	return &RollingStoreStats{
+		bytesWriteRate: NewRollingStats(storeStatsRollingWindows),
+		bytesReadRate:  NewRollingStats(storeStatsRollingWindows),
+		keysWriteRate:  NewRollingStats(storeStatsRollingWindows),
+		keysReadRate:   NewRollingStats(storeStatsRollingWindows),
+	}
+}
+
+// Observe records current statistics.
+func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
+	statInterval := stats.GetInterval()
+	interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp()
+	if interval == 0 {
+		return
+	}
+	r.Lock()
+	defer r.Unlock()
+	r.bytesWriteRate.Add(float64(stats.BytesWritten / interval))
+	r.bytesReadRate.Add(float64(stats.BytesRead / interval))
+	r.keysWriteRate.Add(float64(stats.KeysWritten / interval))
+	r.keysReadRate.Add(float64(stats.KeysRead / interval))
+}
+
+// GetBytesRate returns the bytes write rate and the bytes read rate.
+func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64) {
+	r.RLock()
+	defer r.RUnlock()
+	return r.bytesWriteRate.Median(), r.bytesReadRate.Median()
+}
+
+// GetKeysWriteRate returns the keys write rate.
+func (r *RollingStoreStats) GetKeysWriteRate() float64 {
+	r.RLock()
+	defer r.RUnlock()
+	return r.keysWriteRate.Median()
+}
+
+// GetKeysReadRate returns the keys read rate.
+func (r *RollingStoreStats) GetKeysReadRate() float64 {
+	r.RLock()
+	defer r.RUnlock()
+	return r.keysReadRate.Median()
+}
diff --git a/server/core/statistics.go b/server/statistics/util.go
similarity index 98%
rename from server/core/statistics.go
rename to server/statistics/util.go
index 1ad14ba4061..d4aa1ced880 100644
--- a/server/core/statistics.go
+++ b/server/statistics/util.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package core +package statistics import ( "github.com/montanaflynn/stats" diff --git a/server/core/statistics_test.go b/server/statistics/util_test.go similarity index 92% rename from server/core/statistics_test.go rename to server/statistics/util_test.go index 4788996c353..d7699258aad 100644 --- a/server/core/statistics_test.go +++ b/server/statistics/util_test.go @@ -11,12 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package statistics import ( + "testing" + . "github.com/pingcap/check" ) +func Test(t *testing.T) { + TestingT(t) +} + var _ = Suite(&testRollingStats{}) type testRollingStats struct{} diff --git a/server/store_statistics.go b/server/store_statistics.go index dd759aedfb4..a17d99626c8 100644 --- a/server/store_statistics.go +++ b/server/store_statistics.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" + "github.com/pingcap/pd/server/statistics" ) const ( @@ -52,7 +53,7 @@ func newStoreStatistics(opt *scheduleOption, namespace string) *storeStatistics } } -func (s *storeStatistics) Observe(store *core.StoreInfo) { +func (s *storeStatistics) Observe(store *core.StoreInfo, stats *statistics.StoresStats) { for _, k := range s.opt.GetLocationLabels() { v := store.GetLabelValue(k) if v == "" { @@ -103,7 +104,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_capacity").Set(float64(store.GetCapacity())) // Store flows. - storeFlowStats := store.GetRollingStoreStats() + storeFlowStats := stats.GetRollingStoreStats(store.GetID()) storeWriteRateBytes, storeReadRateBytes := storeFlowStats.GetBytesRate() storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_write_rate_bytes").Set(float64(storeWriteRateBytes)) storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_read_rate_bytes").Set(float64(storeReadRateBytes)) @@ -199,14 +200,14 @@ func newStoreStatisticsMap(opt *scheduleOption, classifier namespace.Classifier) } } -func (m *storeStatisticsMap) Observe(store *core.StoreInfo) { +func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats *statistics.StoresStats) { namespace := m.classifier.GetStoreNamespace(store) stat, ok := m.stats[namespace] if !ok { stat = newStoreStatistics(m.opt, namespace) m.stats[namespace] = stat } - stat.Observe(store) + stat.Observe(store, stats) } func (m *storeStatisticsMap) Collect() { diff --git a/server/store_statistics_test.go b/server/store_statistics_test.go index 8c25c645962..aa5b682f8d6 100644 --- a/server/store_statistics_test.go +++ b/server/store_statistics_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" + "github.com/pingcap/pd/server/statistics" ) var _ = Suite(&testStoreStatisticsSuite{}) @@ -42,9 +43,11 @@ func (t *testStoreStatisticsSuite) TestStoreStatistics(c *C) { {Id: 7, Address: "mock://tikv-7", Labels: []*metapb.StoreLabel{{Key: "host", Value: "h1"}}}, {Id: 8, Address: "mock://tikv-8", Labels: []*metapb.StoreLabel{{Key: "host", Value: "h2"}}}, } + storesStats := statistics.NewStoresStats() var stores []*core.StoreInfo for _, m := range metaStores { s := core.NewStoreInfo(m, core.SetLastHeartbeatTS(time.Now())) + storesStats.CreateRollingStoreStats(m.GetId()) stores = append(stores, s) } @@ -54,7 +57,7 @@ func (t 
*testStoreStatisticsSuite) TestStoreStatistics(c *C) { stores[4] = store4 storeStats := newStoreStatisticsMap(opt, namespace.DefaultClassifier) for _, store := range stores { - storeStats.Observe(store) + storeStats.Observe(store, storesStats) } stats := storeStats.stats["global"] diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index b5a9091f780..a82ff5165e6 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/api" - "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/tests" "github.com/pingcap/pd/tests/pdctl" ) @@ -78,8 +77,8 @@ func (s *hotTestSuite) TestHot(c *C) { newStats.KeysWritten = keysWritten newStats.KeysRead = keysRead newStats.Interval = interval - newStore := ss.Clone(core.SetStoreStats(newStats)) - newStore.GetRollingStoreStats().Observe(newStore.GetStoreStats()) + rc := leaderServer.GetRaftCluster() + rc.GetStoresStats().Observe(ss.GetID(), newStats) // TODO: Provide a way to test the result of hot read and hot write commands // hot read