Merge branch 'release-5.2' into cherry-pick-4111-to-release-5.2
ti-chi-bot authored Oct 13, 2021
2 parents 70f219b + 46534a1 commit 1a6e7c1
Showing 14 changed files with 186 additions and 49 deletions.
go.mod: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20210805052247-76981389e818
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
-	github.com/pingcap/tidb-dashboard v0.0.0-20210826074103-29034af68525
+	github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.6.0
go.sum: 2 additions & 2 deletions
@@ -327,8 +327,8 @@ github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuR
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98=
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY=
-github.com/pingcap/tidb-dashboard v0.0.0-20210826074103-29034af68525 h1:hJDAzcVGfVsUx90/JHXPgvHggxsiuYZJ6FKkLY6Vo3M=
-github.com/pingcap/tidb-dashboard v0.0.0-20210826074103-29034af68525/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
+github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529 h1:EVU+6dwm/kVb0kHsh+Wdgu2RGcpK1o9eUZI5QFzZPR4=
+github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
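The go.mod and go.sum entries move together because go.sum pins both the module hash and the go.mod hash of the new pseudo-version. For reference, a bump like this is normally produced with the Go toolchain rather than by hand-editing both files; a sketch, with the short hash taken from the new pseudo-version above:

go get github.com/pingcap/tidb-dashboard@a25c25809529
go mod tidy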
pkg/dashboard/distro/distro.go: 3 additions & 0 deletions
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+//go:build dashboard_distro
+// +build dashboard_distro
+
package distro

import (
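The added constraint uses both syntaxes: //go:build is the form introduced in Go 1.17, while // +build is the legacy form that older toolchains still read; the two must agree, and gofmt keeps them synchronized. A minimal sketch of a file guarded the same way (the constant is illustrative):

//go:build dashboard_distro
// +build dashboard_distro

package distro

// This file is compiled only with: go build -tags dashboard_distro
const hasDistroBuildTag = true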
@@ -11,11 +11,5 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-// +build !dashboard_distro

package distro

import "github.com/pingcap/tidb-dashboard/pkg/utils/distro"

// Resource declared the distro brand information
var Resource = distro.Resource
server/cluster/cluster.go: 1 addition & 1 deletion
@@ -383,8 +383,8 @@ func (c *RaftCluster) Stop() {
}

c.running = false
-close(c.quit)
c.coordinator.stop()
+close(c.quit)
c.Unlock()
c.wg.Wait()
log.Info("raftcluster is stopped")
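With the reorder, Stop shuts the coordinator down before closing the quit channel, presumably so that goroutines selecting on c.quit do not start tearing down while the coordinator is still mid-stop. A toy sketch of the ordering, with illustrative names rather than PD's real fields:

package clusterdemo

import "sync"

type raftClusterDemo struct {
    sync.Mutex
    quit        chan struct{}
    wg          sync.WaitGroup
    stopWorkers func() // stand-in for coordinator.stop()
}

func (c *raftClusterDemo) Stop() {
    c.Lock()
    c.stopWorkers() // stop the dependent component first...
    close(c.quit)   // ...then broadcast shutdown to everything watching quit
    c.Unlock()
    c.wg.Wait()
}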
server/cluster/coordinator.go: 6 additions & 0 deletions
@@ -822,6 +822,12 @@ func (s *scheduleController) Stop() {

func (s *scheduleController) Schedule() []*operator.Operator {
for i := 0; i < maxScheduleRetries; i++ {
+// No need to retry if the scheduler should stop; check first so Schedule exits quickly.
+select {
+case <-s.ctx.Done():
+return nil
+default:
+}
// If the scheduler returns an operator, reset the interval to the minimal interval.
if op := s.Scheduler.Schedule(s.cluster); op != nil {
s.nextInterval = s.Scheduler.GetMinInterval()
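The added select is the standard non-blocking cancellation check: because of the default case, <-s.ctx.Done() is only polled, so the retry loop exits promptly once the scheduler's context is cancelled instead of burning the remaining retries. A self-contained sketch of the pattern (names are illustrative):

package scheddemo

import "context"

// scheduleWithRetries tries fn up to maxRetries times, polling for
// cancellation before each attempt without ever blocking on ctx.
func scheduleWithRetries(ctx context.Context, maxRetries int, fn func() bool) bool {
    for i := 0; i < maxRetries; i++ {
        select {
        case <-ctx.Done():
            return false // stopping: skip the remaining retries
        default:
        }
        if fn() {
            return true
        }
    }
    return false
}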
server/schedule/checker/replica_checker.go: 1 addition & 4 deletions
@@ -106,13 +106,10 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
+// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.opts.GetMaxStoreDownTime() {
continue
}
-if stats.GetDownSeconds() < uint64(r.opts.GetMaxStoreDownTime().Seconds()) {
-continue
-}

return r.fixPeer(region, storeID, downStatus)
}
return nil
server/schedule/checker/rule_checker.go: 1 addition & 3 deletions
@@ -298,12 +298,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) bool {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
+// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < c.cluster.GetOpts().GetMaxStoreDownTime() {
continue
}
-if stats.GetDownSeconds() < uint64(c.cluster.GetOpts().GetMaxStoreDownTime().Seconds()) {
-continue
-}
return true
}
return false
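Both checkers apply the same simplification: a down peer is acted on once the store itself has been down longer than max-store-down-time, and the per-peer DownSeconds reported in region heartbeats is deliberately ignored. A sketch of the predicate before and after (helper names are invented for illustration):

package checkerdemo

import "time"

// Before: both the store downtime and the heartbeat-reported
// DownSeconds had to exceed the limit for the peer to be fixed.
func shouldFixBefore(storeDown time.Duration, peerDownSeconds uint64, max time.Duration) bool {
    return storeDown >= max && peerDownSeconds >= uint64(max.Seconds())
}

// After: only the store's own state is considered.
func shouldFixAfter(storeDown time.Duration, max time.Duration) bool {
    return storeDown >= max
}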
server/schedulers/balance_leader.go: 12 additions & 4 deletions
@@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {

type balanceLeaderScheduler struct {
*BaseScheduler
+*retryQuota
conf *balanceLeaderSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
BaseScheduler: base,
+retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceLeaderCounter,
@@ -153,40 +155,46 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
})
sort.Slice(targets, func(i, j int) bool {
iOp := plan.GetOpInfluence(targets[i].GetID())
-jOp := plan.GetOpInfluence(targets[i].GetID())
+jOp := plan.GetOpInfluence(targets[j].GetID())
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})

for i := 0; i < len(sources) || i < len(targets); i++ {
if i < len(sources) {
plan.source, plan.target = sources[i], nil
+retryLimit := l.retryQuota.GetLimit(plan.source)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
-for j := 0; j < balanceLeaderRetryLimit; j++ {
+for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
+l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
}
}
+l.Attenuate(plan.source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
}
if i < len(targets) {
plan.source, plan.target = nil, targets[i]
+retryLimit := l.retryQuota.GetLimit(plan.target)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()

-for j := 0; j < balanceLeaderRetryLimit; j++ {
+for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
+l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
}
}
+l.Attenuate(plan.target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
}
}
+l.retryQuota.GC(append(sources, targets...))
return nil
}

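Both balance schedulers now embed a retryQuota, which replaces the fixed per-round retry limit with a per-store budget: a store that repeatedly yields no operator has its budget attenuated, a successful store gets the full budget back, and GC drops state for stores that leave the candidate set. The type itself is defined elsewhere in this PR, so the sketch below is reconstructed from the calls visible in this diff (GetLimit, ResetLimit, Attenuate, GC) and keyed by store ID for simplicity; the real implementation may differ:

package quotademo

type retryQuota struct {
    initialLimit int
    minLimit     int
    attenuation  int
    limits       map[uint64]int // store ID -> remaining retry budget
}

func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
    return &retryQuota{initialLimit, minLimit, attenuation, map[uint64]int{}}
}

// GetLimit returns the store's current budget, seeding unseen stores
// with the full initial limit.
func (q *retryQuota) GetLimit(storeID uint64) int {
    if limit, ok := q.limits[storeID]; ok {
        return limit
    }
    q.limits[storeID] = q.initialLimit
    return q.initialLimit
}

// Attenuate shrinks the budget after a round that produced no operator,
// flooring it at minLimit.
func (q *retryQuota) Attenuate(storeID uint64) {
    limit := q.GetLimit(storeID) / q.attenuation
    if limit < q.minLimit {
        limit = q.minLimit
    }
    q.limits[storeID] = limit
}

// ResetLimit restores the full budget once the store yields an operator.
func (q *retryQuota) ResetLimit(storeID uint64) {
    q.limits[storeID] = q.initialLimit
}

// GC drops budgets for stores that are no longer candidates.
func (q *retryQuota) GC(liveStores []uint64) {
    live := make(map[uint64]struct{}, len(liveStores))
    for _, id := range liveStores {
        live[id] = struct{}{}
    }
    for id := range q.limits {
        if _, ok := live[id]; !ok {
            delete(q.limits, id)
        }
    }
}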
server/schedulers/balance_region.go: 22 additions & 5 deletions
@@ -70,6 +70,7 @@ type balanceRegionSchedulerConfig struct {

type balanceRegionScheduler struct {
*BaseScheduler
+*retryQuota
conf *balanceRegionSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
@@ -82,6 +83,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
+retryQuota: newRetryQuota(balanceRegionRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceRegionCounter,
@@ -149,23 +151,35 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
return stores[i].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), iOp) >
stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp)
})

+var allowBalanceEmptyRegion func(*core.RegionInfo) bool
+
+switch cluster.(type) {
+case *schedule.RangeCluster:
+// allow empty regions to be scheduled in a range cluster
+allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true }
+default:
+allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster)
+}

for _, plan.source = range stores {
-for i := 0; i < balanceRegionRetryLimit; i++ {
+retryLimit := s.retryQuota.GetLimit(plan.source)
+for i := 0; i < retryLimit; i++ {
schedulerCounter.WithLabelValues(s.GetName(), "total").Inc()
// Prefer the region that has a pending peer.
// A pending region may mean the disk is overloaded, so remove the pending region first.
-plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
+plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
if plan.region == nil {
// Then pick the region that has a follower in the source store.
-plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
+plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Then pick the region whose leader is in the source store.
-plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
+plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Finally, pick a learner region.
-plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
+plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
Expand All @@ -187,11 +201,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
}

if op := s.transferPeer(plan); op != nil {
+s.retryQuota.ResetLimit(plan.source)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return []*operator.Operator{op}
}
}
+s.retryQuota.Attenuate(plan.source)
}
+s.retryQuota.GC(stores)
return nil
}

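The new type switch picks the empty-region predicate once per Schedule call: a schedule.RangeCluster may balance empty regions, while every other cluster keeps the usual opt.AllowBalanceEmptyRegion filter. A stripped-down sketch of the pattern with stand-in types:

package balancedemo

type cluster interface{ isCluster() }

type rangeCluster struct{} // stand-in for *schedule.RangeCluster

func (rangeCluster) isCluster() {}

type normalCluster struct{}

func (normalCluster) isCluster() {}

// emptyRegionPredicate mirrors the switch above: the filter is chosen
// once instead of being re-derived inside every retry iteration.
func emptyRegionPredicate(c cluster, defaultPred func(size int64) bool) func(int64) bool {
    switch c.(type) {
    case rangeCluster:
        return func(int64) bool { return true } // empty regions allowed
    default:
        return defaultPred
    }
}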
server/schedulers/balance_test.go: 13 additions & 14 deletions
@@ -1053,28 +1053,29 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) {
c.Assert(mb.IsScheduleAllowed(tc), IsFalse)
}

-var _ = Suite(&testScatterRangeLeaderSuite{})
+var _ = Suite(&testScatterRangeSuite{})

-type testScatterRangeLeaderSuite struct {
+type testScatterRangeSuite struct {
ctx context.Context
cancel context.CancelFunc
}

-func (s *testScatterRangeLeaderSuite) SetUpSuite(c *C) {
+func (s *testScatterRangeSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

-func (s *testScatterRangeLeaderSuite) TearDownSuite(c *C) {
+func (s *testScatterRangeSuite) TearDownSuite(c *C) {
s.cancel()
}

-func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
+func (s *testScatterRangeSuite) TestBalance(c *C) {
opt := config.NewTestOptions()
// TODO: enable placement rules
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
tc.DisableFeature(versioninfo.JointConsensus)
-tc.SetTolerantSizeRatio(2.5)
+// The range cluster uses a special tolerant ratio, so the cluster option has no effect.
+tc.SetTolerantSizeRatio(10000)
// Add stores 1,2,3,4,5.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
Expand All @@ -1099,17 +1100,16 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
})
id += 4
}
-// empty case
+// empty region case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
regionInfo := core.NewRegionInfo(
meta,
meta.Peers[leader],
-core.SetApproximateKeys(96),
-core.SetApproximateSize(96),
+core.SetApproximateKeys(1),
+core.SetApproximateSize(1),
)

tc.Regions.SetRegion(regionInfo)
}
for i := 0; i < 100; i++ {
@@ -1133,7 +1133,7 @@
}
}

-func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
+func (s *testScatterRangeSuite) TestBalanceLeaderLimit(c *C) {
opt := config.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
@@ -1164,7 +1164,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
id += 4
}

-// empty case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
@@ -1209,7 +1208,7 @@
c.Check(maxLeaderCount-minLeaderCount, Greater, 10)
}

-func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
+func (s *testScatterRangeSuite) TestConcurrencyUpdateConfig(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
oc := schedule.NewOperatorController(s.ctx, nil, nil)
Expand All @@ -1235,7 +1234,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
ch <- struct{}{}
}

-func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
+func (s *testScatterRangeSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
// Add stores 1,2,3.
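One detail worth noting in these tests: regions[49].EndKey = []byte("") makes the last generated region unbounded on the right, since an empty end key conventionally means "to the end of the keyspace", so the fifty regions jointly cover every key. A sketch of that convention (assuming byte-wise lexicographic comparison, as in the real key handling):

package keydemo

import "bytes"

// contains reports whether key falls in [start, end), where an empty
// end key extends the range to the end of the keyspace.
func contains(start, end, key []byte) bool {
    if bytes.Compare(key, start) < 0 {
        return false
    }
    return len(end) == 0 || bytes.Compare(key, end) < 0
}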
server/schedulers/hot_region_config.go: 6 additions & 4 deletions
@@ -242,8 +242,8 @@ func (conf *hotRegionSchedulerConfig) GetEnableForTiFlash() bool {
}

func (conf *hotRegionSchedulerConfig) SetEnableForTiFlash(enable bool) {
-conf.RLock()
-defer conf.RUnlock()
+conf.Lock()
+defer conf.Unlock()
conf.EnableForTiFlash = enable
}

@@ -309,7 +309,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *http.Request) {
}
newc, _ := json.Marshal(conf)
if !bytes.Equal(oldc, newc) {
-conf.persist()
+conf.persistLocked()
rd.Text(w, http.StatusOK, "success")
}

Expand All @@ -333,7 +333,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *
rd.Text(w, http.StatusBadRequest, "config item not found")
}

-func (conf *hotRegionSchedulerConfig) persist() error {
+func (conf *hotRegionSchedulerConfig) persistLocked() error {
data, err := schedule.EncodeConfig(conf)
if err != nil {
return err
@@ -343,6 +343,8 @@

func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster opt.Cluster) bool {
querySupport := cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery)
+conf.Lock()
+defer conf.Unlock()
if querySupport != conf.lastQuerySupported {
log.Info("query supported changed",
zap.Bool("last-query-support", conf.lastQuerySupported),
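Two conventions are enforced in this file: SetEnableForTiFlash mutates state, so it must take the write lock (the old RLock would race with concurrent writers), and the persist-to-persistLocked rename follows the common Go naming convention that a Locked suffix marks a helper whose caller must already hold the mutex. A condensed sketch (illustrative type, not PD's full config):

package configdemo

import "sync"

type schedulerConfigDemo struct {
    sync.RWMutex
    enable bool
}

func (c *schedulerConfigDemo) SetEnable(v bool) {
    c.Lock() // write lock: this method mutates state
    defer c.Unlock()
    c.enable = v
    _ = c.persistLocked()
}

// persistLocked assumes the caller already holds c's lock.
func (c *schedulerConfigDemo) persistLocked() error {
    // encode and save the config here
    return nil
}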