diff --git a/server/schedulers/transfer_leader.go b/server/schedulers/transfer_leader.go
index d85d23b90e7..fde34bc41f1 100644
--- a/server/schedulers/transfer_leader.go
+++ b/server/schedulers/transfer_leader.go
@@ -54,19 +54,18 @@ func init() {
 			if err != nil {
 				return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
 			}
-			regions := conf.getRegions()
-			for _, id := range regions {
+			for _, id := range conf.Regions {
 				if id == regionID {
 					return errs.ErrSchedulerConfig.FastGen("dup id")
 				}
 			}
-			regions = append(regions, regionID)
+			conf.Regions = append(conf.Regions, regionID)
 			return nil
 		}
 	})
 
 	schedule.RegisterScheduler(TransferLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
-		conf := &transferLeaderSchedulerConfig{regions: make([]uint64, 0), storage: storage}
+		conf := &transferLeaderSchedulerConfig{Regions: make([]uint64, 0), storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
 		}
@@ -78,23 +77,43 @@ func init() {
 type transferLeaderSchedulerConfig struct {
 	mu      syncutil.RWMutex
 	storage endpoint.ConfigStorage
-	regions []uint64
+	Regions []uint64
 	cluster schedule.Cluster
 }
 
 func (conf *transferLeaderSchedulerConfig) getRegions() []uint64 {
 	conf.mu.RLock()
 	defer conf.mu.RUnlock()
-	return conf.regions
+	return conf.Regions
+}
+
+func (conf *transferLeaderSchedulerConfig) BuildWithArgs(args []string) error {
+	if len(args) != 1 {
+		return errs.ErrSchedulerConfig.FastGenByArgs("id")
+	}
+	regionID, err := strconv.ParseUint(args[0], 10, 64)
+	if err != nil {
+		return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
+	}
+	conf.mu.Lock()
+	for _, id := range conf.Regions {
+		if id == regionID {
+			conf.mu.Unlock()
+			return errs.ErrSchedulerConfig.FastGen("dup id")
+		}
+	}
+	conf.Regions = append(conf.Regions, regionID)
+	conf.mu.Unlock()
+	return nil
 }
 
 func (conf *transferLeaderSchedulerConfig) Clone() *transferLeaderSchedulerConfig {
 	conf.mu.RLock()
 	defer conf.mu.RUnlock()
-	regions := make([]uint64, len(conf.regions))
-	copy(regions, conf.regions)
+	regions := make([]uint64, len(conf.Regions))
+	copy(regions, conf.Regions)
 	return &transferLeaderSchedulerConfig{
-		regions: regions,
+		Regions: regions,
 	}
 }
 
@@ -116,12 +135,11 @@ func (conf *transferLeaderSchedulerConfig) removeRegionID(id uint64) (succ bool,
 	conf.mu.Lock()
 	defer conf.mu.Unlock()
 	succ, last = false, false
-	regionIDs := conf.getRegions()
-	for i, other := range regionIDs {
+	for i, other := range conf.Regions {
 		if other == id {
-			regionIDs = append(regionIDs[:i], regionIDs[i+1:]...)
+			conf.Regions = append(conf.Regions[:i], conf.Regions[i+1:]...)
 			succ = true
-			last = len(regionIDs) == 0
+			last = len(conf.Regions) == 0
 			break
 		}
 	}
@@ -145,10 +163,6 @@ func newTransferLeaderScheduler(opController *schedule.OperatorController, conf
 	}
 }
 
-func (s *transferLeaderScheduler) RegionIDs() []uint64 {
-	return s.conf.getRegions()
-}
-
 func (s *transferLeaderScheduler) GetName() string {
 	return TransferLeaderName
 }
@@ -177,29 +191,16 @@ func (s *transferLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool
 }
 
 func (s *transferLeaderScheduler) UpdateConfig(args []string) error {
-	if len(args) != 1 {
-		return errs.ErrSchedulerConfig.FastGenByArgs("id")
-	}
-	regionID, err := strconv.ParseUint(args[0], 10, 64)
+	err := s.conf.BuildWithArgs(args)
 	if err != nil {
-		return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
-	}
-	s.conf.mu.RLock()
-	regions := s.conf.getRegions()
-	for _, id := range regions {
-		if id == regionID {
-			s.conf.mu.RUnlock()
-			return errs.ErrSchedulerConfig.FastGen("dup id")
-		}
+		return err
 	}
-	regions = append(regions, regionID)
-	s.conf.mu.RUnlock()
 	err = s.conf.Persist()
 	if err != nil {
+		regionID, _ := strconv.ParseUint(args[0], 10, 64)
 		s.conf.removeRegionID(regionID)
-		return err
 	}
-	return nil
+	return err
 }
 
 type transferLeaderSchedulerConf interface {
diff --git a/server/schedulers/transfer_leader_test.go b/server/schedulers/transfer_leader_test.go
new file mode 100644
index 00000000000..b338ef25b20
--- /dev/null
+++ b/server/schedulers/transfer_leader_test.go
@@ -0,0 +1,114 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/pd/pkg/mock/mockcluster"
+	"github.com/tikv/pd/pkg/testutil"
+	"github.com/tikv/pd/server/config"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/schedule"
+	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/storage"
+)
+
+func TestTransferLeader(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+
+	// Add stores 1, 2, 3
+	tc.AddLeaderStore(1, 0)
+	tc.AddLeaderStore(2, 0)
+	tc.AddLeaderStore(3, 0)
+	// Add region 1 with its leader in store 1
+	tc.AddLeaderRegion(1, 1, 2, 3)
+
+	sl, err := schedule.CreateScheduler(TransferLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(TransferLeaderType, []string{"1"}))
+	re.NoError(err)
+	re.True(sl.IsScheduleAllowed(tc))
+	ops, _ := sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2, 3})
+	re.False(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 1, []uint64{2, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
+	re.True(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 2, []uint64{1, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
+}
+
+func TestTransferLeaderWithUnhealthyPeer(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+	sl, err := schedule.CreateScheduler(TransferLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(TransferLeaderType, []string{"1"}))
+	re.NoError(err)
+
+	// Add stores 1, 2, 3
+	tc.AddLeaderStore(1, 0)
+	tc.AddLeaderStore(2, 0)
+	tc.AddLeaderStore(3, 0)
+	// Add region 1 with 3 peers; peer 1 is the leader. The cases below mark peer 2 as pending and/or peer 3 as down.
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	region := tc.MockRegionInfo(1, 1, []uint64{2, 3}, nil, nil)
+	withDownPeer := core.WithDownPeers([]*pdpb.PeerStats{{
+		Peer:        region.GetPeers()[2],
+		DownSeconds: 1000,
+	}})
+	withPendingPeer := core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[1]})
+
+	// only pending
+	tc.PutRegion(region.Clone(withPendingPeer))
+	ops, _ := sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{3})
+	// only down
+	tc.PutRegion(region.Clone(withDownPeer))
+	ops, _ = sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2})
+	// pending + down
+	tc.PutRegion(region.Clone(withPendingPeer, withDownPeer))
+	ops, _ = sl.Schedule(tc, false)
+	re.Empty(ops)
+}
+
+func TestTransferLeaderConfigClone(t *testing.T) {
+	re := require.New(t)
+
+	emptyConf := &transferLeaderSchedulerConfig{Regions: make([]uint64, 0)}
+	con2 := emptyConf.Clone()
+	re.Empty(emptyConf.getRegions())
+	re.NoError(con2.BuildWithArgs([]string{"1"}))
+	re.NotEmpty(con2.getRegions())
+	re.Empty(emptyConf.getRegions())
+
+	con3 := con2.Clone()
+	con3.Regions = []uint64{1, 2, 3}
+	re.Empty(emptyConf.getRegions())
+	re.False(len(con3.getRegions()) == len(con2.getRegions()))
+
+	con4 := con3.Clone()
+	re.True(reflect.DeepEqual(con4.getRegions(), con3.getRegions()))
+	con4.Regions[0] = 4
+	re.False(con4.Regions[0] == con3.Regions[0])
+}