From 5e4573293e3728c38bb2e69fa97c311a71404178 Mon Sep 17 00:00:00 2001
From: Wenbo Zhang
Date: Fri, 4 Nov 2022 17:34:03 +0800
Subject: [PATCH] address comments

ref tikv/pd#5638

Signed-off-by: Wenbo Zhang
---
 server/cluster/coordinator.go                |  9 +++--
 server/cluster/coordinator_test.go           | 40 ++++++++++----------
 server/config/config.go                      |  1 -
 server/schedulers/transfer_witness_leader.go | 18 ++++-----
 4 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 122884ce8ff..b3c0f832025 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -969,10 +969,11 @@ func (c *coordinator) getPausedSchedulerDelayUntil(name string) (int64, error) {
 }
 
 func (c *coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
-	c.RLock()
-	defer c.RUnlock()
-	if s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]; ok {
-		if schedulers.NeedTransferWitnessLeader(region) {
+	if schedulers.NeedTransferWitnessLeader(region) {
+		c.RLock()
+		s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]
+		c.RUnlock()
+		if ok {
 			select {
 			case schedulers.RecvRegionInfo(s.Scheduler) <- region:
 			default:
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index ef7ab881c21..84c43b7e60b 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -672,7 +672,6 @@ func TestAddScheduler(t *testing.T) {
 	re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
 	re.NoError(co.removeScheduler(schedulers.HotRegionName))
 	re.NoError(co.removeScheduler(schedulers.SplitBucketName))
-	re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName))
 	re.Empty(co.schedulers)
 
 	stream := mockhbstream.NewHeartbeatStream()
@@ -739,7 +738,7 @@ func TestPersistScheduler(t *testing.T) {
 	re.NoError(tc.addLeaderStore(1, 1))
 	re.NoError(tc.addLeaderStore(2, 1))
 
-	re.Len(co.schedulers, 5)
+	re.Len(co.schedulers, 4)
 	oc := co.opController
 	storage := tc.RaftCluster.storage
 
@@ -749,15 +748,15 @@ func TestPersistScheduler(t *testing.T) {
 	evict, err := schedule.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(evict, "2"))
-	re.Len(co.schedulers, 7)
+	re.Len(co.schedulers, 6)
 	sches, _, err := storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 7)
+	re.Len(sches, 6)
 	re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
 	re.NoError(co.removeScheduler(schedulers.BalanceRegionName))
 	re.NoError(co.removeScheduler(schedulers.HotRegionName))
 	re.NoError(co.removeScheduler(schedulers.SplitBucketName))
-	re.Len(co.schedulers, 3)
+	re.Len(co.schedulers, 2)
 	re.NoError(co.cluster.opt.Persist(storage))
 	co.stop()
 	co.wg.Wait()
@@ -772,21 +771,21 @@ func TestPersistScheduler(t *testing.T) {
 	defer func() {
 		config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1]
 	}()
-	re.Len(newOpt.GetSchedulers(), 5)
+	re.Len(newOpt.GetSchedulers(), 4)
 	re.NoError(newOpt.Reload(storage))
 	// only remains 3 items with independent config.
 	sches, _, err = storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 4)
+	re.Len(sches, 3)
 
 	// option have 6 items because the default scheduler do not remove.
-	re.Len(newOpt.GetSchedulers(), 8)
+	re.Len(newOpt.GetSchedulers(), 7)
 	re.NoError(newOpt.Persist(storage))
 	tc.RaftCluster.opt = newOpt
 
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 	co.run()
-	re.Len(co.schedulers, 4)
+	re.Len(co.schedulers, 3)
 	co.stop()
 	co.wg.Wait()
 	// suppose restart PD again
@@ -796,22 +795,22 @@ func TestPersistScheduler(t *testing.T) {
 	tc.RaftCluster.opt = newOpt
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 	co.run()
-	re.Len(co.schedulers, 4)
+	re.Len(co.schedulers, 3)
 	bls, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(bls))
 	brs, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(brs))
-	re.Len(co.schedulers, 6)
+	re.Len(co.schedulers, 5)
 
 	// the scheduler option should contain 6 items
 	// the `hot scheduler` are disabled
-	re.Len(co.cluster.opt.GetSchedulers(), 8)
+	re.Len(co.cluster.opt.GetSchedulers(), 7)
 	re.NoError(co.removeScheduler(schedulers.GrantLeaderName))
 	// the scheduler that is not enable by default will be completely deleted
-	re.Len(co.cluster.opt.GetSchedulers(), 7)
-	re.Len(co.schedulers, 5)
+	re.Len(co.cluster.opt.GetSchedulers(), 6)
+	re.Len(co.schedulers, 4)
 	re.NoError(co.cluster.opt.Persist(co.cluster.storage))
 	co.stop()
 	co.wg.Wait()
@@ -822,9 +821,9 @@ func TestPersistScheduler(t *testing.T) {
 
 	co = newCoordinator(ctx, tc.RaftCluster, hbStreams)
 	co.run()
-	re.Len(co.schedulers, 5)
-	re.NoError(co.removeScheduler(schedulers.EvictLeaderName))
 	re.Len(co.schedulers, 4)
+	re.NoError(co.removeScheduler(schedulers.EvictLeaderName))
+	re.Len(co.schedulers, 3)
 }
 
 func TestRemoveScheduler(t *testing.T) {
@@ -842,17 +841,17 @@ func TestRemoveScheduler(t *testing.T) {
 	re.NoError(tc.addLeaderStore(1, 1))
 	re.NoError(tc.addLeaderStore(2, 1))
 
-	re.Len(co.schedulers, 5)
+	re.Len(co.schedulers, 4)
 	oc := co.opController
 	storage := tc.RaftCluster.storage
 
 	gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
 	re.NoError(err)
 	re.NoError(co.addScheduler(gls1, "1"))
-	re.Len(co.schedulers, 6)
+	re.Len(co.schedulers, 5)
 	sches, _, err := storage.LoadAllScheduleConfig()
 	re.NoError(err)
-	re.Len(sches, 6)
+	re.Len(sches, 5)
 
 	// remove all schedulers
 	re.NoError(co.removeScheduler(schedulers.BalanceLeaderName))
@@ -860,7 +859,6 @@ func TestRemoveScheduler(t *testing.T) {
 	re.NoError(co.removeScheduler(schedulers.HotRegionName))
 	re.NoError(co.removeScheduler(schedulers.GrantLeaderName))
 	re.NoError(co.removeScheduler(schedulers.SplitBucketName))
-	re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName))
 	// all removed
 	sches, _, err = storage.LoadAllScheduleConfig()
 	re.NoError(err)
@@ -879,7 +877,7 @@ func TestRemoveScheduler(t *testing.T) {
 	co.run()
 	re.Empty(co.schedulers)
 	// the option remains default scheduler
-	re.Len(co.cluster.opt.GetSchedulers(), 5)
+	re.Len(co.cluster.opt.GetSchedulers(), 4)
 	co.stop()
 	co.wg.Wait()
 }
diff --git a/server/config/config.go b/server/config/config.go
index ad99dbfda43..e596104c372 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -1042,7 +1042,6 @@ var DefaultSchedulers = SchedulerConfigs{
 	{Type: "balance-leader"},
 	{Type: "hot-region"},
 	{Type: "split-bucket"},
-	{Type: "transfer-witness-leader"},
 }
 
 // IsDefaultScheduler checks whether the scheduler is enable by default.
diff --git a/server/schedulers/transfer_witness_leader.go b/server/schedulers/transfer_witness_leader.go
index 00a1da6b47a..26f68c1fcd7 100644
--- a/server/schedulers/transfer_witness_leader.go
+++ b/server/schedulers/transfer_witness_leader.go
@@ -51,28 +51,28 @@ func init() {
 	})
 }
 
-type transferLeaderScheduler struct {
+type transferWitnessLeaderScheduler struct {
 	*BaseScheduler
 	regions chan *core.RegionInfo
 }
 
 // newTransferWitnessLeaderScheduler creates an admin scheduler that transfers witness leader of a region.
 func newTransferWitnessLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler {
-	return &transferLeaderScheduler{
+	return &transferWitnessLeaderScheduler{
 		BaseScheduler: NewBaseScheduler(opController),
 		regions:       make(chan *core.RegionInfo, TransferWitnessLeaderRecvMaxRegionSize),
 	}
 }
 
-func (s *transferLeaderScheduler) GetName() string {
+func (s *transferWitnessLeaderScheduler) GetName() string {
 	return TransferWitnessLeaderName
 }
 
-func (s *transferLeaderScheduler) GetType() string {
+func (s *transferWitnessLeaderScheduler) GetType() string {
 	return TransferWitnessLeaderType
 }
 
-func (s *transferLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
+func (s *transferWitnessLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
 	// TODO: make sure the restriction is reasonable
 	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
 	if !allowed {
@@ -81,12 +81,12 @@ func (s *transferLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bo
 	return allowed
 }
 
-func (s *transferLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *transferWitnessLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
 	return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, TransferWitnessLeaderBatchSize), nil
 }
 
-func (s *transferLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
+func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
 	var ops []*operator.Operator
 	for i := 0; i < batchSize; i++ {
 		select {
@@ -108,7 +108,7 @@ func (s *transferLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ s
 	return ops
 }
 
-func (s *transferLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
+func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
 	var filters []filter.Filter
 	unhealthyPeerStores := make(map[uint64]struct{})
 	for _, peer := range region.GetDownPeers() {
@@ -144,5 +144,5 @@ func NeedTransferWitnessLeader(region *core.RegionInfo) bool {
 
 // RecvRegionInfo is used to return a writable channel to recv region info from other places
 func RecvRegionInfo(s schedule.Scheduler) chan<- *core.RegionInfo {
-	return s.(*transferLeaderScheduler).regions
+	return s.(*transferWitnessLeaderScheduler).regions
 }
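
The sketch below is not part of the patch; it is a minimal, standalone Go illustration of the handoff pattern the change relies on: the heartbeat side pushes regions into a bounded channel with a non-blocking send (dropping when the queue is full, as in the select/default inside CheckTransferWitnessLeader), and the scheduler side drains the channel in fixed-size batches (as in scheduleTransferWitnessLeaderBatch). All names here (regionInfo, enqueue, drainBatch, the size constants) are hypothetical stand-ins, not PD APIs.

// Standalone sketch, not PD code: bounded-channel producer/consumer with a
// non-blocking send and batched drain, mirroring the pattern in the patch.
package main

import "fmt"

// regionInfo stands in for *core.RegionInfo.
type regionInfo struct{ id uint64 }

const (
	recvMaxRegionSize = 4 // stand-in for TransferWitnessLeaderRecvMaxRegionSize
	transferBatchSize = 3 // stand-in for TransferWitnessLeaderBatchSize
)

// enqueue performs a non-blocking send: if the channel is full the region is
// dropped instead of blocking the caller.
func enqueue(ch chan<- *regionInfo, r *regionInfo) bool {
	select {
	case ch <- r:
		return true
	default:
		return false
	}
}

// drainBatch pulls at most n regions without blocking, the same shape as the
// select-with-default loop used by the scheduler.
func drainBatch(ch <-chan *regionInfo, n int) []*regionInfo {
	var out []*regionInfo
	for i := 0; i < n; i++ {
		select {
		case r := <-ch:
			out = append(out, r)
		default:
			return out
		}
	}
	return out
}

func main() {
	regions := make(chan *regionInfo, recvMaxRegionSize)
	for id := uint64(1); id <= 6; id++ {
		if !enqueue(regions, &regionInfo{id: id}) {
			fmt.Printf("region %d dropped: queue full\n", id)
		}
	}
	for _, r := range drainBatch(regions, transferBatchSize) {
		fmt.Printf("scheduling witness leader transfer for region %d\n", r.id)
	}
}

The design point the patch depends on is that the producer (the heartbeat path) must never block on the scheduler, so a full queue drops the region rather than waiting.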