Merge branch 'release-5.1' into cherry-pick-4067-to-release-5.1
HunDunDM authored Sep 13, 2021
2 parents 192c8ac + ca80632 commit 3e8a5a7
Showing 11 changed files with 173 additions and 35 deletions.
7 changes: 6 additions & 1 deletion server/api/admin.go
@@ -14,6 +14,7 @@
package api

import (
"encoding/json"
"io"
"net/http"
"strconv"
@@ -95,14 +96,18 @@ func (h *adminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
}

// Intentionally no swagger mark as it is supposed to be only used in
// server-to-server.
// server-to-server. For security reason, it only accepts JSON formatted data.
func (h *adminHandler) persistFile(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, "")
return
}
defer r.Body.Close()
if !json.Valid(data) {
h.rd.Text(w, http.StatusBadRequest, "body should be json format")
return
}
err = h.svr.PersistFile(mux.Vars(r)["file_name"], data)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, err.Error())
9 changes: 9 additions & 0 deletions server/api/admin_test.go
@@ -88,6 +88,15 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50))
}

func (s *testAdminSuite) TestPersistFile(c *C) {
data := []byte("#!/bin/sh\nrm -rf /")
err := postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/fun.sh", data)
c.Assert(err, NotNil)
data = []byte(`{"foo":"bar"}`)
err = postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/good.json", data)
c.Assert(err, IsNil)
}

var _ = Suite(&testTSOSuite{})

type testTSOSuite struct {
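The commit gates the persist-file admin endpoint behind json.Valid, and the new test posts a shell script (rejected with http.StatusBadRequest) and a JSON object (accepted). Below is a minimal standalone sketch, not part of the commit, showing the same guard from a caller's side; the PD address and the /pd/api/v1 URL prefix are assumptions for illustration.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// postPersistFile mirrors the server-side json.Valid guard before sending.
func postPersistFile(addr, name string, body []byte) (int, error) {
	if !json.Valid(body) {
		// The server would answer "body should be json format" anyway.
		return http.StatusBadRequest, fmt.Errorf("body should be json format")
	}
	resp, err := http.Post(addr+"/pd/api/v1/admin/persist-file/"+name, "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	code, err := postPersistFile("http://127.0.0.1:2379", "good.json", []byte(`{"foo":"bar"}`))
	fmt.Println(code, err)
}
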
5 changes: 1 addition & 4 deletions server/schedule/checker/replica_checker.go
@@ -106,13 +106,10 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.opts.GetMaxStoreDownTime() {
continue
}
if stats.GetDownSeconds() < uint64(r.opts.GetMaxStoreDownTime().Seconds()) {
continue
}

return r.fixPeer(region, storeID, downStatus)
}
return nil
4 changes: 1 addition & 3 deletions server/schedule/checker/rule_checker.go
@@ -307,12 +307,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < c.cluster.GetOpts().GetMaxStoreDownTime() {
continue
}
if stats.GetDownSeconds() < uint64(c.cluster.GetOpts().GetMaxStoreDownTime().Seconds()) {
continue
}
return true
}
return false
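Both checkers now drop the stats.GetDownSeconds condition and, as the added comment states, consider only the store's downtime when deciding whether a down peer needs fixing. A minimal standalone sketch of the resulting condition, with hypothetical names and example durations rather than the checkers' real code:

package main

import (
	"fmt"
	"time"
)

// isPeerDown reports whether a down-reported peer should be repaired.
// Only the store-level downtime is consulted; the peer's reported
// DownSeconds no longer gates the decision.
func isPeerDown(storeDownTime, maxStoreDownTime time.Duration) bool {
	return storeDownTime >= maxStoreDownTime
}

func main() {
	max := 30 * time.Minute
	fmt.Println(isPeerDown(45*time.Minute, max)) // true: the store has been down long enough
	fmt.Println(isPeerDown(10*time.Minute, max)) // false: the store may still recover in time
}
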
16 changes: 12 additions & 4 deletions server/schedulers/balance_leader.go
@@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {

type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceLeaderSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceLeaderCounter,
@@ -153,38 +155,44 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
})
sort.Slice(targets, func(i, j int) bool {
iOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[j].GetID())
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})

for i := 0; i < len(sources) || i < len(targets); i++ {
if i < len(sources) {
plan.source, plan.target = sources[i], nil
retryLimit := l.retryQuota.GetLimit(plan.source)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
}
}
l.Attenuate(plan.source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
}
if i < len(targets) {
plan.source, plan.target = nil, targets[i]
retryLimit := l.retryQuota.GetLimit(plan.target)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()

for j := 0; j < balanceLeaderRetryLimit; j++ {
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
}
}
l.Attenuate(plan.target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
}
}
l.retryQuota.GC(append(sources, targets...))
return nil
}

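The leader balancer now embeds a retryQuota and replaces the fixed balanceLeaderRetryLimit with a per-store limit that is reset after a successful schedule, attenuated when no operator is produced, and garbage-collected for stores that drop out of the candidate set (balance_region.go below adopts the same mechanism). The sketch that follows only illustrates the idea; the field names, constructor arguments, and keying by store ID are assumptions, not the repository's implementation, which operates on the scheduler's store objects.

package main

import "fmt"

// retryQuota keeps a per-store retry budget for a balance scheduler.
type retryQuota struct {
	initialLimit int
	minLimit     int
	attenuation  int
	limits       map[uint64]int
}

func newRetryQuota(initial, min, attenuation int) *retryQuota {
	return &retryQuota{initialLimit: initial, minLimit: min, attenuation: attenuation, limits: make(map[uint64]int)}
}

// GetLimit returns the current budget, starting every store at the full limit.
func (q *retryQuota) GetLimit(id uint64) int {
	if l, ok := q.limits[id]; ok {
		return l
	}
	q.limits[id] = q.initialLimit
	return q.initialLimit
}

// ResetLimit restores the full budget after a successful schedule.
func (q *retryQuota) ResetLimit(id uint64) { q.limits[id] = q.initialLimit }

// Attenuate shrinks the budget for a store that keeps failing to yield an operator.
func (q *retryQuota) Attenuate(id uint64) {
	l := q.GetLimit(id) / q.attenuation
	if l < q.minLimit {
		l = q.minLimit
	}
	q.limits[id] = l
}

// GC drops budgets for stores that are no longer candidates.
func (q *retryQuota) GC(alive []uint64) {
	keep := make(map[uint64]struct{}, len(alive))
	for _, id := range alive {
		keep[id] = struct{}{}
	}
	for id := range q.limits {
		if _, ok := keep[id]; !ok {
			delete(q.limits, id)
		}
	}
}

func main() {
	q := newRetryQuota(10, 1, 2)
	q.Attenuate(1)
	fmt.Println(q.GetLimit(1)) // 5
	q.ResetLimit(1)
	fmt.Println(q.GetLimit(1)) // 10
}
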
8 changes: 7 additions & 1 deletion server/schedulers/balance_region.go
@@ -70,6 +70,7 @@ type balanceRegionSchedulerConfig struct {

type balanceRegionScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceRegionSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -82,6 +83,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceRegionRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceRegionCounter,
@@ -149,7 +151,8 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp, -1)
})
for _, plan.source = range stores {
for i := 0; i < balanceRegionRetryLimit; i++ {
retryLimit := s.retryQuota.GetLimit(plan.source)
for i := 0; i < retryLimit; i++ {
// Priority pick the region that has a pending peer.
// Pending region may means the disk is overload, remove the pending region firstly.
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
Expand Down Expand Up @@ -185,11 +188,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
}

if op := s.transferPeer(plan); op != nil {
s.retryQuota.ResetLimit(plan.source)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return []*operator.Operator{op}
}
}
s.retryQuota.Attenuate(plan.source)
}
s.retryQuota.GC(stores)
return nil
}

43 changes: 24 additions & 19 deletions server/schedulers/hot_region.go
@@ -190,16 +190,19 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums,
regionRead,
read, core.LeaderKind)
h.stLoadInfos[readPeer] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums,
regionRead,
@@ -209,12 +212,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
{ // update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums,
regionWrite,
write, core.LeaderKind)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums,
regionWrite,
@@ -246,6 +251,7 @@ func (h *hotScheduler) gcRegionPendings() {
// summaryStoresLoad Load information of all available stores.
// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
func summaryStoresLoad(
stores []*core.StoreInfo,
storesLoads map[uint64][]float64,
storePendings map[uint64]*Influence,
storeHotPeers map[uint64][]*statistics.HotPeerStat,
@@ -258,7 +264,12 @@ func summaryStoresLoad(
allCount := 0.0

// Stores without byte rate statistics is not available to schedule.
for id, storeLoads := range storesLoads {
for _, store := range stores {
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok {
continue
}
loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
@@ -310,6 +321,7 @@

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
@@ -558,10 +570,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
for id, detail := range bs.stLoadDetail {
if bs.cluster.GetStore(id) == nil {
log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
continue
}
if len(detail.HotPeers) == 0 {
continue
}
@@ -705,12 +713,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
if srcStore == nil {
return nil
}
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
@@ -720,8 +725,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore),
}

for storeID := range bs.stLoadDetail {
candidates = append(candidates, bs.cluster.GetStore(storeID))
for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail)
}

case transferLeader:
@@ -733,9 +738,9 @@
filters = append(filters, leaderFilter)
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

@@ -745,17 +750,17 @@
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster.GetOpts(), store, filters) {
detail := bs.stLoadDetail[store.GetID()]
maxLoads := detail.LoadPred.max().Loads
if slice.AllOf(maxLoads, func(i int) bool {
return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i]
}) {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
ret[store.GetID()] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
}
hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
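summaryStoresLoad now receives the store list, skips stores without load statistics, and attaches each store to its storeLoadDetail, so filterSrcStores, filterDstStores, and pickDstStores no longer look stores up via cluster.GetStore. Below is a simplified sketch of that pattern using stand-in types, not the scheduler's real structures (shuffle_hot_region.go below passes the store list the same way).

package main

import "fmt"

// StoreInfo and storeLoadDetail are stand-ins for the real PD types.
type StoreInfo struct{ ID uint64 }

type storeLoadDetail struct {
	Store *StoreInfo
	Loads []float64
}

// summarize keeps only stores that have load statistics and carries the
// store object along so later steps need no separate lookup.
func summarize(stores []*StoreInfo, storesLoads map[uint64][]float64) map[uint64]*storeLoadDetail {
	details := make(map[uint64]*storeLoadDetail)
	for _, store := range stores {
		loads, ok := storesLoads[store.ID]
		if !ok {
			// Stores without statistics are not available to schedule.
			continue
		}
		details[store.ID] = &storeLoadDetail{Store: store, Loads: loads}
	}
	return details
}

func main() {
	stores := []*StoreInfo{{ID: 1}, {ID: 2}}
	loads := map[uint64][]float64{1: {128.0, 64.0}}
	for id, d := range summarize(stores, loads) {
		fmt.Println(id, d.Loads) // only store 1 has statistics
	}
}
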
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
@@ -132,17 +132,20 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
}

func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()
switch typ {
case read:
s.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]*Influence{},
cluster.RegionReadStats(),
read, core.LeaderKind)
return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
case write:
s.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]*Influence{},
cluster.RegionWriteStats(),