From 10b691af0a532e79c2b91870fa6ee6cf6f83ee6d Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Mon, 24 Apr 2023 15:16:45 +0800 Subject: [PATCH] [cherry-pick 6327] server: add accelerate-schedule/batch api (#6348) * server: add accelerate-schedule/batch api Signed-off-by: Lloyd-Pottiger Co-authored-by: Ti Chi Robot * address comments Signed-off-by: Lloyd-Pottiger --------- Signed-off-by: Lloyd-Pottiger Co-authored-by: Ti Chi Robot --- server/api/region.go | 59 +++++++++++++++++++++++++++++++++++++++ server/api/region_test.go | 20 +++++++++++++ server/api/router.go | 1 + 3 files changed, 80 insertions(+) diff --git a/server/api/region.go b/server/api/region.go index 796c3acafa4..48e8176435f 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -22,6 +22,7 @@ import ( "net/url" "sort" "strconv" + "strings" "github.com/gorilla/mux" "github.com/pingcap/failpoint" @@ -836,6 +837,64 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, h.rd.Text(w, http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) } +// @Tags region +// @Summary Accelerate regions scheduling in given ranges, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in given ranges [startKey1, endKey1), [startKey2, endKey2), ..." +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/accelerate-schedule/batch [post] +func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + var input []map[string]interface{} + if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { + return + } + limit := 256 + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + var err error + limit, err = strconv.Atoi(limitStr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + } + if limit > maxRegionLimit { + limit = maxRegionLimit + } + var msgBuilder strings.Builder + msgBuilder.Grow(128) + msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") + regionsIDSet := make(map[uint64]struct{}) + for _, rg := range input { + startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + endKey, rawEndKey, err := apiutil.ParseKey("end_key", rg) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + regions := rc.ScanRegions(startKey, endKey, limit) + for _, region := range regions { + regionsIDSet[region.GetID()] = struct{}{} + } + msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) + } + if len(regionsIDSet) > 0 { + regionsIDList := make([]uint64, 0, len(regionsIDSet)) + for id := range regionsIDSet { + regionsIDList = append(regionsIDList, id) + } + rc.AddSuspectRegions(regionsIDList...) + } + h.rd.Text(w, http.StatusOK, msgBuilder.String()) +} + func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { rc := getCluster(r) limit := defaultRegionLimit diff --git a/server/api/region_test.go b/server/api/region_test.go index 0796cd49c97..63d54b2b04b 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -351,6 +351,26 @@ func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRange() { suite.Len(idList, 2) } +func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { + re := suite.Require() + r1 := newTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) + r2 := newTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) + r3 := newTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) + r4 := newTestRegionInfo(560, 16, []byte("a4"), []byte("a5")) + r5 := newTestRegionInfo(561, 17, []byte("a5"), []byte("a6")) + mustRegionHeartbeat(re, suite.svr, r1) + mustRegionHeartbeat(re, suite.svr, r2) + mustRegionHeartbeat(re, suite.svr, r3) + mustRegionHeartbeat(re, suite.svr, r4) + mustRegionHeartbeat(re, suite.svr, r5) + body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) + + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", suite.urlPrefix), []byte(body), tu.StatusOK(re)) + suite.NoError(err) + idList := suite.svr.GetRaftCluster().GetSuspectRegions() + suite.Len(idList, 4) +} + func (suite *regionTestSuite) TestScatterRegions() { re := suite.Require() r1 := newTestRegionInfo(601, 13, []byte("b1"), []byte("b2")) diff --git a/server/api/router.go b/server/api/router.go index 2c750b12eb7..e6188b35e41 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -266,6 +266,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/regions/check/hist-keys", regionsHandler.GetKeysHistogram, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/sibling/{id}", regionsHandler.GetRegionSiblings, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(clusterRouter, "/regions/accelerate-schedule/batch", regionsHandler.AccelerateRegionsScheduleInRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/scatter", regionsHandler.ScatterRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))