Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: improve flow update #7791

Merged
merged 6 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
ClusterStatus = "/pd/api/v1/cluster/status"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
)
Expand Down
9 changes: 9 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Client interface {
GetPDVersion(context.Context) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]string, error)
DeleteOperators(context.Context) error

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
Expand Down Expand Up @@ -879,3 +880,11 @@ func (c *client) GetPDVersion(ctx context.Context) (string, error) {
WithResp(&ver))
return ver.Version, err
}

// DeleteOperators deletes the running operators.
func (c *client) DeleteOperators(ctx context.Context) error {
return c.request(ctx, newRequestInfo().
WithName(deleteOperators).
WithURI(operators).
WithMethod(http.MethodDelete))
}
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
resetBaseAllocIDName = "ResetBaseAllocID"
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
deleteOperators = "DeleteOperators"
)

type requestInfo struct {
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func changeLogLevel(c *gin.Context) {
// @Success 200 {string} string "Success"
// @Failure 400 {string} error
// @Failure 500 {string} error
// @Router /config/group [POST]
// @Router /config/group [post]
func (s *Service) postResourceGroup(c *gin.Context) {
var group rmpb.ResourceGroup
if err := c.ShouldBindJSON(&group); err != nil {
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *Service) putResourceGroup(c *gin.Context) {
// @Failure 404 {string} error
// @Param name path string true "groupName"
// @Param with_stats query bool false "whether to return statistics data."
// @Router /config/group/{name} [GET]
// @Router /config/group/{name} [get]
func (s *Service) getResourceGroup(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
group := s.manager.GetResourceGroup(c.Param("name"), withStats)
Expand All @@ -198,7 +198,7 @@ func (s *Service) getResourceGroup(c *gin.Context) {
// @Success 200 {string} json format of []rmserver.ResourceGroup
// @Failure 404 {string} error
// @Param with_stats query bool false "whether to return statistics data."
// @Router /config/groups [GET]
// @Router /config/groups [get]
func (s *Service) getResourceGroupList(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
groups := s.manager.GetResourceGroupList(withStats)
Expand All @@ -212,7 +212,7 @@ func (s *Service) getResourceGroupList(c *gin.Context) {
// @Param name path string true "Name of the resource group to be deleted"
// @Success 200 {string} string "Success!"
// @Failure 404 {string} error
// @Router /config/group/{name} [DELETE]
// @Router /config/group/{name} [delete]
func (s *Service) deleteResourceGroup(c *gin.Context) {
if err := s.manager.DeleteResourceGroup(c.Param("name")); err != nil {
c.String(http.StatusNotFound, err.Error())
Expand All @@ -226,7 +226,7 @@ func (s *Service) deleteResourceGroup(c *gin.Context) {
// @Summary Get the resource controller config.
// @Success 200 {string} json format of rmserver.ControllerConfig
// @Failure 400 {string} error
// @Router /config/controller [GET]
// @Router /config/controller [get]
func (s *Service) getControllerConfig(c *gin.Context) {
config := s.manager.GetControllerConfig()
c.IndentedJSON(http.StatusOK, config)
Expand All @@ -239,7 +239,7 @@ func (s *Service) getControllerConfig(c *gin.Context) {
// @Param config body object true "json params, rmserver.ControllerConfig"
// @Success 200 {string} string "Success!"
// @Failure 400 {string} error
// @Router /config/controller [POST]
// @Router /config/controller [post]
func (s *Service) setControllerConfig(c *gin.Context) {
conf := make(map[string]any)
if err := c.ShouldBindJSON(&conf); err != nil {
Expand Down
21 changes: 19 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.POST("", createOperator)
router.DELETE("", deleteOperators)
router.GET("/:id", getOperatorByRegion)
router.DELETE("/:id", deleteOperatorByRegion)
router.GET("/records", getOperatorRecords)
Expand Down Expand Up @@ -307,7 +308,7 @@ func deleteRegionCacheByID(c *gin.Context) {
// @Success 200 {object} operator.OpWithStatus
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
// @Router /operators/{id} [get]
func getOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")
Expand All @@ -334,7 +335,7 @@ func getOperatorByRegion(c *gin.Context) {
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
// @Router /operators [get]
func getOperators(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var (
Expand Down Expand Up @@ -365,6 +366,22 @@ func getOperators(c *gin.Context) {
}
}

// @Tags operators
// @Summary Delete operators.
// @Produce json
// @Success 200 {string} string "All pending operator are canceled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [delete]
func deleteOperators(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
if err := handler.RemoveOperators(); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.String(http.StatusOK, "All pending operator are canceled.")
}

// @Tags operator
// @Summary Cancel a Region's pending operator.
// @Param region_id path int true "A Region's Id"
Expand Down
11 changes: 11 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return nil
}

// RemoveOperators removes the all operators.
func (h *Handler) RemoveOperators() error {
c, err := h.GetOperatorController()
if err != nil {
return err
}

c.RemoveOperators(operator.AdminStop)
return nil
}

// GetOperators returns the running operators.
func (h *Handler) GetOperators() ([]*operator.Operator, error) {
c, err := h.GetOperatorController()
Expand Down
35 changes: 35 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ func (oc *Controller) ack(op *Operator) {
}
}

// RemoveOperators removes all operators from the running operators.
func (oc *Controller) RemoveOperators(reasons ...CancelReasonType) {
oc.Lock()
removed := oc.removeOperatorsLocked()
oc.Unlock()
var cancelReason CancelReasonType
if len(reasons) > 0 {
cancelReason = reasons[0]
}
for _, op := range removed {
if op.Cancel(cancelReason) {
log.Info("operator removed",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
}
oc.buryOperator(op)
}
}

func (oc *Controller) removeOperatorsLocked() []*Operator {
var removed []*Operator
for regionID, op := range oc.operators {
delete(oc.operators, regionID)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
oc.ack(op)
if op.Kind()&OpMerge != 0 {
oc.removeRelatedMergeOperator(op)
}
removed = append(removed, op)
}
oc.updateCounts(oc.operators)
return removed
}

// RemoveOperator removes an operator from the running operators.
func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool {
oc.Lock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func StringContain(re *require.Assertions, sub string) func([]byte, int, http.He
}
}

// StringNotContain is used to check whether response context doesn't contain given string.
func StringNotContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(resp []byte, _ int, _ http.Header) {
re.NotContains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(resp []byte, _ int, _ http.Header) {
Expand Down
15 changes: 15 additions & 0 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) {
}
}

// @Tags operator
// @Summary Cancel all pending operators.
// @Produce json
// @Success 200 {string} string "All pending operators are canceled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [delete]
func (h *operatorHandler) DeleteOperators(w http.ResponseWriter, r *http.Request) {
if err := h.RemoveOperators(); err != nil {
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.r.JSON(w, http.StatusOK, "All pending operators are canceled.")
}

// FIXME: details of input json body params
// @Tags operator
// @Summary Create an operator.
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
registerFunc(apiRouter, "/operators", operatorHandler.GetOperators, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.CreateOperator, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators", operatorHandler.DeleteOperators, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/operators/records", operatorHandler.GetOperatorRecords, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.GetOperatorsByRegion, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/operators/{region_id}", operatorHandler.DeleteOperatorByRegion, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
Expand Down
4 changes: 2 additions & 2 deletions server/api/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/rate-limit [POST]
// @Router /service-middleware/config/rate-limit [post]
func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]any
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
Expand Down Expand Up @@ -236,7 +236,7 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/grpc-rate-limit [POST]
// @Router /service-middleware/config/grpc-rate-limit [post]
func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]any
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/api/unsafe_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newUnsafeOperationHandler(svr *server.Server, rd *render.Render) *unsafeOpe
// Success 200 {string} string "Request has been accepted."
// Failure 400 {string} string "The input is invalid."
// Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/unsafe/remove-failed-stores [POST]
// @Router /admin/unsafe/remove-failed-stores [post]
func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
var input map[string]any
Expand Down Expand Up @@ -81,7 +81,7 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
// @Summary Show the current status of failed stores removal.
// @Produce json
// Success 200 {object} []StageOutput
// @Router /admin/unsafe/remove-failed-stores/show [GET]
// @Router /admin/unsafe/remove-failed-stores/show [get]
func (h *unsafeOperationHandler) GetFailedStoresRemovalStatus(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
h.rd.JSON(w, http.StatusOK, rc.GetUnsafeRecoveryController().Show())
Expand Down
53 changes: 53 additions & 0 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,56 @@ func (suite *operatorTestSuite) pauseRuleChecker(re *require.Assertions, cluster
re.NoError(err)
re.True(resp["paused"].(bool))
}

func (suite *operatorTestSuite) TestRemoveOperators() {
suite.env.RunTestInTwoModes(suite.checkRemoveOperators)
}

func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster) {
re := suite.Require()
stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
}

for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}

suite.pauseRuleChecker(re, cluster)
r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
tests.MustPutRegionInfo(re, cluster, r2)
r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
tests.MustPutRegionInfo(re, cluster, r3)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re))
re.NoError(err)
err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 30, "store_id": 4}`), tu.StatusOK(re))
re.NoError(err)
url := fmt.Sprintf("%s/operators", urlPrefix)
err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringContain(re, "merge: region 10 to 20"), tu.StringContain(re, "add peer: store [4]"))
re.NoError(err)
err = tu.CheckDelete(testDialClient, url, tu.StatusOK(re))
re.NoError(err)
err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re), tu.StringNotContain(re, "merge: region 10 to 20"), tu.StringNotContain(re, "add peer: store [4]"))
re.NoError(err)
}
1 change: 0 additions & 1 deletion tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ round = 0
store-count = 100
region-count = 2000000

key-length = 56
replica = 3

leader-update-ratio = 0.06
Expand Down
Loading
Loading