Skip to content

Commit

Permalink
schedule: operator limit for stores (#1474)
Browse files Browse the repository at this point in the history
* add store limit for scheduling

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed May 22, 2019
1 parent 7a4685b commit 0fe66a0
Show file tree
Hide file tree
Showing 37 changed files with 613 additions and 91 deletions.
6 changes: 3 additions & 3 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ max-snapshot-count = 3
max-pending-peer-count = 16
max-store-down-time = "30m"
leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
region-schedule-limit = 1024
replica-schedule-limit = 1024
merge-schedule-limit = 8
tolerant-size-ratio = 5.0
tolerant-size-ratio = 0

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
Expand Down
15 changes: 10 additions & 5 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/schedulers", schedulerHandler.Post).Methods("POST")
router.HandleFunc("/api/v1/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE")

router.Handle("/api/v1/cluster", newClusterHandler(svr, rd)).Methods("GET")
router.HandleFunc("/api/v1/cluster/status", newClusterHandler(svr, rd).GetClusterStatus).Methods("GET")
clusterHandler := newClusterHandler(svr, rd)
router.Handle("/api/v1/cluster", clusterHandler).Methods("GET")
router.HandleFunc("/api/v1/cluster/status", clusterHandler.GetClusterStatus).Methods("GET")

confHandler := newConfHandler(svr, rd)
router.HandleFunc("/api/v1/config", confHandler.Get).Methods("GET")
Expand All @@ -61,14 +62,18 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/config/cluster-version", confHandler.GetClusterVersion).Methods("GET")
router.HandleFunc("/api/v1/config/cluster-version", confHandler.SetClusterVersion).Methods("POST")

storeHandler := newStoreHandler(svr, rd)
storeHandler := newStoreHandler(handler, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/state", storeHandler.SetState).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", newStoresHandler(svr, rd).RemoveTombStone).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/limit", storeHandler.SetLimit).Methods("POST")
storesHandler := newStoresHandler(handler, rd)
router.Handle("/api/v1/stores", storesHandler).Methods("GET")
router.HandleFunc("/api/v1/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE")
router.HandleFunc("/api/v1/stores/limit", storesHandler.GetAllLimit).Methods("GET")
router.HandleFunc("/api/v1/stores/limit", storesHandler.SetAllLimit).Methods("POST")

labelsHandler := newLabelsHandler(svr, rd)
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
Expand Down
111 changes: 92 additions & 19 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ type StoresInfo struct {
}

type storeHandler struct {
svr *server.Server
rd *render.Render
*server.Handler
rd *render.Render
}

func newStoreHandler(svr *server.Server, rd *render.Render) *storeHandler {
func newStoreHandler(handler *server.Handler, rd *render.Render) *storeHandler {
return &storeHandler{
svr: svr,
rd: rd,
Handler: handler,
rd: rd,
}
}

func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand All @@ -152,12 +152,12 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
h.rd.JSON(w, http.StatusOK, storeInfo)
}

func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
Expand Down Expand Up @@ -187,7 +187,7 @@ func (h *storeHandler) Delete(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -217,7 +217,7 @@ func (h *storeHandler) SetState(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -257,7 +257,7 @@ func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
}

func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand Down Expand Up @@ -304,20 +304,52 @@ func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

// SetLimit updates the scheduling operator rate limit of a single store.
// The request body must be a JSON object carrying a non-negative numeric
// "rate" field; the store is identified by the {id} path variable.
func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
	storeID, parseErr := apiutil.ParseUint64VarsField(mux.Vars(r), "id")
	if parseErr != nil {
		errorResp(h.rd, w, errcode.NewInvalidInputErr(parseErr))
		return
	}

	var input map[string]interface{}
	if err := readJSONRespondError(h.rd, w, r.Body, &input); err != nil {
		return
	}

	// encoding/json decodes every JSON number into float64, so the type
	// assertion below is the only shape a valid "rate" can take.
	rawRate, present := input["rate"]
	if !present {
		h.rd.JSON(w, http.StatusBadRequest, "rate unset")
		return
	}
	rate, isNumber := rawRate.(float64)
	if !isNumber || rate < 0 {
		h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
		return
	}

	if err := h.SetStoreLimit(storeID, rate); err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
*server.Handler
rd *render.Render
}

func newStoresHandler(svr *server.Server, rd *render.Render) *storesHandler {
func newStoresHandler(handler *server.Handler, rd *render.Render) *storesHandler {
return &storesHandler{
svr: svr,
rd: rd,
Handler: handler,
rd: rd,
}
}

func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
errorResp(h.rd, w, errcode.NewInternalErr(server.ErrNotBootstrapped))
return
Expand All @@ -332,8 +364,49 @@ func (h *storesHandler) RemoveTombStone(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, nil)
}

// SetAllLimit applies one scheduling operator rate limit to every store.
// The request body must be a JSON object carrying a non-negative numeric
// "rate" field.
func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
	var input map[string]interface{}
	if err := readJSONRespondError(h.rd, w, r.Body, &input); err != nil {
		return
	}

	// JSON numbers always decode to float64, so rejecting any other
	// dynamic type here also rejects strings, booleans, objects, etc.
	rawRate, present := input["rate"]
	if !present {
		h.rd.JSON(w, http.StatusBadRequest, "rate unset")
		return
	}
	rate, isNumber := rawRate.(float64)
	if !isNumber || rate < 0 {
		h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
		return
	}

	if err := h.SetAllStoresLimit(rate); err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	h.rd.JSON(w, http.StatusOK, nil)
}

// GetAllLimit responds with the current per-store scheduling rate limits,
// encoded as a JSON object mapping store ID to {"rate": <float>}.
func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
	limits, err := h.GetAllStoresLimit()
	if err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	// Wrap each bare rate in a small struct so the JSON shape is
	// {"<storeID>": {"rate": x}, ...} rather than a flat number map.
	type rateEntry struct {
		Rate float64 `json:"rate"`
	}
	resp := make(map[uint64]interface{}, len(limits))
	for storeID, rate := range limits {
		resp[storeID] = rateEntry{Rate: rate}
	}

	h.rd.JSON(w, http.StatusOK, resp)
}

func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
cluster := h.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
Expand All @@ -358,7 +431,7 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storeInfo := newStoreInfo(h.GetScheduleConfig(), store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
Expand Down
2 changes: 2 additions & 0 deletions server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -48,6 +49,7 @@ func (s *testTrendSuite) TestTrend(c *C) {
// Create 3 operators that transfers leader, moves follower, moves leader.
c.Assert(svr.GetHandler().AddTransferLeaderOperator(4, 2), IsNil)
c.Assert(svr.GetHandler().AddTransferPeerOperator(5, 2, 3), IsNil)
time.Sleep(2 * time.Second)
c.Assert(svr.GetHandler().AddTransferPeerOperator(6, 1, 3), IsNil)

// Complete the operators.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (c *RaftCluster) checkStores() {

offlineStore := store.GetMeta()
// If the store is empty, it can be buried.
if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 {
if cluster.GetStoreRegionCount(offlineStore.GetId()) == 0 {
if err := c.BuryStore(offlineStore.GetId(), false); err != nil {
log.Error("bury store failed",
zap.Stringer("store", offlineStore),
Expand Down
20 changes: 19 additions & 1 deletion server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ func (c *clusterInfo) UnblockStore(storeID uint64) {
c.core.UnblockStore(storeID)
}

// SetStoreOverload stops balancer from selecting the store.
// It takes the write lock because the overload flag lives in the shared
// core cluster state mutated by c.core.
func (c *clusterInfo) SetStoreOverload(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	c.core.SetStoreOverload(storeID)
}

// ResetStoreOverload allows balancer to select the store.
// Counterpart of SetStoreOverload; clears the flag under the write lock.
func (c *clusterInfo) ResetStoreOverload(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	c.core.ResetStoreOverload(storeID)
}

// GetStores returns all stores in the cluster.
func (c *clusterInfo) GetStores() []*core.StoreInfo {
c.RLock()
Expand Down Expand Up @@ -381,7 +395,7 @@ func (c *clusterInfo) dropRegion(id uint64) {
}
}

func (c *clusterInfo) getStoreRegionCount(storeID uint64) int {
func (c *clusterInfo) GetStoreRegionCount(storeID uint64) int {
c.RLock()
defer c.RUnlock()
return c.core.Regions.GetStoreRegionCount(storeID)
Expand Down Expand Up @@ -687,6 +701,10 @@ func (c *clusterInfo) GetHotRegionScheduleLimit() uint64 {
return c.opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
}

// GetStoreBalanceRate returns the maximum balance rate allowed for each
// store, as configured by the "store-balance-rate" schedule option.
func (c *clusterInfo) GetStoreBalanceRate() float64 {
	return c.opt.GetStoreBalanceRate()
}

func (c *clusterInfo) GetTolerantSizeRatio() float64 {
return c.opt.GetTolerantSizeRatio()
}
Expand Down
2 changes: 1 addition & 1 deletion server/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
}
}
for id, count := range regionCounts {
c.Assert(cluster.getStoreRegionCount(id), Equals, count)
c.Assert(cluster.GetStoreRegionCount(id), Equals, count)
}

for _, region := range cluster.getRegions() {
Expand Down
13 changes: 9 additions & 4 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ type ScheduleConfig struct {
// If the number of times a region hits the hot cache is greater than this
// threshold, it is considered a hot region.
HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold,omitempty" json:"hot-region-cache-hits-threshold"`
// StoreBalanceRate is the maximum of balance rate for each store.
StoreBalanceRate float64 `toml:"store-balance-rate,omitempty" json:"store-balance-rate"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
//
Expand Down Expand Up @@ -538,6 +540,7 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
MergeScheduleLimit: c.MergeScheduleLimit,
HotRegionScheduleLimit: c.HotRegionScheduleLimit,
HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold,
StoreBalanceRate: c.StoreBalanceRate,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
Expand All @@ -561,12 +564,13 @@ const (
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultLeaderScheduleLimit = 8
defaultRegionScheduleLimit = 1024
defaultReplicaScheduleLimit = 1024
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultTolerantSizeRatio = 5
defaultStoreBalanceRate = 1
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
Expand Down Expand Up @@ -611,6 +615,7 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("tolerant-size-ratio") {
adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
}
adjustFloat64(&c.StoreBalanceRate, defaultStoreBalanceRate)
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)
adjustSchedulers(&c.Schedulers, defaultSchedulers)
Expand Down
1 change: 1 addition & 0 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind sche

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) {
cfg := NewConfig()
cfg.Schedule.TolerantSizeRatio = 5
if err := cfg.Adjust(nil); err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 0fe66a0

Please sign in to comment.