Skip to content

Commit

Permalink
add transfer leader scheduler test
Browse files Browse the repository at this point in the history
Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

#ref tikv#5638
  • Loading branch information
ethercflow committed Oct 27, 2022
1 parent 97210eb commit cb96f40
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 34 deletions.
69 changes: 35 additions & 34 deletions server/schedulers/transfer_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,18 @@ func init() {
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
regions := conf.getRegions()
for _, id := range regions {
for _, id := range conf.Regions {
if id == regionID {
return errs.ErrSchedulerConfig.FastGen("dup id")
}
}
regions = append(regions, regionID)
conf.Regions = append(conf.Regions, regionID)
return nil
}
})

schedule.RegisterScheduler(TransferLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &transferLeaderSchedulerConfig{regions: make([]uint64, 0), storage: storage}
conf := &transferLeaderSchedulerConfig{Regions: make([]uint64, 0), storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
Expand All @@ -78,23 +77,43 @@ func init() {
// transferLeaderSchedulerConfig holds the persisted state of the
// transfer-leader scheduler. Regions is exported so it round-trips through
// the JSON config encoder/decoder used by schedule.ConfigDecoder.
type transferLeaderSchedulerConfig struct {
	mu      syncutil.RWMutex
	storage endpoint.ConfigStorage
	// Regions lists the region IDs whose leaders this scheduler should
	// transfer. Guarded by mu.
	Regions []uint64
	cluster schedule.Cluster
}

// getRegions returns the configured region IDs under a read lock.
//
// NOTE(review): this returns the internal slice itself, not a copy — the
// lock only protects reading the slice header. Callers must not mutate the
// result; use Clone for an isolated copy.
func (conf *transferLeaderSchedulerConfig) getRegions() []uint64 {
	conf.mu.RLock()
	defer conf.mu.RUnlock()
	return conf.Regions
}

// BuildWithArgs parses a single base-10 region ID from args and appends it
// to the config, rejecting duplicates.
//
// Returns ErrSchedulerConfig when args does not contain exactly one element
// or the ID is already present, and ErrStrconvParseUint on a malformed ID.
func (conf *transferLeaderSchedulerConfig) BuildWithArgs(args []string) error {
	if len(args) != 1 {
		return errs.ErrSchedulerConfig.FastGenByArgs("id")
	}
	regionID, err := strconv.ParseUint(args[0], 10, 64)
	if err != nil {
		return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
	}
	// Take the write lock: this method appends to conf.Regions, so the
	// read lock used previously would let concurrent writers race on the
	// slice. defer also guarantees the unlock on every return path.
	conf.mu.Lock()
	defer conf.mu.Unlock()
	for _, id := range conf.Regions {
		if id == regionID {
			return errs.ErrSchedulerConfig.FastGen("dup id")
		}
	}
	conf.Regions = append(conf.Regions, regionID)
	return nil
}

// Clone returns a new config containing a deep copy of the Regions list.
// Only Regions is copied; storage, cluster, and the mutex start zero-valued
// in the clone.
func (conf *transferLeaderSchedulerConfig) Clone() *transferLeaderSchedulerConfig {
	conf.mu.RLock()
	defer conf.mu.RUnlock()
	regions := make([]uint64, len(conf.Regions))
	copy(regions, conf.Regions)
	return &transferLeaderSchedulerConfig{
		Regions: regions,
	}
}

Expand All @@ -116,12 +135,11 @@ func (conf *transferLeaderSchedulerConfig) removeRegionID(id uint64) (succ bool,
conf.mu.Lock()
defer conf.mu.Unlock()
succ, last = false, false
regionIDs := conf.getRegions()
for i, other := range regionIDs {
for i, other := range conf.Regions {
if other == id {
regionIDs = append(regionIDs[:i], regionIDs[i+1:]...)
conf.Regions = append(conf.Regions[:i], conf.Regions[i+1:]...)
succ = true
last = len(regionIDs) == 0
last = len(conf.Regions) == 0
break
}
}
Expand All @@ -145,10 +163,6 @@ func newTransferLeaderScheduler(opController *schedule.OperatorController, conf
}
}

// RegionIDs returns the region IDs this scheduler is configured to
// transfer leaders for.
func (s *transferLeaderScheduler) RegionIDs() []uint64 {
	return s.conf.getRegions()
}

// GetName returns the registered name of this scheduler.
func (s *transferLeaderScheduler) GetName() string {
	return TransferLeaderName
}
Expand Down Expand Up @@ -177,29 +191,16 @@ func (s *transferLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool
}

// UpdateConfig adds the region ID given in args to the scheduler config and
// persists the result. If persisting fails, the in-memory addition is rolled
// back so config and storage stay consistent.
func (s *transferLeaderScheduler) UpdateConfig(args []string) error {
	err := s.conf.BuildWithArgs(args)
	if err != nil {
		return err
	}
	err = s.conf.Persist()
	if err != nil {
		// Roll back the region just added. ParseUint cannot fail here
		// because BuildWithArgs already validated args[0].
		regionID, _ := strconv.ParseUint(args[0], 10, 64)
		s.conf.removeRegionID(regionID)
	}
	return err
}

type transferLeaderSchedulerConf interface {
Expand Down
114 changes: 114 additions & 0 deletions server/schedulers/transfer_leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"context"
"reflect"
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/storage"
)

// TestTransferLeader verifies that the transfer-leader scheduler emits a
// multi-target transfer-leader operator for the configured region, and that
// the operator step finishes only once the leader has actually moved.
func TestTransferLeader(t *testing.T) {
	check := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())

	// Three leader stores; region 1 has peers on all of them with its
	// leader on store 1.
	cluster.AddLeaderStore(1, 0)
	cluster.AddLeaderStore(2, 0)
	cluster.AddLeaderStore(3, 0)
	cluster.AddLeaderRegion(1, 1, 2, 3)

	scheduler, err := schedule.CreateScheduler(TransferLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(TransferLeaderType, []string{"1"}))
	check.NoError(err)
	check.True(scheduler.IsScheduleAllowed(cluster))

	ops, _ := scheduler.Schedule(cluster, false)
	testutil.CheckMultiTargetTransferLeader(check, ops[0], operator.OpLeader, 1, []uint64{2, 3})
	// Step is unfinished while store 1 still leads, and done once store 2 leads.
	check.False(ops[0].Step(0).(operator.TransferLeader).IsFinish(cluster.MockRegionInfo(1, 1, []uint64{2, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
	check.True(ops[0].Step(0).(operator.TransferLeader).IsFinish(cluster.MockRegionInfo(1, 2, []uint64{1, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
}

// TestTransferLeaderWithUnhealthyPeer verifies that pending and down peers
// are skipped as transfer targets, and that no operator is produced when
// every candidate peer is unhealthy.
func TestTransferLeaderWithUnhealthyPeer(t *testing.T) {
	check := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())
	scheduler, err := schedule.CreateScheduler(TransferLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(TransferLeaderType, []string{"1"}))
	check.NoError(err)

	// Three leader stores; region 1 has peers on all of them, leader on
	// store 1. The peer on store 2 may be marked pending, the one on
	// store 3 may be marked down.
	cluster.AddLeaderStore(1, 0)
	cluster.AddLeaderStore(2, 0)
	cluster.AddLeaderStore(3, 0)
	cluster.AddLeaderRegion(1, 1, 2, 3)
	region := cluster.MockRegionInfo(1, 1, []uint64{2, 3}, nil, nil)
	markDown := core.WithDownPeers([]*pdpb.PeerStats{{
		Peer:        region.GetPeers()[2],
		DownSeconds: 1000,
	}})
	markPending := core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[1]})

	// Pending peer on store 2: only store 3 remains a valid target.
	cluster.PutRegion(region.Clone(markPending))
	ops, _ := scheduler.Schedule(cluster, false)
	testutil.CheckMultiTargetTransferLeader(check, ops[0], operator.OpLeader, 1, []uint64{3})
	// Down peer on store 3: only store 2 remains a valid target.
	cluster.PutRegion(region.Clone(markDown))
	ops, _ = scheduler.Schedule(cluster, false)
	testutil.CheckMultiTargetTransferLeader(check, ops[0], operator.OpLeader, 1, []uint64{2})
	// Both peers unhealthy: nothing to schedule.
	cluster.PutRegion(region.Clone(markPending, markDown))
	ops, _ = scheduler.Schedule(cluster, false)
	check.Empty(ops)
}

// TestTransferLeaderConfigClone verifies that Clone produces independent
// Regions slices: mutating a clone must never leak into its source.
func TestTransferLeaderConfigClone(t *testing.T) {
	check := require.New(t)

	base := &transferLeaderSchedulerConfig{Regions: make([]uint64, 0)}
	clone1 := base.Clone()
	check.Empty(base.getRegions())
	// Growing the clone leaves the source untouched.
	check.NoError(clone1.BuildWithArgs([]string{"1"}))
	check.NotEmpty(clone1.getRegions())
	check.Empty(base.getRegions())

	// Replacing a clone's slice wholesale does not affect its source.
	clone2 := clone1.Clone()
	clone2.Regions = []uint64{1, 2, 3}
	check.Empty(base.getRegions())
	check.False(len(clone2.getRegions()) == len(clone1.getRegions()))

	// A fresh clone is equal to its source but backed by separate storage,
	// so element writes do not alias.
	clone3 := clone2.Clone()
	check.True(reflect.DeepEqual(clone3.getRegions(), clone2.getRegions()))
	clone3.Regions[0] = 4
	check.False(clone3.Regions[0] == clone2.Regions[0])
}

0 comments on commit cb96f40

Please sign in to comment.