diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index a687e841cfe..9f6eddc0ed5 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -793,6 +793,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
 		c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
 	}
+	c.coordinator.CheckTransferWitnessLeader(region)
 
 	// Save to storage if meta is updated.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 581e76557e3..973821469ce 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -37,6 +37,7 @@ import (
 	"github.com/tikv/pd/server/schedule/hbstream"
 	"github.com/tikv/pd/server/schedule/operator"
 	"github.com/tikv/pd/server/schedule/plan"
+	"github.com/tikv/pd/server/schedulers"
 	"github.com/tikv/pd/server/statistics"
 	"github.com/tikv/pd/server/storage"
 	"go.uber.org/zap"
@@ -966,3 +967,19 @@ func (c *coordinator) getPausedSchedulerDelayUntil(name string) (int64, error) {
 	}
 	return s.GetDelayUntil(), nil
 }
+
+// CheckTransferWitnessLeader checks whether the region's leader needs to be transferred and, if so, sends the region to the transfer witness leader scheduler.
+func (c *coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
+	if core.NeedTransferWitnessLeader(region) {
+		c.RLock()
+		s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]
+		c.RUnlock()
+		if ok {
+			select {
+			case schedulers.RecvRegionInfo(s.Scheduler) <- region:
+			default:
+				log.Warn("drop transfer witness leader due to recv region channel full", zap.Uint64("region-id", region.GetID()))
+			}
+		}
+	}
+}
diff --git a/server/core/region.go b/server/core/region.go
index 524daeb7723..6be0ece03ae 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -1421,3 +1421,11 @@ func (h HexRegionsMeta) String() string {
 	}
 	return strings.TrimSpace(b.String())
 }
+
+// NeedTransferWitnessLeader checks whether the region's leader is a witness and therefore needs to hand over leadership.
+func NeedTransferWitnessLeader(region *RegionInfo) bool {
+	if region == nil || region.GetLeader() == nil {
+		return false
+	}
+	return region.GetLeader().IsWitness
+}
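Note: the hunks above hand a region off from the heartbeat path to the new scheduler through a bounded channel: the send is non-blocking and the region is dropped (with a warning) when the buffer is full, while the scheduler later drains the buffer in small batches. Below is a minimal, self-contained sketch of that handoff pattern; the names and sizes are illustrative stand-ins, not the PD types used in this patch.

```go
package main

import "fmt"

const (
	bufferSize = 4 // stands in for transferWitnessLeaderRecvMaxRegionSize
	batchSize  = 3 // stands in for transferWitnessLeaderBatchSize
)

// enqueue mirrors coordinator.CheckTransferWitnessLeader: send when there is
// room, otherwise drop instead of blocking the heartbeat path.
func enqueue(ch chan<- uint64, regionID uint64) {
	select {
	case ch <- regionID:
	default:
		fmt.Println("dropped region", regionID, "because the channel is full")
	}
}

// drainBatch mirrors scheduleTransferWitnessLeaderBatch: take at most
// batchSize queued regions, stopping early once the channel is empty.
func drainBatch(ch <-chan uint64) []uint64 {
	var batch []uint64
	for i := 0; i < batchSize; i++ {
		select {
		case id := <-ch:
			batch = append(batch, id)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	regions := make(chan uint64, bufferSize)
	for id := uint64(1); id <= 6; id++ { // six sends into a buffer of four: two are dropped
		enqueue(regions, id)
	}
	fmt.Println("first batch:", drainBatch(regions))  // first batch: [1 2 3]
	fmt.Println("second batch:", drainBatch(regions)) // second batch: [4]
}
```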
diff --git a/server/schedulers/transfer_witness_leader.go b/server/schedulers/transfer_witness_leader.go
new file mode 100644
index 00000000000..2770de9c3f0
--- /dev/null
+++ b/server/schedulers/transfer_witness_leader.go
@@ -0,0 +1,140 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/schedule"
+	"github.com/tikv/pd/server/schedule/filter"
+	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/schedule/plan"
+	"github.com/tikv/pd/server/storage/endpoint"
+)
+
+const (
+	// TransferWitnessLeaderName is the transfer witness leader scheduler name.
+	TransferWitnessLeaderName = "transfer-witness-leader-scheduler"
+	// TransferWitnessLeaderType is the transfer witness leader scheduler type.
+	TransferWitnessLeaderType = "transfer-witness-leader"
+	// transferWitnessLeaderBatchSize is the maximum number of operators used to
+	// transfer leaders in one scheduling round.
+	transferWitnessLeaderBatchSize = 3
+	// transferWitnessLeaderRecvMaxRegionSize is the maximum number of regions that can be buffered.
+	// TODO: make it a reasonable value
+	transferWitnessLeaderRecvMaxRegionSize = 1000
+)
+
+func init() {
+	schedule.RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) schedule.ConfigDecoder {
+		return func(v interface{}) error {
+			return nil
+		}
+	})
+
+	schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *schedule.OperatorController, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) {
+		return newTransferWitnessLeaderScheduler(opController), nil
+	})
+}
+
+type transferWitnessLeaderScheduler struct {
+	*BaseScheduler
+	regions chan *core.RegionInfo
+}
+
+// newTransferWitnessLeaderScheduler creates an admin scheduler that transfers the leader away from a region whose leader is a witness.
+func newTransferWitnessLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler {
+	return &transferWitnessLeaderScheduler{
+		BaseScheduler: NewBaseScheduler(opController),
+		regions:       make(chan *core.RegionInfo, transferWitnessLeaderRecvMaxRegionSize),
+	}
+}
+
+func (s *transferWitnessLeaderScheduler) GetName() string {
+	return TransferWitnessLeaderName
+}
+
+func (s *transferWitnessLeaderScheduler) GetType() string {
+	return TransferWitnessLeaderType
+}
+
+func (s *transferWitnessLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
+	// TODO: make sure the restriction is reasonable
+	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
+	if !allowed {
+		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
+	}
+	return allowed
+}
+
+func (s *transferWitnessLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
+	return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, transferWitnessLeaderBatchSize), nil
+}
+
+func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
+	var ops []*operator.Operator
+	for i := 0; i < batchSize; i++ {
+		select {
+		case region := <-s.regions:
+			op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region)
+			if err != nil {
+				log.Debug("fail to create transfer leader operator", errs.ZapError(err))
+				continue
+			}
+			if op != nil {
+				op.SetPriorityLevel(core.Urgent)
+				op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(name, "new-operator"))
+				ops = append(ops, op)
+			}
+		default:
+			break
+		}
+	}
+	return ops
+}
+
+func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
+	var filters []filter.Filter
+	unhealthyPeerStores := make(map[uint64]struct{})
+	for _, peer := range region.GetDownPeers() {
+		unhealthyPeerStores[peer.GetPeer().GetStoreId()] = struct{}{}
+	}
+	for _, peer := range region.GetPendingPeers() {
+		unhealthyPeerStores[peer.GetStoreId()] = struct{}{}
+	}
+	filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true})
+	candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetOpts(), nil, nil, filters...)
+	// Compatible with old TiKV transfer leader logic.
+	target := candidates.RandomPick()
+	targets := candidates.PickAll()
+	// `targets` MUST contain `target`, so it is enough to check whether `target` is nil here.
+	if target == nil {
+		schedulerCounter.WithLabelValues(name, "no-target-store").Inc()
+		return nil, errors.New("no target store to schedule")
+	}
+	targetIDs := make([]uint64, 0, len(targets))
+	for _, t := range targets {
+		targetIDs = append(targetIDs, t.GetID())
+	}
+	return operator.CreateTransferLeaderOperator(typ, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), targetIDs, operator.OpLeader)
+}
+
+// RecvRegionInfo returns the channel through which the coordinator sends checked regions to the scheduler.
+func RecvRegionInfo(s schedule.Scheduler) chan<- *core.RegionInfo {
+	return s.(*transferWitnessLeaderScheduler).regions
+}
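Note: when building the transfer-leader operator above, the scheduler excludes every store that hosts a down or pending peer of the region and then picks one of the remaining follower stores at random, passing the full candidate list so the leader can still move if the first choice fails. The snippet below is a simplified, self-contained sketch of that filtering step using plain maps and slices; the types and names are illustrative and do not come from PD's filter package.

```go
package main

import "fmt"

type peer struct {
	storeID uint64
	leader  bool
}

// healthyFollowerStores returns the stores of the region's follower peers,
// minus any store that hosts a down or pending peer. Each survivor is a legal
// transfer-leader target; one of them would then be picked at random.
func healthyFollowerStores(peers []peer, downStores, pendingStores map[uint64]struct{}) []uint64 {
	unhealthy := make(map[uint64]struct{})
	for id := range downStores {
		unhealthy[id] = struct{}{}
	}
	for id := range pendingStores {
		unhealthy[id] = struct{}{}
	}
	var targets []uint64
	for _, p := range peers {
		if p.leader {
			continue // never pick the current (witness) leader's store
		}
		if _, bad := unhealthy[p.storeID]; bad {
			continue
		}
		targets = append(targets, p.storeID)
	}
	return targets
}

func main() {
	peers := []peer{{storeID: 1, leader: true}, {storeID: 2}, {storeID: 3}}
	pending := map[uint64]struct{}{2: {}}
	fmt.Println(healthyFollowerStores(peers, nil, pending)) // [3]
}
```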
diff --git a/server/schedulers/transfer_witness_leader_test.go b/server/schedulers/transfer_witness_leader_test.go
new file mode 100644
index 00000000000..1b65515b1cc
--- /dev/null
+++ b/server/schedulers/transfer_witness_leader_test.go
@@ -0,0 +1,99 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"context"
+	"testing"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/pd/pkg/mock/mockcluster"
+	"github.com/tikv/pd/pkg/testutil"
+	"github.com/tikv/pd/server/config"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/schedule"
+	"github.com/tikv/pd/server/schedule/operator"
+	"github.com/tikv/pd/server/storage"
+)
+
+func TestTransferWitnessLeader(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+
+	// Add stores 1, 2, 3
+	tc.AddLeaderStore(1, 0)
+	tc.AddLeaderStore(2, 0)
+	tc.AddLeaderStore(3, 0)
+	// Add region 1 with its leader in store 1
+	tc.AddLeaderRegion(1, 1, 2, 3)
+
+	sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+	RecvRegionInfo(sl) <- tc.GetRegion(1)
+	re.True(sl.IsScheduleAllowed(tc))
+	ops, _ := sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2, 3})
+	re.False(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 1, []uint64{2, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
+	re.True(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 2, []uint64{1, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
+}
+
+func TestTransferWitnessLeaderWithUnhealthyPeer(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(ctx, opt)
+	sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+
+	// Add stores 1, 2, 3
+	tc.AddLeaderStore(1, 0)
+	tc.AddLeaderStore(2, 0)
+	tc.AddLeaderStore(3, 0)
+	// Add region 1 with 3 peers: store 1 holds the leader; the peers on stores 2 and 3 are marked pending and down respectively in the cases below.
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	region := tc.MockRegionInfo(1, 1, []uint64{2, 3}, nil, nil)
+	withDownPeer := core.WithDownPeers([]*pdpb.PeerStats{{
+		Peer:        region.GetPeers()[2],
+		DownSeconds: 1000,
+	}})
+	withPendingPeer := core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[1]})
+
+	// only pending
+	tc.PutRegion(region.Clone(withPendingPeer))
+	RecvRegionInfo(sl) <- tc.GetRegion(1)
+	ops, _ := sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{3})
+	ops, _ = sl.Schedule(tc, false)
+	re.Nil(ops)
+	// only down
+	tc.PutRegion(region.Clone(withDownPeer))
+	RecvRegionInfo(sl) <- tc.GetRegion(1)
+	ops, _ = sl.Schedule(tc, false)
+	testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2})
+	// pending + down
+	tc.PutRegion(region.Clone(withPendingPeer, withDownPeer))
+	ops, _ = sl.Schedule(tc, false)
+	re.Empty(ops)
+}
+
+// TODO: add more tests with witness
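Note: the TODO above asks for witness-specific coverage. As a starting point, here is a hedged sketch of how a region whose leader is a witness could be constructed to exercise core.NeedTransferWitnessLeader directly; it assumes the metapb.Peer.IsWitness field that the region.go change relies on, and the test name and package are hypothetical, not part of this patch.

```go
package core_test

import (
	"testing"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/server/core"
)

func TestNeedTransferWitnessLeaderSketch(t *testing.T) {
	re := require.New(t)
	peers := []*metapb.Peer{
		{Id: 101, StoreId: 1, IsWitness: true}, // witness leader
		{Id: 102, StoreId: 2},
		{Id: 103, StoreId: 3},
	}
	// A region led by a witness peer should be flagged for leader transfer.
	re.True(core.NeedTransferWitnessLeader(core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers}, peers[0])))
	// A region led by a non-witness peer is left alone.
	re.False(core.NeedTransferWitnessLeader(core.NewRegionInfo(&metapb.Region{Id: 2, Peers: peers}, peers[1])))
	// Nil regions (and regions without a leader) are ignored as well.
	re.False(core.NeedTransferWitnessLeader(nil))
}
```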
"split-bucket-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + "transfer-witness-leader-scheduler": true, } checkSchedulerCommand(args, expected) // scheduler add command args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, - schedulers[idx]: true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, } checkSchedulerCommand(args, expected) // scheduler add command twice args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, - schedulers[idx]: true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, } checkSchedulerCommand(args, expected) @@ -225,10 +232,11 @@ func TestScheduler(t *testing.T) { // scheduler remove command [old] args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, - schedulers[idx]: true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, } checkSchedulerCommand(args, expected) @@ -239,19 +247,21 @@ func TestScheduler(t *testing.T) { // scheduler remove command, when remove the last store, it should remove whole scheduler args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} expected = map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + "transfer-witness-leader-scheduler": true, } checkSchedulerCommand(args, expected) } // test shuffle region config checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, - "shuffle-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + "shuffle-region-scheduler": true, + "transfer-witness-leader-scheduler": true, }) var roles []string mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) @@ -264,11 +274,12 @@ func TestScheduler(t *testing.T) { // test grant hot region scheduler config checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ - "balance-leader-scheduler": true, - "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, - "shuffle-region-scheduler": true, - "grant-hot-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "split-bucket-scheduler": true, + "shuffle-region-scheduler": true, + "grant-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, }) var 
conf3 map[string]interface{} expected3 := map[string]interface{}{