Commit

address comments

ref tikv#5638

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>
ethercflow committed Nov 4, 2022
1 parent 023bca8 commit 4f1d24c
Showing 4 changed files with 32 additions and 33 deletions.
server/cluster/coordinator.go (9 changes: 5 additions & 4 deletions)
@@ -969,10 +969,11 @@ func (c *coordinator) getPausedSchedulerDelayUntil(name string) (int64, error) {
 }
 
 func (c *coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
-	c.RLock()
-	defer c.RUnlock()
-	if s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]; ok {
-		if schedulers.NeedTransferWitnessLeader(region) {
+	if schedulers.NeedTransferWitnessLeader(region) {
+		c.RLock()
+		s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]
+		c.RUnlock()
+		if ok {
 			select {
 			case schedulers.RecvRegionInfo(s.Scheduler) <- region:
 			default:
server/cluster/coordinator_test.go (37 changes: 18 additions & 19 deletions)
@@ -672,7 +672,6 @@ func TestAddScheduler(t *testing.T) {
 	re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
 	re.NoError(co.removeScheduler(schedulers.HotRegionName))
 	re.NoError(co.removeScheduler(schedulers.SplitBucketName))
-	re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName))
 	re.Empty(co.schedulers)
 
 	stream := mockhbstream.NewHeartbeatStream()
@@ -739,7 +738,7 @@ func TestPersistScheduler(t *testing.T) {
 	re.NoError(tc.addLeaderStore(1, 1))
 	re.NoError(tc.addLeaderStore(2, 1))
 
-	re.Len(co.schedulers, 5)
+	re.Len(co.schedulers, 4)
 	oc := co.opController
 	storage := tc.RaftCluster.storage
 
@@ -749,15 +748,15 @@ func TestPersistScheduler(t *testing.T) {
 	evict, err := schedule.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(evict, "2"))
-	re.Len(co.schedulers, 7)
+	re.Len(co.schedulers, 6)
 	sches, _, err := storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 7)
+	re.Len(sches, 6)
 	re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
 	re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
 	re.NoError(co.removeScheduler(schedulers.HotRegionName))
 	re.NoError(co.removeScheduler(schedulers.SplitBucketName))
-	re.Len(co.schedulers, 3)
+	re.Len(co.schedulers, 2)
 	re.NoError(co.cluster.opt.Persist(storage))
 	co.stop()
 	co.wg.Wait()
@@ -772,21 +771,21 @@ func TestPersistScheduler(t *testing.T) {
 	defer func() {
 		config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1]
 	}()
-	re.Len(newOpt.GetSchedulers(), 5)
+	re.Len(newOpt.GetSchedulers(), 4)
 	re.NoError(newOpt.Reload(storage))
 	// only remains 3 items with independent config.
 	sches, _, err = storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 4)
+	re.Len(sches, 3)
 
 	// option have 6 items because the default scheduler do not remove.
-	re.Len(newOpt.GetSchedulers(), 8)
+	re.Len(newOpt.GetSchedulers(), 7)
 	re.NoError(newOpt.Persist(storage))
 	tc.RaftCluster.opt = newOpt
 
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 	co.run()
-	re.Len(co.schedulers, 4)
+	re.Len(co.schedulers, 3)
 	co.stop()
 	co.wg.Wait()
 	// suppose restart PD again
@@ -796,22 +795,22 @@ func TestPersistScheduler(t *testing.T) {
 	tc.RaftCluster.opt = newOpt
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 	co.run()
-	re.Len(co.schedulers, 4)
+	re.Len(co.schedulers, 3)
 	bls, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(bls))
 	brs, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(brs))
-	re.Len(co.schedulers, 6)
+	re.Len(co.schedulers, 5)
 
 	// the scheduler option should contain 6 items
 	// the `hot scheduler` are disabled
-	re.Len(co.cluster.opt.GetSchedulers(), 8)
+	re.Len(co.cluster.opt.GetSchedulers(), 7)
 	re.NoError(co.removeScheduler(schedulers.GrantLeaderName))
 	// the scheduler that is not enable by default will be completely deleted
-	re.Len(co.cluster.opt.GetSchedulers(), 7)
-	re.Len(co.schedulers, 5)
+	re.Len(co.cluster.opt.GetSchedulers(), 6)
+	re.Len(co.schedulers, 4)
 	re.NoError(co.cluster.opt.Persist(co.cluster.storage))
 	co.stop()
 	co.wg.Wait()
@@ -822,9 +821,9 @@ func TestPersistScheduler(t *testing.T) {
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 
 	co.run()
-	re.Len(co.schedulers, 5)
-	re.NoError(co.removeScheduler(schedulers.EvictLeaderName))
 	re.Len(co.schedulers, 4)
+	re.NoError(co.removeScheduler(schedulers.EvictLeaderName))
+	re.Len(co.schedulers, 3)
 }
 
 func TestRemoveScheduler(t *testing.T) {
@@ -849,10 +848,10 @@ func TestRemoveScheduler(t *testing.T) {
 	gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(gls1, "1"))
-	re.Len(co.schedulers, 6)
+	re.Len(co.schedulers, 5)
 	sches, _, err := storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 6)
+	re.Len(sches, 5)
 
 	// remove all schedulers
 	re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
@@ -879,7 +878,7 @@ func TestRemoveScheduler(t *testing.T) {
 	co.run()
 	re.Empty(co.schedulers)
 	// the option remains default scheduler
-	re.Len(co.cluster.opt.GetSchedulers(), 5)
+	re.Len(co.cluster.opt.GetSchedulers(), 4)
 	co.stop()
 	co.wg.Wait()
 }
server/config/config.go (1 change: 0 additions & 1 deletion)
@@ -1042,7 +1042,6 @@ var DefaultSchedulers = SchedulerConfigs{
 	{Type: "balance-leader"},
 	{Type: "hot-region"},
 	{Type: "split-bucket"},
-	{Type: "transfer-witness-leader"},
 }
 
 // IsDefaultScheduler checks whether the scheduler is enable by default.
server/schedulers/transfer_witness_leader.go (18 changes: 9 additions & 9 deletions)
@@ -51,28 +51,28 @@ func init() {
 	})
 }
 
-type transferLeaderScheduler struct {
+type trasferWitnessLeaderScheduler struct {
 	*BaseScheduler
 	regions chan *core.RegionInfo
 }
 
 // newTransferWitnessLeaderScheduler creates an admin scheduler that transfers witness leader of a region.
 func newTransferWitnessLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler {
-	return &transferLeaderScheduler{
+	return &trasferWitnessLeaderScheduler{
 		BaseScheduler: NewBaseScheduler(opController),
 		regions:       make(chan *core.RegionInfo, TransferWitnessLeaderRecvMaxRegionSize),
 	}
 }
 
-func (s *transferLeaderScheduler) GetName() string {
+func (s *trasferWitnessLeaderScheduler) GetName() string {
 	return TransferWitnessLeaderName
 }
 
-func (s *transferLeaderScheduler) GetType() string {
+func (s *trasferWitnessLeaderScheduler) GetType() string {
 	return TransferWitnessLeaderType
 }
 
-func (s *transferLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
+func (s *trasferWitnessLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
 	// TODO: make sure the restriction is reasonable
 	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
 	if !allowed {
@@ -81,12 +81,12 @@ func (s *transferLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bo
 	return allowed
 }
 
-func (s *transferLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *trasferWitnessLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
 	return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, TransferWitnessLeaderBatchSize), nil
 }
 
-func (s *transferLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
+func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
 	var ops []*operator.Operator
 	for i := 0; i < batchSize; i++ {
 		select {
@@ -108,7 +108,7 @@ func (s *transferLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ s
 	return ops
 }
 
-func (s *transferLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
+func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
 	var filters []filter.Filter
 	unhealthyPeerStores := make(map[uint64]struct{})
 	for _, peer := range region.GetDownPeers() {
@@ -144,5 +144,5 @@ func NeedTransferWitnessLeader(region *core.RegionInfo) bool {
 
 // RecvRegionInfo is used to return a writable channel to recv region info from other places
 func RecvRegionInfo(s schedule.Scheduler) chan<- *core.RegionInfo {
-	return s.(*transferLeaderScheduler).regions
+	return s.(*trasferWitnessLeaderScheduler).regions
 }
