Skip to content

Commit

Permalink
ctl: support to debug hot bucket info (#6764)
Browse files Browse the repository at this point in the history
close #6765

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
bufferflies and ti-chi-bot[bot] authored Jul 10, 2023
1 parent 906850b commit 47d408b
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 73 deletions.
8 changes: 4 additions & 4 deletions pkg/statistics/buckets/hot_bucket_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type HotBucketCache struct {
ctx context.Context
}

// GetHotBucketStats returns the hot stats of the regions that great than degree.
func (h *HotBucketCache) GetHotBucketStats(degree int, regions []uint64) map[uint64][]*BucketStat {
// GetHotBucketStats returns the hot stats of the regionIDs that great than degree.
func (h *HotBucketCache) GetHotBucketStats(degree int, regionIDs []uint64) map[uint64][]*BucketStat {
rst := make(map[uint64][]*BucketStat)
appendItems := func(item *BucketTreeItem) {
stats := make([]*BucketStat, 0)
Expand All @@ -67,12 +67,12 @@ func (h *HotBucketCache) GetHotBucketStats(degree int, regions []uint64) map[uin
rst[item.regionID] = stats
}
}
if len(regions) == 0 {
if len(regionIDs) == 0 {
for _, item := range h.bucketsOfRegion {
appendItems(item)
}
} else {
for _, region := range regions {
for _, region := range regionIDs {
if item, ok := h.bucketsOfRegion[region]; ok {
appendItems(item)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/statistics/buckets/hot_bucket_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) {

type collectBucketStatsTask struct {
minDegree int
regions []uint64
regionIDs []uint64
ret chan map[uint64][]*BucketStat // RegionID ==>Buckets
}

// NewCollectBucketStatsTask creates task to collect bucket stats.
func NewCollectBucketStatsTask(minDegree int, regions ...uint64) *collectBucketStatsTask {
func NewCollectBucketStatsTask(minDegree int, regionIDs ...uint64) *collectBucketStatsTask {
return &collectBucketStatsTask{
minDegree: minDegree,
regions: regions,
regionIDs: regionIDs,
ret: make(chan map[uint64][]*BucketStat, 1),
}
}
Expand All @@ -84,7 +84,7 @@ func (t *collectBucketStatsTask) taskType() flowItemTaskKind {
}

func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) {
t.ret <- cache.GetHotBucketStats(t.minDegree, t.regions)
t.ret <- cache.GetHotBucketStats(t.minDegree, t.regionIDs)
}

// WaitRet returns the result of the task.
Expand Down
56 changes: 54 additions & 2 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"net/http"
"strconv"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
Expand All @@ -32,6 +34,32 @@ type hotStatusHandler struct {
rd *render.Render
}

// HotBucketsResponse is the response for hot buckets.
type HotBucketsResponse map[uint64][]*HotBucketsItem

// HotBucketsItem is the item of hot buckets.
type HotBucketsItem struct {
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
HotDegree int `json:"hot_degree"`
ReadBytes uint64 `json:"read_bytes"`
ReadKeys uint64 `json:"read_keys"`
WriteBytes uint64 `json:"write_bytes"`
WriteKeys uint64 `json:"write_keys"`
}

func convert(buckets *buckets.BucketStat) *HotBucketsItem {
return &HotBucketsItem{
StartKey: core.HexRegionKeyStr(buckets.StartKey),
EndKey: core.HexRegionKeyStr(buckets.EndKey),
HotDegree: buckets.HotDegree,
ReadBytes: buckets.Loads[statistics.RegionReadBytes],
ReadKeys: buckets.Loads[statistics.RegionReadKeys],
WriteBytes: buckets.Loads[statistics.RegionWriteBytes],
WriteKeys: buckets.Loads[statistics.RegionWriteKeys],
}
}

// HotStoreStats is used to record the status of hot stores.
type HotStoreStats struct {
BytesWriteStats map[uint64]float64 `json:"bytes-write-rate,omitempty"`
Expand Down Expand Up @@ -169,6 +197,30 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, stats)
}

// @Tags hotspot
// @Summary List the hot buckets.
// @Produce json
// @Success 200 {object} HotBucketsResponse
// @Router /hotspot/buckets [get]
func (h *hotStatusHandler) GetHotBuckets(w http.ResponseWriter, r *http.Request) {
regionIDs := r.URL.Query()["region_id"]
ids := make([]uint64, len(regionIDs))
for i, regionID := range regionIDs {
if id, err := strconv.ParseUint(regionID, 10, 64); err == nil {
ids[i] = id
}
}
stats := h.Handler.GetHotBuckets()
ret := HotBucketsResponse{}
for regionID, stats := range stats {
ret[regionID] = make([]*HotBucketsItem, len(stats))
for i, stat := range stats {
ret[regionID][i] = convert(stat)
}
}
h.rd.JSON(w, http.StatusOK, ret)
}

// @Tags hotspot
// @Summary List the history hot regions.
// @Accept json
Expand All @@ -190,15 +242,15 @@ func (h *hotStatusHandler) GetHistoryHotRegions(w http.ResponseWriter, r *http.R
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
results, err := getAllRequestHistroyHotRegion(h.Handler, historyHotRegionsRequest)
results, err := getAllRequestHistoryHotRegion(h.Handler, historyHotRegionsRequest)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, results)
}

func getAllRequestHistroyHotRegion(handler *server.Handler, request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) {
func getAllRequestHistoryHotRegion(handler *server.Handler, request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) {
var hotRegionTypes = storage.HotRegionTypes
if len(request.HotRegionTypes) != 0 {
hotRegionTypes = request.HotRegionTypes
Expand Down
16 changes: 8 additions & 8 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,14 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

// @Tags region
// @Summary List regions belongs to the given keyspace ID.
// @Param keyspace_id query string true "Keyspace ID"
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/keyspace/id/{id} [get]
// @Tags region
// @Summary List regions belongs to the given keyspace ID.
// @Param keyspace_id query string true "Keyspace ID"
// @Param limit query integer false "Limit count" default(16)
// @Produce json
// @Success 200 {object} RegionsInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/keyspace/id/{id} [get]
func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
vars := mux.Vars(r)
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "/hotspot/regions/read", hotStatusHandler.GetHotReadRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/hotspot/regions/history", hotStatusHandler.GetHistoryHotRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/hotspot/stores", hotStatusHandler.GetHotStores, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/hotspot/buckets", hotStatusHandler.GetHotBuckets, setMethods(http.MethodGet), setAuditBackend(prometheus))

regionHandler := newRegionHandler(svr, rd)
registerFunc(clusterRouter, "/region/id/{id}", regionHandler.GetRegionByID, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
94 changes: 47 additions & 47 deletions server/apiv2/handlers/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type CreateKeyspaceParams struct {

// CreateKeyspace creates keyspace according to given input.
//
// @Tags keyspaces
// @Summary Create new keyspace.
// @Param body body CreateKeyspaceParams true "Create keyspace parameters"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [post]
// @Tags keyspaces
// @Summary Create new keyspace.
// @Param body body CreateKeyspaceParams true "Create keyspace parameters"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [post]
func CreateKeyspace(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
Expand Down Expand Up @@ -90,13 +90,13 @@ func CreateKeyspace(c *gin.Context) {

// LoadKeyspace returns target keyspace.
//
// @Tags keyspaces
// @Summary Get keyspace info.
// @Param name path string true "Keyspace Name"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/{name} [get]
// @Tags keyspaces
// @Summary Get keyspace info.
// @Param name path string true "Keyspace Name"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/{name} [get]
func LoadKeyspace(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
Expand All @@ -115,13 +115,13 @@ func LoadKeyspace(c *gin.Context) {

// LoadKeyspaceByID returns target keyspace.
//
// @Tags keyspaces
// @Summary Get keyspace info.
// @Param id path string true "Keyspace id"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/id/{id} [get]
// @Tags keyspaces
// @Summary Get keyspace info.
// @Param id path string true "Keyspace id"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces/id/{id} [get]
func LoadKeyspaceByID(c *gin.Context) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil || id == 0 {
Expand Down Expand Up @@ -190,15 +190,15 @@ type LoadAllKeyspacesResponse struct {

// LoadAllKeyspaces loads range of keyspaces.
//
// @Tags keyspaces
// @Summary list keyspaces.
// @Param page_token query string false "page token"
// @Param limit query string false "maximum number of results to return"
// @Produce json
// @Success 200 {object} LoadAllKeyspacesResponse
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [get]
// @Tags keyspaces
// @Summary list keyspaces.
// @Param page_token query string false "page token"
// @Param limit query string false "maximum number of results to return"
// @Produce json
// @Success 200 {object} LoadAllKeyspacesResponse
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /keyspaces [get]
func LoadAllKeyspaces(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceManager()
Expand Down Expand Up @@ -254,14 +254,14 @@ type UpdateConfigParams struct {
// This api uses PATCH semantic and supports JSON Merge Patch.
// format and processing rules.
//
// @Tags keyspaces
// @Summary Update keyspace config.
// @Param name path string true "Keyspace Name"
// @Param body body UpdateConfigParams true "Update keyspace parameters"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Tags keyspaces
// @Summary Update keyspace config.
// @Param name path string true "Keyspace Name"
// @Param body body UpdateConfigParams true "Update keyspace parameters"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
//
// Router /keyspaces/{name}/config [patch]
func UpdateKeyspaceConfig(c *gin.Context) {
Expand Down Expand Up @@ -315,14 +315,14 @@ type UpdateStateParam struct {

// UpdateKeyspaceState update the target keyspace's state.
//
// @Tags keyspaces
// @Summary Update keyspace state.
// @Param name path string true "Keyspace Name"
// @Param body body UpdateStateParam true "New state for the keyspace"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Tags keyspaces
// @Summary Update keyspace state.
// @Param name path string true "Keyspace Name"
// @Param body body UpdateStateParam true "New state for the keyspace"
// @Produce json
// @Success 200 {object} KeyspaceMeta
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
//
// Router /keyspaces/{name}/state [put]
func UpdateKeyspaceState(c *gin.Context) {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,8 +2255,8 @@ func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
}

// BucketsStats returns hot region's buckets stats.
func (c *RaftCluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree, regions...)
func (c *RaftCluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree, regionIDs...)
if !c.hotBuckets.CheckAsync(task) {
return nil
}
Expand Down
11 changes: 11 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -186,6 +187,16 @@ func (h *Handler) GetHotWriteRegions() *statistics.StoreHotPeersInfos {
return c.GetHotWriteRegions()
}

// GetHotBuckets returns all hot buckets stats.
func (h *Handler) GetHotBuckets(regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
c, err := h.GetRaftCluster()
if err != nil {
return nil
}
degree := c.GetOpts().GetHotRegionCacheHitsThreshold()
return c.BucketsStats(degree, regionIDs...)
}

// GetHotReadRegions gets all hot read regions stats.
func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos {
c, err := h.GetRaftCluster()
Expand Down
7 changes: 7 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@ func (c *TestCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return cluster.HandleRegionHeartbeat(region)
}

// HandleReportBuckets processes BucketInfo reports from the client.
func (c *TestCluster) HandleReportBuckets(b *metapb.Buckets) error {
leader := c.GetLeader()
cluster := c.servers[leader].GetRaftCluster()
return cluster.HandleReportBuckets(b)
}

// Join is used to add a new TestServer into the cluster.
func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
conf, err := c.config.Join().Generate(opts...)
Expand Down
15 changes: 15 additions & 0 deletions tests/pdctl/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,18 @@ func MustPutRegion(re *require.Assertions, cluster *tests.TestCluster, regionID,
re.NoError(err)
return r
}

// MustReportBuckets is used for test purpose.
func MustReportBuckets(re *require.Assertions, cluster *tests.TestCluster, regionID uint64, start, end []byte, stats *metapb.BucketStats) *metapb.Buckets {
buckets := &metapb.Buckets{
RegionId: regionID,
Version: 1,
Keys: [][]byte{start, end},
Stats: stats,
// report buckets interval is 10s
PeriodInMs: 10000,
}
err := cluster.HandleReportBuckets(buckets)
re.NoError(err)
return buckets
}
Loading

0 comments on commit 47d408b

Please sign in to comment.