Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support hotspot http interface in scheduling server #7184

Merged
merged 10 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 134 additions & 11 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package apis

import (
"fmt"
"net/http"
"strconv"
"sync"
Expand All @@ -26,11 +27,12 @@ import (
"github.com/joho/godotenv"
"github.com/pingcap/log"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -68,15 +70,11 @@ type Service struct {
}

type server struct {
server *scheserver.Server
*scheserver.Server
}

func (s *server) GetCoordinator() *schedule.Coordinator {
return s.server.GetCoordinator()
}

func (s *server) GetCluster() sche.SharedCluster {
return s.server.GetCluster()
func (s *server) GetCluster() sche.SchedulerCluster {
return s.Server.GetCluster()
}

func createIndentRender() *render.Render {
Expand All @@ -98,11 +96,11 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server}))
c.Set(handlerKey, handler.NewHandler(&server{srv.Server}))
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
apiHandlerEngine.GET("metrics", mcsutils.PromHandler())
pprof.Register(apiHandlerEngine)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
Expand All @@ -115,6 +113,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
s.RegisterHotspotRouter()
return s
}

Expand All @@ -141,6 +140,16 @@ func (s *Service) RegisterCheckersRouter() {
router.POST("/:name", pauseOrResumeChecker)
}

// RegisterHotspotRouter registers the router of the hotspot handler.
// Every route is a GET endpoint inside the "hotspot" group of the service root.
func (s *Service) RegisterHotspotRouter() {
	hotspot := s.root.Group("hotspot")
	// Routes are listed in registration order.
	for _, route := range []struct {
		path   string
		handle gin.HandlerFunc
	}{
		{"/regions/write", getHotWriteRegions},
		{"/regions/read", getHotReadRegions},
		{"/regions/history", getHistoryHotRegions},
		{"/stores", getHotStores},
		{"/buckets", getHotBuckets},
	} {
		hotspot.GET(route.path, route.handle)
	}
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
Expand Down Expand Up @@ -425,3 +434,117 @@ func pauseOrResumeScheduler(c *gin.Context) {
}
c.String(http.StatusOK, "Pause or resume the scheduler successfully.")
}

// getHotWriteRegions handles GET /hotspot/regions/write by delegating to
// getHotRegions with the utils.Write type.
// @Tags hotspot
// @Summary List the hot write regions.
// @Produce json
// @Success 200 {object} statistics.StoreHotPeersInfos
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/write [get]
func getHotWriteRegions(c *gin.Context) {
getHotRegions(utils.Write, c)
}

// getHotReadRegions handles GET /hotspot/regions/read by delegating to
// getHotRegions with the utils.Read type.
// @Tags hotspot
// @Summary List the hot read regions.
// @Produce json
// @Success 200 {object} statistics.StoreHotPeersInfos
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/read [get]
func getHotReadRegions(c *gin.Context) {
getHotRegions(utils.Read, c)
}

// getHotRegions writes the hot-region statistics of the given read/write type
// as indented JSON. Each `store_id` query value must parse as an unsigned
// integer (otherwise 400) and must be accepted by the handler's GetStore
// (otherwise 500). Without any `store_id` the handler is asked for the
// unfiltered statistics.
func getHotRegions(typ utils.RWType, c *gin.Context) {
	h := c.MustGet(handlerKey).(*handler.Handler)

	// ids stays nil when no store_id is supplied, so the variadic call below
	// is then equivalent to calling GetHotRegions(typ) with no store filter.
	var ids []uint64
	for _, raw := range c.QueryArray("store_id") {
		id, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", raw))
			return
		}
		// Only the lookup error matters here; the store itself is unused.
		if _, storeErr := h.GetStore(id); storeErr != nil {
			c.String(http.StatusInternalServerError, storeErr.Error())
			return
		}
		ids = append(ids, id)
	}

	stats, err := h.GetHotRegions(typ, ids...)
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	c.IndentedJSON(http.StatusOK, stats)
}

// @Tags hotspot
// @Summary List the hot stores.
// @Produce json
// @Success 200 {object} handler.HotStoreStats
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/stores [get]
func getHotStores(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
stores, err := handler.GetHotStores()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, stores)
}

// @Tags hotspot
// @Summary List the hot buckets.
// @Produce json
// @Success 200 {object} handler.HotBucketsResponse
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/buckets [get]
func getHotBuckets(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

regionIDs := c.QueryArray("region_id")
ids := make([]uint64, len(regionIDs))
for i, regionID := range regionIDs {
if id, err := strconv.ParseUint(regionID, 10, 64); err == nil {
ids[i] = id
}
}
ret, err := handler.GetHotBuckets(ids...)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, ret)
}

// getHistoryHotRegions currently always responds with status 200 and a
// zero-value storage.HistoryHotRegions; no history is looked up.
// @Tags hotspot
// @Summary List the history hot regions.
// @Accept json
// @Produce json
// @Success 200 {object} storage.HistoryHotRegions
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/history [get]
func getHistoryHotRegions(c *gin.Context) {
	// TODO: support history hotspot in the stateless scheduling server in the future.
	// Ref: https://github.com/tikv/pd/pull/7183
	var empty storage.HistoryHotRegions
	c.IndentedJSON(http.StatusOK, empty)
}
6 changes: 6 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (c *Cluster) GetHotStat() *statistics.HotStat {
return c.hotStat
}

// GetStoresStats returns the stores' statistics held in the cluster's hot stat.
// NOTE(review): per the original author's comment, unhealthy stores do not need
// to be filtered here because that is already handled while processing
// heartbeats — not verifiable from this function alone.
func (c *Cluster) GetStoresStats() *statistics.StoresStats {
return c.hotStat.StoresStats
}

// GetRegionStats gets region statistics.
func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
return c.regionStats
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,11 @@ func (s *Server) GetBasicCluster() *core.BasicCluster {

// GetCoordinator returns the coordinator of the server's cluster, or nil when
// GetCluster returns nil.
func (s *Server) GetCoordinator() *schedule.Coordinator {
	// Check the cluster first: calling GetCoordinator on a nil cluster
	// must be avoided, so there is no unconditional return before this guard.
	c := s.GetCluster()
	if c == nil {
		return nil
	}
	return c.GetCoordinator()
}

// ServerLoopWgDone decreases the server loop wait group.
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ type SharedConfigProvider interface {
IsWitnessAllowed() bool
IsPlacementRulesCacheEnabled() bool
SetHaltScheduling(bool, string)
GetHotRegionCacheHitsThreshold() int

// for test purpose
SetPlacementRuleEnabled(bool)
SetPlacementRulesCacheEnabled(bool)
SetEnableWitness(bool)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,25 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot
return infos
}

// GetHotRegions returns the hot regions' statistics for the given RWType.
// When storeIDs is empty (or GetHotRegionsByType yields nil) the result of
// GetHotRegionsByType is returned unchanged. Otherwise the result contains one
// AsLeader and one AsPeer entry per requested store ID, copied from it.
func (c *Coordinator) GetHotRegions(typ utils.RWType, storeIDs ...uint64) *statistics.StoreHotPeersInfos {
	all := c.GetHotRegionsByType(typ)
	if all == nil || len(storeIDs) == 0 {
		return all
	}

	filtered := &statistics.StoreHotPeersInfos{
		AsLeader: statistics.StoreHotPeersStat{},
		AsPeer:   statistics.StoreHotPeersStat{},
	}
	for _, id := range storeIDs {
		// A store absent from the full statistics still gets an entry,
		// holding the zero value of the lookup.
		filtered.AsLeader[id] = all.AsLeader[id]
		filtered.AsPeer[id] = all.AsPeer[id]
	}
	return filtered
}

// GetWaitGroup returns the wait group. Only for test purpose.
func (c *Coordinator) GetWaitGroup() *sync.WaitGroup {
return &c.wg
Expand Down
Loading
Loading