*: split statistics from core package (#1565)
* split statistics from core package

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx authored and nolouch committed Jun 11, 2019
1 parent 5cb107e commit dadede4
Showing 22 changed files with 482 additions and 383 deletions.

server/api/stats_test.go (5 additions, 4 deletions)

@@ -23,6 +23,7 @@ import (
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/statistics"
)

var _ = Suite(&testStatsSuite{})
@@ -127,7 +128,7 @@ func (s *testStatsSuite) TestRegionStats(c *C) {
// 3 ["t", "x") 1 1 F L
// 4 ["x", "") 50 20 L

statsAll := &core.RegionStats{
statsAll := &statistics.RegionStats{
Count: 4,
EmptyCount: 1,
StorageSize: 351,
@@ -141,12 +142,12 @@
}
res, err := http.Get(statsURL)
c.Assert(err, IsNil)
stats := &core.RegionStats{}
stats := &statistics.RegionStats{}
err = apiutil.ReadJSON(res.Body, stats)
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, statsAll)

stats23 := &core.RegionStats{
stats23 := &statistics.RegionStats{
Count: 2,
EmptyCount: 1,
StorageSize: 201,
@@ -161,7 +162,7 @@
args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00"))
res, err = http.Get(statsURL + args)
c.Assert(err, IsNil)
stats = &core.RegionStats{}
stats = &statistics.RegionStats{}
err = apiutil.ReadJSON(res.Body, stats)
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, stats23)
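
Note on server/api/stats_test.go: only the Go package that owns RegionStats changes; the values the test expects from the /stats endpoint stay the same, and the body is still decoded with apiutil.ReadJSON. Below is a minimal, self-contained sketch of that round trip, limited to the three fields visible in this hunk and using assumed JSON field names (the real statistics.RegionStats has more fields and its tags may differ):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RegionStats is a stand-in for statistics.RegionStats, limited to the
// fields visible in this diff; the JSON tags are assumptions.
type RegionStats struct {
	Count       int   `json:"count"`
	EmptyCount  int   `json:"empty_count"`
	StorageSize int64 `json:"storage_size"`
}

func main() {
	// A hypothetical /stats response body, using the values asserted
	// in TestRegionStats.
	body := []byte(`{"count":4,"empty_count":1,"storage_size":351}`)

	stats := &RegionStats{}
	if err := json.Unmarshal(body, stats); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", stats) // &{Count:4 EmptyCount:1 StorageSize:351}
}
```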

server/api/trend.go (3 additions, 3 deletions)

@@ -20,7 +20,7 @@ import (

"github.com/pingcap/pd/pkg/typeutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/statistics"
"github.com/unrolled/render"
)

@@ -106,7 +106,7 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
- var readStats, writeStats core.StoreHotRegionsStat
+ var readStats, writeStats statistics.StoreHotRegionsStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
}
@@ -140,7 +140,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
return trendStores, nil
}

- func (h *trendHandler) getStoreFlow(stats core.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) {
+ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeID uint64) (storeFlow uint64, regionFlows []uint64) {
if stats == nil {
return
}
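
Note on server/api/trend.go: the handler keeps its defensive pattern, reading AsLeader only when a hot-region snapshot exists and returning early from getStoreFlow on a nil stats value. A self-contained sketch of that pattern with stand-in types (the real statistics.StoreHotRegionsStat and StoreHotRegionInfos definitions are not part of this diff; keying the stat by store ID is an assumption):

```go
package main

import "fmt"

// StoreHotRegionsStat stands in for statistics.StoreHotRegionsStat.
type StoreHotRegionsStat map[uint64]uint64

// StoreHotRegionInfos stands in for statistics.StoreHotRegionInfos.
type StoreHotRegionInfos struct {
	AsLeader StoreHotRegionsStat
}

// getStoreFlow mirrors the nil guard in trendHandler.getStoreFlow: a
// missing snapshot simply yields zero flow.
func getStoreFlow(stats StoreHotRegionsStat, storeID uint64) (storeFlow uint64) {
	if stats == nil {
		return
	}
	return stats[storeID]
}

func main() {
	var hotRead *StoreHotRegionInfos // e.g. no hot-read snapshot yet
	var readStats StoreHotRegionsStat
	if hotRead != nil {
		readStats = hotRead.AsLeader
	}
	fmt.Println(getStoreFlow(readStats, 1)) // 0: nil stats are tolerated
}
```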

server/cluster.go (10 additions, 2 deletions)

@@ -29,6 +29,7 @@ import (
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
syncer "github.com/pingcap/pd/server/region_syncer"
"github.com/pingcap/pd/server/statistics"
"github.com/pkg/errors"
"go.uber.org/zap"
)
@@ -326,12 +327,19 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
}

// GetRegionStats returns region statistics from cluster.
- func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *core.RegionStats {
+ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats {
c.RLock()
defer c.RUnlock()
return c.cachedCluster.getRegionStats(startKey, endKey)
}

+ // GetStoresStats returns stores' statistics from cluster.
+ func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
+ c.RLock()
+ defer c.RUnlock()
+ return c.cachedCluster.storesStats
+ }
+
// DropCacheRegion removes a region from the cache.
func (c *RaftCluster) DropCacheRegion(id uint64) {
c.RLock()
@@ -652,7 +660,7 @@ func (c *RaftCluster) collectMetrics() {
cluster := c.cachedCluster
statsMap := newStoreStatisticsMap(c.cachedCluster.opt, c.GetNamespaceClassifier())
for _, s := range cluster.GetStores() {
- statsMap.Observe(s)
+ statsMap.Observe(s, cluster.storesStats)
}
statsMap.Collect()

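
Note on server/cluster.go: the new GetStoresStats accessor follows the same shape as the surrounding RaftCluster methods, such as GetRegionStats and DropCacheRegion: take the cluster read lock, then return the object owned by the cached cluster. A minimal sketch of that pattern with stand-in types (the StoresStats fields here are invented; the real type lives in server/statistics and is not shown in this diff):

```go
package main

import (
	"fmt"
	"sync"
)

// StoresStats stands in for statistics.StoresStats.
type StoresStats struct {
	bytesWriteRate map[uint64]uint64
}

// clusterInfo owns the statistics object, as in cluster_info.go.
type clusterInfo struct {
	storesStats *StoresStats
}

// raftCluster mimics the RaftCluster pattern: an RWMutex guarding
// access to the cached inner cluster.
type raftCluster struct {
	sync.RWMutex
	cachedCluster *clusterInfo
}

// GetStoresStats mirrors the accessor added in this commit.
func (c *raftCluster) GetStoresStats() *StoresStats {
	c.RLock()
	defer c.RUnlock()
	return c.cachedCluster.storesStats
}

func main() {
	c := &raftCluster{cachedCluster: &clusterInfo{
		storesStats: &StoresStats{bytesWriteRate: map[uint64]uint64{1: 838860}},
	}}
	fmt.Println(c.GetStoresStats().bytesWriteRate[1]) // 838860
}
```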

server/cluster_info.go (22 additions, 14 deletions)

@@ -37,6 +37,7 @@ type clusterInfo struct {
opt *scheduleOption
regionStats *regionStatistics
labelLevelStats *labelLevelStatistics
+ storesStats *statistics.StoresStats
prepareChecker *prepareChecker
changedRegions chan *core.RegionInfo
hotSpotCache *statistics.HotSpotCache
@@ -51,6 +52,7 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus
opt: opt,
kv: kv,
labelLevelStats: newLabelLevelStatistics(),
+ storesStats: statistics.NewStoresStats(),
prepareChecker: newPrepareChecker(),
changedRegions: make(chan *core.RegionInfo, defaultChangedRegionsLimit),
hotSpotCache: statistics.NewHotSpotCache(),
@@ -87,7 +89,9 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
zap.Int("count", c.core.Regions.GetRegionCount()),
zap.Duration("cost", time.Since(start)),
)
-
+ for _, store := range c.core.Stores.GetStores() {
+ c.storesStats.CreateRollingStoreStats(store.GetID())
+ }
return c, nil
}

@@ -201,6 +205,7 @@ func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
+ c.storesStats.CreateRollingStoreStats(store.GetID())
return nil
}

@@ -217,6 +222,7 @@ func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error {
}
}
c.core.DeleteStore(store)
+ c.storesStats.RemoveRollingStoreStats(store.GetID())
return nil
}

@@ -270,25 +276,25 @@ func (c *clusterInfo) getStoreCount() int {
func (c *clusterInfo) getStoresBytesWriteStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
- return c.core.Stores.GetStoresBytesWriteStat()
+ return c.storesStats.GetStoresBytesWriteStat()
}

func (c *clusterInfo) getStoresBytesReadStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
- return c.core.Stores.GetStoresBytesReadStat()
+ return c.storesStats.GetStoresBytesReadStat()
}

func (c *clusterInfo) getStoresKeysWriteStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
- return c.core.Stores.GetStoresKeysWriteStat()
+ return c.storesStats.GetStoresKeysWriteStat()
}

func (c *clusterInfo) getStoresKeysReadStat() map[uint64]uint64 {
c.RLock()
defer c.RUnlock()
- return c.core.Stores.GetStoresKeysReadStat()
+ return c.storesStats.GetStoresKeysReadStat()
}

// ScanRegions scans region with start key, until number greater than limit.
@@ -382,10 +388,10 @@ func (c *clusterInfo) getRegionCount() int {
return c.core.Regions.GetRegionCount()
}

- func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *core.RegionStats {
+ func (c *clusterInfo) getRegionStats(startKey, endKey []byte) *statistics.RegionStats {
c.RLock()
defer c.RUnlock()
- return c.core.Regions.GetRegionStats(startKey, endKey)
+ return statistics.GetRegionStats(c.core.Regions, startKey, endKey)
}

func (c *clusterInfo) dropRegion(id uint64) {
@@ -489,6 +495,8 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
}
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(time.Now()))
c.core.Stores.SetStore(newStore)
+ c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
+ c.storesStats.UpdateTotalBytesRate(c.core.Stores)
return nil
}

@@ -666,7 +674,7 @@ func (c *clusterInfo) collectMetrics() {
c.regionStats.Collect()
c.labelLevelStats.Collect()
// collect hot cache metrics
- c.hotSpotCache.CollectMetrics(c.core.Stores)
+ c.hotSpotCache.CollectMetrics(c.storesStats)
}

func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo {
@@ -798,25 +806,25 @@ func (c *clusterInfo) CheckLabelProperty(typ string, labels []*metapb.StoreLabel
}

// RegionReadStats returns hot region's read stats.
- func (c *clusterInfo) RegionReadStats() []*core.RegionStat {
+ func (c *clusterInfo) RegionReadStats() []*statistics.RegionStat {
// RegionStats is a thread-safe method
return c.hotSpotCache.RegionStats(statistics.ReadFlow)
}

// RegionWriteStats returns hot region's write stats.
- func (c *clusterInfo) RegionWriteStats() []*core.RegionStat {
+ func (c *clusterInfo) RegionWriteStats() []*statistics.RegionStat {
// RegionStats is a thread-safe method
return c.hotSpotCache.RegionStats(statistics.WriteFlow)
}

// CheckWriteStatus checks the write status, returns whether need update statistics and item.
- func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) (bool, *core.RegionStat) {
- return c.hotSpotCache.CheckWrite(region, c.core.Stores)
+ func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) {
+ return c.hotSpotCache.CheckWrite(region, c.storesStats)
}

// CheckReadStatus checks the read status, returns whether need update statistics and item.
- func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) (bool, *core.RegionStat) {
- return c.hotSpotCache.CheckRead(region, c.core.Stores)
+ func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) {
+ return c.hotSpotCache.CheckRead(region, c.storesStats)
}

type prepareChecker struct {
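
Note on server/cluster_info.go: clusterInfo now owns a statistics.StoresStats. Rolling per-store stats are created when stores are loaded or put, removed when stores are deleted, observed on every store heartbeat, and queried by the getStoresBytes/Keys getters and the hot-spot cache. The real StoresStats implementation is not part of this diff; the sketch below is a simplified, self-contained stand-in showing how that lifecycle could fit together, and it presumably differs from PD's real rolling-window statistics:

```go
package main

import (
	"fmt"
	"sync"
)

// StoreHeartbeatStats carries only what this sketch needs from a store
// heartbeat; the real code passes *pdpb.StoreStats.
type StoreHeartbeatStats struct {
	BytesWritten    uint64
	IntervalSeconds uint64
}

// StoresStats is a simplified stand-in for statistics.StoresStats: one
// rolling entry per store, keyed by store ID. The real implementation
// presumably keeps an actual rolling window, not the last observation.
type StoresStats struct {
	sync.RWMutex
	rolling map[uint64]*rollingStoreStats
}

type rollingStoreStats struct {
	bytesWriteRate float64
}

func NewStoresStats() *StoresStats {
	return &StoresStats{rolling: make(map[uint64]*rollingStoreStats)}
}

// CreateRollingStoreStats is called when a store is loaded or put.
func (s *StoresStats) CreateRollingStoreStats(storeID uint64) {
	s.Lock()
	defer s.Unlock()
	s.rolling[storeID] = &rollingStoreStats{}
}

// RemoveRollingStoreStats is called when a store is deleted.
func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) {
	s.Lock()
	defer s.Unlock()
	delete(s.rolling, storeID)
}

// Observe folds a store heartbeat into that store's rolling stats.
func (s *StoresStats) Observe(storeID uint64, stats *StoreHeartbeatStats) {
	s.Lock()
	defer s.Unlock()
	if r, ok := s.rolling[storeID]; ok && stats.IntervalSeconds > 0 {
		r.bytesWriteRate = float64(stats.BytesWritten) / float64(stats.IntervalSeconds)
	}
}

// GetStoresBytesWriteStat returns per-store write rates, as the
// clusterInfo getters now do via storesStats.
func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 {
	s.RLock()
	defer s.RUnlock()
	res := make(map[uint64]uint64, len(s.rolling))
	for id, r := range s.rolling {
		res[id] = uint64(r.bytesWriteRate)
	}
	return res
}

func main() {
	stats := NewStoresStats()
	stats.CreateRollingStoreStats(1)
	stats.Observe(1, &StoreHeartbeatStats{BytesWritten: 8 << 20, IntervalSeconds: 10})
	fmt.Println(stats.GetStoresBytesWriteStat()) // map[1:838860]
}
```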

server/cluster_info_test.go (0 additions, 14 deletions)

@@ -16,7 +16,6 @@ package server
import (
"math/rand"

"github.com/gogo/protobuf/proto"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
@@ -79,19 +78,6 @@ func (s *testStoresInfoSuite) TestStores(c *C) {
}

c.Assert(cache.GetStoreCount(), Equals, int(n))
-
- bytesWritten := uint64(8 * 1024 * 1024)
- bytesRead := uint64(128 * 1024 * 1024)
- store := cache.GetStore(1)
-
- newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
- newStats.BytesWritten = bytesWritten
- newStats.BytesRead = bytesRead
- newStats.Interval = &pdpb.TimeInterval{EndTimestamp: 10, StartTimestamp: 0}
- newStore := store.Clone(core.SetStoreStats(newStats))
- cache.SetStore(newStore)
- c.Assert(cache.TotalBytesWriteRate(), Equals, float64(bytesWritten/10))
- c.Assert(cache.TotalBytesReadRate(), Equals, float64(bytesRead/10))
}

var _ = Suite(&testRegionsInfoSuite{})
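
Note on server/cluster_info_test.go: the assertions deleted here pinned down how total byte rates are derived from a heartbeat, namely BytesWritten and BytesRead divided by the heartbeat interval (10 s in the test). Presumably that coverage moves along with the code into the statistics package's own tests; the sketch below just reproduces the arithmetic the removed c.Assert calls checked:

```go
package main

import "fmt"

func main() {
	// Values from the removed test: an 8 MiB write and a 128 MiB read
	// reported over a {StartTimestamp: 0, EndTimestamp: 10} interval.
	bytesWritten := uint64(8 * 1024 * 1024)
	bytesRead := uint64(128 * 1024 * 1024)
	intervalSeconds := uint64(10)

	// The removed assertions compared against float64(bytes/10), i.e.
	// integer division first, so the expected rates are whole numbers.
	fmt.Printf("%.0f %.0f\n",
		float64(bytesWritten/intervalSeconds),
		float64(bytesRead/intervalSeconds)) // 838860 13421772
}
```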

server/coordinator.go (5 additions, 4 deletions)

@@ -24,6 +24,7 @@ import (
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/statistics"
"github.com/pkg/errors"
"go.uber.org/zap"
)
@@ -256,11 +257,11 @@ func (c *coordinator) stop() {
// Hack to retrieve info from scheduler.
// TODO: remove it.
type hasHotStatus interface {
- GetHotReadStatus() *core.StoreHotRegionInfos
- GetHotWriteStatus() *core.StoreHotRegionInfos
+ GetHotReadStatus() *statistics.StoreHotRegionInfos
+ GetHotWriteStatus() *statistics.StoreHotRegionInfos
}

- func (c *coordinator) getHotWriteRegions() *core.StoreHotRegionInfos {
+ func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos {
c.RLock()
defer c.RUnlock()
s, ok := c.schedulers[hotRegionScheduleName]
@@ -273,7 +274,7 @@ func (c *coordinator) getHotWriteRegions() *core.StoreHotRegionInfos {
return nil
}

- func (c *coordinator) getHotReadRegions() *core.StoreHotRegionInfos {
+ func (c *coordinator) getHotReadRegions() *statistics.StoreHotRegionInfos {
c.RLock()
defer c.RUnlock()
s, ok := c.schedulers[hotRegionScheduleName]
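
Note on server/coordinator.go: hasHotStatus remains the "hack" interface the TODO complains about; only its return types move to the statistics package. The coordinator presumably still type-asserts the hot-region scheduler against it inside getHotWriteRegions/getHotReadRegions, roughly as in this self-contained sketch (the scheduler name and struct fields here are invented):

```go
package main

import "fmt"

// StoreHotRegionInfos stands in for statistics.StoreHotRegionInfos; the
// AsLeader field type is simplified for this sketch.
type StoreHotRegionInfos struct {
	AsLeader map[uint64]uint64
}

// hasHotStatus mirrors the interface in coordinator.go.
type hasHotStatus interface {
	GetHotReadStatus() *StoreHotRegionInfos
	GetHotWriteStatus() *StoreHotRegionInfos
}

// scheduler is a minimal stand-in for the coordinator's scheduler handle.
type scheduler interface{ Name() string }

// hotRegionScheduler is the one scheduler that also exposes hot status.
type hotRegionScheduler struct{}

func (hotRegionScheduler) Name() string { return "hot-region-scheduler" } // name is hypothetical
func (hotRegionScheduler) GetHotReadStatus() *StoreHotRegionInfos {
	return &StoreHotRegionInfos{}
}
func (hotRegionScheduler) GetHotWriteStatus() *StoreHotRegionInfos {
	return &StoreHotRegionInfos{AsLeader: map[uint64]uint64{1: 3}}
}

// getHotWriteRegions shows the assertion pattern: return nil unless the
// scheduler actually implements hasHotStatus.
func getHotWriteRegions(s scheduler) *StoreHotRegionInfos {
	if h, ok := s.(hasHotStatus); ok {
		return h.GetHotWriteStatus()
	}
	return nil
}

func main() {
	fmt.Println(getHotWriteRegions(hotRegionScheduler{}).AsLeader) // map[1:3]
}
```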
(Diffs for the remaining 16 of the 22 changed files are not shown here.)