schedulers: add witness transfer leader scheduler (#5639)
close #5638

add transfer witness leader scheduler

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
ethercflow and ti-chi-bot authored Nov 7, 2022
1 parent ea9b1e9 commit b3bd6da
Showing 6 changed files with 318 additions and 42 deletions.
1 change: 1 addition & 0 deletions server/cluster/cluster.go
@@ -793,6 +793,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.coordinator.CheckTransferWitnessLeader(region)

// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
17 changes: 17 additions & 0 deletions server/cluster/coordinator.go
@@ -37,6 +37,7 @@ import (
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/storage"
"go.uber.org/zap"
@@ -966,3 +967,19 @@ func (c *coordinator) getPausedSchedulerDelayUntil(name string) (int64, error) {
}
return s.GetDelayUntil(), nil
}

// CheckTransferWitnessLeader checks whether the region's leader is a witness and, if so, hands the region off to the transfer witness leader scheduler
func (c *coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
if core.NeedTransferWitnessLeader(region) {
c.RLock()
s, ok := c.schedulers[schedulers.TransferWitnessLeaderName]
c.RUnlock()
if ok {
select {
case schedulers.RecvRegionInfo(s.Scheduler) <- region:
default:
log.Warn("drop transfer witness leader due to recv region channel full", zap.Uint64("region-id", region.GetID()))
}
}
}
}
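
The non-blocking send above is deliberate: when the scheduler's buffered channel is full, the region is dropped with a warning instead of blocking the heartbeat path. A minimal standalone sketch of that drop-on-full pattern, with illustrative names and a tiny buffer that are not part of the commit:

package main

import "fmt"

func main() {
    // Stand-in for the scheduler's buffered region channel.
    regions := make(chan int, 2)

    for id := 1; id <= 3; id++ {
        select {
        case regions <- id:
            fmt.Println("queued region", id)
        default:
            // Mirrors the log.Warn branch above: drop rather than block.
            fmt.Println("dropped region", id, "- channel full")
        }
    }
}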
8 changes: 8 additions & 0 deletions server/core/region.go
@@ -1421,3 +1421,11 @@ func (h HexRegionsMeta) String() string {
}
return strings.TrimSpace(b.String())
}

// NeedTransferWitnessLeader reports whether the region's leader is a witness and therefore needs a leader transfer
func NeedTransferWitnessLeader(region *RegionInfo) bool {
if region == nil || region.GetLeader() == nil {
return false
}
return region.GetLeader().IsWitness
}
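
For reference, a hedged usage sketch of the new predicate; it assumes a kvproto version whose metapb.Peer carries the IsWitness field that the code above reads, and the setup here is illustrative rather than taken from the commit:

// Hypothetical usage sketch, not part of the commit.
package main

import (
    "fmt"

    "github.com/pingcap/kvproto/pkg/metapb"
    "github.com/tikv/pd/server/core"
)

func main() {
    // A region led by a witness peer should trigger a leader transfer.
    leader := &metapb.Peer{Id: 2, StoreId: 1, IsWitness: true}
    meta := &metapb.Region{Id: 1, Peers: []*metapb.Peer{leader}}
    region := core.NewRegionInfo(meta, leader)
    fmt.Println(core.NeedTransferWitnessLeader(region)) // true
}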
140 changes: 140 additions & 0 deletions server/schedulers/transfer_witness_leader.go
@@ -0,0 +1,140 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/storage/endpoint"
)

const (
// TransferWitnessLeaderName is the transfer witness leader scheduler name.
TransferWitnessLeaderName = "transfer-witness-leader-scheduler"
// TransferWitnessLeaderType is the transfer witness leader scheduler type.
TransferWitnessLeaderType = "transfer-witness-leader"
// transferWitnessLeaderBatchSize is the maximum number of transfer-leader
// operators created in one scheduling round.
transferWitnessLeaderBatchSize = 3
// transferWitnessLeaderRecvMaxRegionSize is the maximum number of regions
// the recv channel can buffer.
// TODO: make it a reasonable value
transferWitnessLeaderRecvMaxRegionSize = 1000
)

func init() {
schedule.RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
return nil
}
})

schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *schedule.OperatorController, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newTransferWitnessLeaderScheduler(opController), nil
})
}

type transferWitnessLeaderScheduler struct {
*BaseScheduler
regions chan *core.RegionInfo
}

// newTransferWitnessLeaderScheduler creates an admin scheduler that transfers the leader of regions whose current leader is a witness.
func newTransferWitnessLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler {
return &transferWitnessLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController),
regions: make(chan *core.RegionInfo, transferWitnessLeaderRecvMaxRegionSize),
}
}

func (s *transferWitnessLeaderScheduler) GetName() string {
return TransferWitnessLeaderName
}

func (s *transferWitnessLeaderScheduler) GetType() string {
return TransferWitnessLeaderType
}

func (s *transferWitnessLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
// TODO: make sure the restriction is reasonable
allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
}
return allowed
}

func (s *transferWitnessLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, transferWitnessLeaderBatchSize), nil
}

func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster schedule.Cluster, batchSize int) []*operator.Operator {
var ops []*operator.Operator
batchLoop:
for i := 0; i < batchSize; i++ {
select {
case region := <-s.regions:
op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region)
if err != nil {
log.Debug("fail to create transfer leader operator", errs.ZapError(err))
continue
}
if op != nil {
op.SetPriorityLevel(core.Urgent)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(name, "new-operator"))
ops = append(ops, op)
}
default:
// A bare break would only exit the select, not the loop.
break batchLoop
}
}
return ops
}

func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
var filters []filter.Filter
unhealthyPeerStores := make(map[uint64]struct{})
for _, peer := range region.GetDownPeers() {
unhealthyPeerStores[peer.GetPeer().GetStoreId()] = struct{}{}
}
for _, peer := range region.GetPendingPeers() {
unhealthyPeerStores[peer.GetStoreId()] = struct{}{}
}
filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true})
candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetOpts(), nil, nil, filters...)
// Compatible with old TiKV transfer leader logic.
target := candidates.RandomPick()
targets := candidates.PickAll()
// `targets` MUST contain `target`, so we only need to check whether `target` is nil here.
if target == nil {
schedulerCounter.WithLabelValues(name, "no-target-store").Inc()
return nil, errors.New("no target store to schedule")
}
targetIDs := make([]uint64, 0, len(targets))
for _, t := range targets {
targetIDs = append(targetIDs, t.GetID())
}
return operator.CreateTransferLeaderOperator(typ, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), targetIDs, operator.OpLeader)
}

// RecvRegionInfo returns the send side of the scheduler's region channel, through which the coordinator delivers checked regions
func RecvRegionInfo(s schedule.Scheduler) chan<- *core.RegionInfo {
return s.(*transferWitnessLeaderScheduler).regions
}
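
scheduleTransferWitnessLeaderBatch drains at most batchSize regions per call and stops as soon as the channel is empty; the labeled break matters because a bare break inside a select would only exit the select, leaving the loop to spin through its remaining iterations. A standalone sketch of the same drain pattern, with illustrative names that are not from the commit:

package main

import "fmt"

// drain pops at most batch items without blocking, stopping early once the channel is empty.
func drain(ch <-chan int, batch int) []int {
    var out []int
batchLoop:
    for i := 0; i < batch; i++ {
        select {
        case v := <-ch:
            out = append(out, v)
        default:
            break batchLoop // channel empty: stop early
        }
    }
    return out
}

func main() {
    ch := make(chan int, 10)
    ch <- 1
    ch <- 2
    fmt.Println(drain(ch, 3)) // [1 2]
}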
99 changes: 99 additions & 0 deletions server/schedulers/transfer_witness_leader_test.go
@@ -0,0 +1,99 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"context"
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/storage"
)

func TestTransferWitnessLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)

// Add stores 1, 2, 3
tc.AddLeaderStore(1, 0)
tc.AddLeaderStore(2, 0)
tc.AddLeaderStore(3, 0)
// Add region 1 with its leader on store 1
tc.AddLeaderRegion(1, 1, 2, 3)

sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
RecvRegionInfo(sl) <- tc.GetRegion(1)
re.True(sl.IsScheduleAllowed(tc))
ops, _ := sl.Schedule(tc, false)
testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2, 3})
re.False(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 1, []uint64{2, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
re.True(ops[0].Step(0).(operator.TransferLeader).IsFinish(tc.MockRegionInfo(1, 2, []uint64{1, 3}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0})))
}

func TestTransferWitnessLeaderWithUnhealthyPeer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)

// Add stores 1, 2, 3
tc.AddLeaderStore(1, 0)
tc.AddLeaderStore(2, 0)
tc.AddLeaderStore(3, 0)
// Add region 1 with 3 peers: the peer on store 1 is the leader; the cases below mark the peer on store 2 as pending and/or the peer on store 3 as down.
tc.AddLeaderRegion(1, 1, 2, 3)
region := tc.MockRegionInfo(1, 1, []uint64{2, 3}, nil, nil)
withDownPeer := core.WithDownPeers([]*pdpb.PeerStats{{
Peer: region.GetPeers()[2],
DownSeconds: 1000,
}})
withPendingPeer := core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[1]})

// only pending
tc.PutRegion(region.Clone(withPendingPeer))
RecvRegionInfo(sl) <- tc.GetRegion(1)
ops, _ := sl.Schedule(tc, false)
testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{3})
ops, _ = sl.Schedule(tc, false)
re.Nil(ops)
// only down
tc.PutRegion(region.Clone(withDownPeer))
RecvRegionInfo(sl) <- tc.GetRegion(1)
ops, _ = sl.Schedule(tc, false)
testutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2})
// pending + down
tc.PutRegion(region.Clone(withPendingPeer, withDownPeer))
ops, _ = sl.Schedule(tc, false)
re.Empty(ops)
}

// TODO: add more tests with witness