From 914e19c1ae5a750b8c9cd24d3a22eb3900d6eb32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Wed, 26 Oct 2022 15:29:57 +0800 Subject: [PATCH 1/4] operator: fix some error checks (#5629) close tikv/pd#5623 Signed-off-by: HunDunDM Co-authored-by: Ti Chi Robot --- server/schedule/operator/step.go | 43 +++++------ server/schedule/operator/step_test.go | 107 ++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 24 deletions(-) diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index 8374f8362ff..3000cd6a650 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -404,22 +404,24 @@ type PromoteLearner struct { } // ConfVerChanged returns the delta value for version increased by this step. +// It is also used by ChangePeerV2Leave. Since there are currently four roles, +// we need to confirm whether it is a Voter, not a DemotingVoter, etc. func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) uint64 { peer := region.GetStoreVoter(pl.ToStore) - return typeutil.BoolToUint64(peer.GetId() == pl.PeerID) + return typeutil.BoolToUint64(peer.GetId() == pl.PeerID && peer.GetRole() == metapb.PeerRole_Voter) } func (pl PromoteLearner) String() string { return fmt.Sprintf("promote learner peer %v on store %v to voter", pl.PeerID, pl.ToStore) } -// IsFinish checks if current step is finished. +// IsFinish checks if current step is finished. It is also used by ChangePeerV2Leave. func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool { if peer := region.GetStoreVoter(pl.ToStore); peer != nil { if peer.GetId() != pl.PeerID { log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId())) } - return peer.GetId() == pl.PeerID + return peer.GetId() == pl.PeerID && peer.GetRole() == metapb.PeerRole_Voter } return false } @@ -643,9 +645,10 @@ func (dv DemoteVoter) String() string { } // ConfVerChanged returns the delta value for version increased by this step. -func (dv DemoteVoter) ConfVerChanged(region *core.RegionInfo) bool { - peer := region.GetStoreLearner(dv.ToStore) - return peer.GetId() == dv.PeerID +func (dv DemoteVoter) ConfVerChanged(region *core.RegionInfo) uint64 { + peer := region.GetStorePeer(dv.ToStore) + // the demoting peer may be removed later. + return typeutil.BoolToUint64(peer == nil || (peer.GetId() == dv.PeerID && peer.GetRole() == metapb.PeerRole_Learner)) } // IsFinish checks if current step is finished. @@ -700,7 +703,8 @@ func (cpe ChangePeerV2Enter) ConfVerChanged(region *core.RegionInfo) uint64 { } } for _, dv := range cpe.DemoteVoters { - peer := region.GetStoreVoter(dv.ToStore) + peer := region.GetStorePeer(dv.ToStore) + // the demoting peer may be removed later. 
if peer != nil && (peer.GetId() != dv.PeerID || !core.IsLearnerOrDemotingVoter(peer)) { return 0 } @@ -715,16 +719,16 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool { if peer != nil && peer.GetId() != pl.PeerID { log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId())) } - if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_IncomingVoter { + if peer.GetId() != pl.PeerID || !core.IsVoterOrIncomingVoter(peer) { return false } } for _, dv := range cpe.DemoteVoters { - peer := region.GetStoreVoter(dv.ToStore) + peer := region.GetStorePeer(dv.ToStore) if peer != nil && peer.GetId() != dv.PeerID { log.Warn("obtain unexpected peer", zap.String("expect", dv.String()), zap.Uint64("obtain-learner", peer.GetId())) } - if peer.GetId() != dv.PeerID || peer.GetRole() != metapb.PeerRole_DemotingVoter { + if peer.GetId() != dv.PeerID || !core.IsLearnerOrDemotingVoter(peer) { return false } } @@ -740,12 +744,10 @@ func (cpe ChangePeerV2Enter) CheckInProgress(_ ClusterInformer, region *core.Reg return errors.New("peer does not exist") } switch peer.GetRole() { - case metapb.PeerRole_Learner: + case metapb.PeerRole_Learner, metapb.PeerRole_Voter: notInJointState = true case metapb.PeerRole_IncomingVoter: inJointState = true - case metapb.PeerRole_Voter: - return errors.New("peer already is a voter") case metapb.PeerRole_DemotingVoter: return errors.New("cannot promote a demoting voter") default: @@ -758,12 +760,10 @@ func (cpe ChangePeerV2Enter) CheckInProgress(_ ClusterInformer, region *core.Reg return errors.New("peer does not exist") } switch peer.GetRole() { - case metapb.PeerRole_Voter: + case metapb.PeerRole_Voter, metapb.PeerRole_Learner: notInJointState = true case metapb.PeerRole_DemotingVoter: inJointState = true - case metapb.PeerRole_Learner: - return errors.New("peer already is a learner") case metapb.PeerRole_IncomingVoter: return errors.New("cannot demote a incoming voter") default: @@ -833,13 +833,12 @@ func (cpl ChangePeerV2Leave) String() string { // ConfVerChanged returns the delta value for version increased by this step. func (cpl ChangePeerV2Leave) ConfVerChanged(region *core.RegionInfo) uint64 { for _, pl := range cpl.PromoteLearners { - peer := region.GetStoreVoter(pl.ToStore) - if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_Voter { + if pl.ConfVerChanged(region) == 0 { return 0 } } for _, dv := range cpl.DemoteVoters { - if region.GetStorePeer(dv.PeerID) != nil && !dv.ConfVerChanged(region) { + if dv.ConfVerChanged(region) == 0 { return 0 } } @@ -849,11 +848,7 @@ func (cpl ChangePeerV2Leave) ConfVerChanged(region *core.RegionInfo) uint64 { // IsFinish checks if current step is finished. 
func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool { for _, pl := range cpl.PromoteLearners { - peer := region.GetStoreVoter(pl.ToStore) - if peer != nil && peer.GetId() != pl.PeerID { - log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId())) - } - if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_Voter { + if !pl.IsFinish(region) { return false } } diff --git a/server/schedule/operator/step_test.go b/server/schedule/operator/step_test.go index 2b5141b8bd3..983723815a1 100644 --- a/server/schedule/operator/step_test.go +++ b/server/schedule/operator/step_test.go @@ -293,6 +293,113 @@ func (suite *operatorStepTestSuite) TestChangePeerV2Enter() { suite.check(cpe, desc, testCases) } +func (suite *operatorStepTestSuite) TestChangePeerV2EnterWithSingleChange() { + cpe := ChangePeerV2Enter{ + PromoteLearners: []PromoteLearner{{PeerID: 3, ToStore: 3}}, + } + testCases := []testCase{ + { // before step + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner}, + }, + 0, + false, + suite.NoError, + }, + { // after step + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_IncomingVoter}, + }, + 1, + true, + suite.NoError, + }, + { // after step (direct) + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter}, + }, + 1, + true, + suite.NoError, + }, + { // error role + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_DemotingVoter}, + }, + 0, + false, + suite.Error, + }, + } + desc := "use joint consensus, promote learner peer 3 on store 3 to voter" + suite.check(cpe, desc, testCases) + + cpe = ChangePeerV2Enter{ + DemoteVoters: []DemoteVoter{{PeerID: 3, ToStore: 3}}, + } + testCases = []testCase{ + { // before step + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter}, + }, + 0, + false, + suite.NoError, + }, + { // after step + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_DemotingVoter}, + }, + 1, + true, + suite.NoError, + }, + { // after step (direct) + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner}, + }, + 1, + true, + suite.NoError, + }, + { // demote and remove peer + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + }, + 1, // correct calculation is required + false, + suite.Error, + }, + { // error role + []*metapb.Peer{ + {Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter}, + {Id: 3, StoreId: 3, Role: metapb.PeerRole_IncomingVoter}, + }, + 0, + false, + suite.Error, + }, + } + desc = "use joint consensus, demote voter peer 3 on store 3 to learner" + suite.check(cpe, desc, testCases) +} + func (suite *operatorStepTestSuite) TestChangePeerV2Leave() { cpl := ChangePeerV2Leave{ 
PromoteLearners: []PromoteLearner{{PeerID: 3, ToStore: 3}, {PeerID: 4, ToStore: 4}}, From 8e2bd59fc68b0cf8e93e590cc60933976b4af694 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Wed, 26 Oct 2022 20:25:57 +0800 Subject: [PATCH 2/4] schedule: batch to report the metrics of target filter (#5561) close tikv/pd#5538 Signed-off-by: bufferflies <1045931706@qq.com> --- plugin/scheduler_example/evict_leader.go | 2 +- server/schedule/checker/replica_strategy.go | 6 +- server/schedule/filter/candidates.go | 8 +- server/schedule/filter/candidates_test.go | 10 +- server/schedule/filter/counter.go | 213 ++++++++++++++++++++ server/schedule/filter/counter_test.go | 50 +++++ server/schedule/filter/filters.go | 177 ++++++++-------- server/schedule/filter/filters_test.go | 22 +- server/schedulers/balance_leader.go | 23 ++- server/schedulers/balance_region.go | 24 ++- server/schedulers/evict_leader.go | 2 +- server/schedulers/label.go | 2 +- server/schedulers/random_merge.go | 2 +- server/schedulers/shuffle_leader.go | 2 +- server/schedulers/shuffle_region.go | 4 +- server/schedulers/utils.go | 2 +- 16 files changed, 417 insertions(+), 132 deletions(-) create mode 100644 server/schedule/filter/counter.go create mode 100644 server/schedule/filter/counter_test.go diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index ef5a50a6803..77a2e11ea4d 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -225,7 +225,7 @@ func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ( continue } target := filter.NewCandidates(cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}). + FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}). RandomPick() if target == nil { continue diff --git a/server/schedule/checker/replica_strategy.go b/server/schedule/checker/replica_strategy.go index f609ccb01d1..92f35595f73 100644 --- a/server/schedule/checker/replica_strategy.go +++ b/server/schedule/checker/replica_strategy.go @@ -71,12 +71,12 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores) strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true} targetCandidate := filter.NewCandidates(s.cluster.GetStores()). - FilterTarget(s.cluster.GetOpts(), nil, filters...). + FilterTarget(s.cluster.GetOpts(), nil, nil, filters...). KeepTheTopStores(isolationComparer, false) // greater isolation score is better if targetCandidate.Len() == 0 { return 0, false } - target := targetCandidate.FilterTarget(s.cluster.GetOpts(), nil, strictStateFilter). + target := targetCandidate.FilterTarget(s.cluster.GetOpts(), nil, nil, strictStateFilter). PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), true) // less region score is better if target == nil { return 0, true // filter by temporary states @@ -123,7 +123,7 @@ func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64) func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 { isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores) source := filter.NewCandidates(coLocationStores). 
- FilterSource(s.cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}). + FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}). KeepTheTopStores(isolationComparer, true). PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false) if source == nil { diff --git a/server/schedule/filter/candidates.go b/server/schedule/filter/candidates.go index bfdaccf207f..3eb38d8cad2 100644 --- a/server/schedule/filter/candidates.go +++ b/server/schedule/filter/candidates.go @@ -35,14 +35,14 @@ func NewCandidates(stores []*core.StoreInfo) *StoreCandidates { } // FilterSource keeps stores that can pass all source filters. -func (c *StoreCandidates) FilterSource(opt *config.PersistOptions, collector *plan.Collector, filters ...Filter) *StoreCandidates { - c.Stores = SelectSourceStores(c.Stores, filters, opt, collector) +func (c *StoreCandidates) FilterSource(opt *config.PersistOptions, collector *plan.Collector, counter *Counter, filters ...Filter) *StoreCandidates { + c.Stores = SelectSourceStores(c.Stores, filters, opt, collector, counter) return c } // FilterTarget keeps stores that can pass all target filters. -func (c *StoreCandidates) FilterTarget(opt *config.PersistOptions, collector *plan.Collector, filters ...Filter) *StoreCandidates { - c.Stores = SelectTargetStores(c.Stores, filters, opt, collector) +func (c *StoreCandidates) FilterTarget(opt *config.PersistOptions, collector *plan.Collector, counter *Counter, filters ...Filter) *StoreCandidates { + c.Stores = SelectTargetStores(c.Stores, filters, opt, collector, counter) return c } diff --git a/server/schedule/filter/candidates_test.go b/server/schedule/filter/candidates_test.go index a49513977e0..78ed69c0f79 100644 --- a/server/schedule/filter/candidates_test.go +++ b/server/schedule/filter/candidates_test.go @@ -48,8 +48,8 @@ func idComparer2(a, b *core.StoreInfo) int { type idFilter func(uint64) bool -func (f idFilter) Scope() string { return "idFilter" } -func (f idFilter) Type() string { return "idFilter" } +func (f idFilter) Scope() string { return "idFilter" } +func (f idFilter) Type() filterType { return filterType(0) } func (f idFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { if f(store.GetID()) { return statusOK @@ -68,11 +68,11 @@ func (f idFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *pla func TestCandidates(t *testing.T) { re := require.New(t) cs := newTestCandidates(1, 2, 3, 4, 5) - cs.FilterSource(nil, nil, idFilter(func(id uint64) bool { return id > 2 })) + cs.FilterSource(nil, nil, nil, idFilter(func(id uint64) bool { return id > 2 })) check(re, cs, 3, 4, 5) - cs.FilterTarget(nil, nil, idFilter(func(id uint64) bool { return id%2 == 1 })) + cs.FilterTarget(nil, nil, nil, idFilter(func(id uint64) bool { return id%2 == 1 })) check(re, cs, 3, 5) - cs.FilterTarget(nil, nil, idFilter(func(id uint64) bool { return id > 100 })) + cs.FilterTarget(nil, nil, nil, idFilter(func(id uint64) bool { return id > 100 })) check(re, cs) store := cs.PickFirst() re.Nil(store) diff --git a/server/schedule/filter/counter.go b/server/schedule/filter/counter.go new file mode 100644 index 00000000000..9cdeb6aad97 --- /dev/null +++ b/server/schedule/filter/counter.go @@ -0,0 +1,213 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "strconv" +) + +type action int + +const ( + source action = iota + target + + actionLen +) + +var actions = [actionLen]string{ + "filter-source", + "filter-target", +} + +// String implements fmt.Stringer interface. +func (a action) String() string { + if a < actionLen { + return actions[a] + } + return "unknown" +} + +type scope int + +const ( + // BalanceLeader is the filter type for balance leader. + BalanceLeader scope = iota + // BalanceRegion is the filter type for balance region. + BalanceRegion + // BalanceHotRegion is the filter type for hot region. + BalanceHotRegion + // Label is the filter type for replica. + Label + + // EvictLeader is the filter type for evict leader. + EvictLeader + // RegionScatter is the filter type for scatter region. + RegionScatter + // ReplicaChecker is the filter type for replica. + ReplicaChecker + // RuleChecker is the filter type for rule. + RuleChecker + + // GrantHotLeader is the filter type for grant hot leader. + GrantHotLeader + // ShuffleHotRegion is the filter type for shuffle hot region. + ShuffleHotRegion + // ShuffleRegion is the filter type for shuffle region. + ShuffleRegion + // RandomMerge is the filter type for random merge. + RandomMerge + scopeLen +) + +var scopes = [scopeLen]string{ + "balance-leader-scheduler", + "balance-region-scheduler", + "balance-hot-region-scheduler", + "label-scheduler", + + "evict-leader-scheduler", + "region-scatter", + "replica-checker", + "rule-checker", + + "grant-hot-leader-scheduler", + "shuffle-region-scheduler", + "shuffle-region-scheduler", + "random-merge-scheduler", +} + +// String implements fmt.Stringer interface. +func (s scope) String() string { + if s >= scopeLen { + return "unknown" + } + return scopes[s] +} + +type filterType int + +const ( + excluded filterType = iota + storageThreshold + distinctScore + labelConstraint + ruleFit + ruleLeader + engine + specialUse + isolation + + storeStateOK + storeStateTombstone + storeStateDown + storeStateOffline + storeStatePauseLeader + storeStateSlow + storeStateDisconnected + storeStateBusy + storeStateExceedRemoveLimit + storeStateExceedAddLimit + storeStateTooManySnapshot + storeStateTooManyPendingPeer + storeStateRejectLeader + + filtersLen +) + +var filters = [filtersLen]string{ + "exclude-filter", + "storage-threshold-filter", + "distinct-filter", + "label-constraint-filter", + "rule-fit-filter", + "rule-fit-leader-filter", + "engine-filter", + "special-use-filter", + "isolation-filter", + + "store-state-ok-filter", + "store-state-tombstone-filter", + "store-state-down-filter", + "store-state-offline-filter", + "store-state-pause-leader-filter", + "store-state-slow-filter", + "store-state-disconnect-filter", + "store-state-busy-filter", + "store-state-exceed-remove-limit-filter", + "store-state-exceed-add-limit-filter", + "store-state-too-many-snapshots-filter", + "store-state-too-many-pending-peers-filter", + "store-state-reject-leader-filter", +} + +// String implements fmt.Stringer interface. 
+func (f filterType) String() string { + if f < filtersLen { + return filters[f] + } + + return "unknown" +} + +// Counter records the filter counter. +type Counter struct { + scope string + // record filter counter for each store. + // [action][type][sourceID][targetID]count + // [source-filter][rule-fit-filter]<1->2><10> + counter [][]map[uint64]map[uint64]int +} + +// NewCounter creates a Counter. +func NewCounter(scope string) *Counter { + counter := make([][]map[uint64]map[uint64]int, actionLen) + for i := range counter { + counter[i] = make([]map[uint64]map[uint64]int, filtersLen) + for k := range counter[i] { + counter[i][k] = make(map[uint64]map[uint64]int) + } + } + return &Counter{counter: counter, scope: scope} +} + +// Add adds the filter counter. +func (c *Counter) inc(action action, filterType filterType, sourceID uint64, targetID uint64) { + if _, ok := c.counter[action][filterType][sourceID]; !ok { + c.counter[action][filterType][sourceID] = make(map[uint64]int) + } + c.counter[action][filterType][sourceID][targetID]++ +} + +// Flush flushes the counter to the metrics. +func (c *Counter) Flush() { + for i, actions := range c.counter { + actionName := action(i).String() + for j, counters := range actions { + filterName := filterType(j).String() + for sourceID, count := range counters { + sourceIDStr := strconv.FormatUint(sourceID, 10) + for targetID, value := range count { + targetIDStr := strconv.FormatUint(sourceID, 10) + if value > 0 { + filterCounter.WithLabelValues(actionName, c.scope, filterName, sourceIDStr, targetIDStr). + Add(float64(value)) + counters[sourceID][targetID] = 0 + } + } + } + } + } +} diff --git a/server/schedule/filter/counter_test.go b/server/schedule/filter/counter_test.go new file mode 100644 index 00000000000..65531e26de7 --- /dev/null +++ b/server/schedule/filter/counter_test.go @@ -0,0 +1,50 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package filter + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestString(t *testing.T) { + re := require.New(t) + testcases := []struct { + filterType int + expected string + }{ + {int(storeStateTombstone), "store-state-tombstone-filter"}, + {int(filtersLen - 1), "store-state-reject-leader-filter"}, + {int(filtersLen), "unknown"}, + } + + for _, data := range testcases { + re.Equal(data.expected, filterType(data.filterType).String()) + } + re.Equal(int(filtersLen), len(filters)) +} + +func TestCounter(t *testing.T) { + re := require.New(t) + counter := NewCounter(BalanceLeader.String()) + counter.inc(source, storeStateTombstone, 1, 2) + counter.inc(target, storeStateTombstone, 1, 2) + re.Equal(counter.counter[source][storeStateTombstone][1][2], 1) + re.Equal(counter.counter[target][storeStateTombstone][1][2], 1) + counter.Flush() + re.Equal(counter.counter[source][storeStateTombstone][1][2], 0) + re.Equal(counter.counter[target][storeStateTombstone][1][2], 0) +} diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go index 645fa99d973..df364117d05 100644 --- a/server/schedule/filter/filters.go +++ b/server/schedule/filter/filters.go @@ -15,7 +15,6 @@ package filter import ( - "fmt" "strconv" "github.com/pingcap/kvproto/pkg/metapb" @@ -28,19 +27,19 @@ import ( "github.com/tikv/pd/server/schedule/plan" ) -const ( - filterSource = "filter-source" - filterTarget = "filter-target" -) - // SelectSourceStores selects stores that be selected as source store from the list. -func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, collector *plan.Collector) []*core.StoreInfo { +func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, collector *plan.Collector, + counter *Counter) []*core.StoreInfo { return filterStoresBy(stores, func(s *core.StoreInfo) bool { - sourceID := strconv.FormatUint(s.GetID(), 10) return slice.AllOf(filters, func(i int) bool { status := filters[i].Source(opt, s) if !status.IsOK() { - filterCounter.WithLabelValues(filterSource, filters[i].Scope(), filters[i].Type(), sourceID, "").Inc() + if counter != nil { + counter.inc(source, filters[i].Type(), s.GetID(), 0) + } else { + sourceID := strconv.FormatUint(s.GetID(), 10) + filterCounter.WithLabelValues(source.String(), filters[i].Scope(), filters[i].Type().String(), sourceID, "").Inc() + } if collector != nil { collector.Collect(plan.SetResource(s), plan.SetStatus(status)) } @@ -52,18 +51,25 @@ func SelectSourceStores(stores []*core.StoreInfo, filters []Filter, opt *config. } // SelectUnavailableTargetStores selects unavailable stores that can't be selected as target store from the list. 
-func SelectUnavailableTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, collector *plan.Collector) []*core.StoreInfo { +func SelectUnavailableTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, + collector *plan.Collector, counter *Counter) []*core.StoreInfo { return filterStoresBy(stores, func(s *core.StoreInfo) bool { targetID := strconv.FormatUint(s.GetID(), 10) return slice.AnyOf(filters, func(i int) bool { status := filters[i].Target(opt, s) if !status.IsOK() { cfilter, ok := filters[i].(comparingFilter) - sourceID := "" + sourceID := uint64(0) if ok { - sourceID = strconv.FormatUint(cfilter.GetSourceStoreID(), 10) + sourceID = cfilter.GetSourceStoreID() } - filterCounter.WithLabelValues(filterTarget, filters[i].Scope(), filters[i].Type(), targetID, sourceID).Inc() + if counter != nil { + counter.inc(target, filters[i].Type(), sourceID, s.GetID()) + } else { + filterCounter.WithLabelValues(target.String(), filters[i].Scope(), filters[i].Type().String(), + strconv.FormatUint(sourceID, 10), targetID).Inc() + } + if collector != nil { collector.Collect(plan.SetResourceWithStep(s, 2), plan.SetStatus(status)) } @@ -75,22 +81,33 @@ func SelectUnavailableTargetStores(stores []*core.StoreInfo, filters []Filter, o } // SelectTargetStores selects stores that be selected as target store from the list. -func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, collector *plan.Collector) []*core.StoreInfo { +func SelectTargetStores(stores []*core.StoreInfo, filters []Filter, opt *config.PersistOptions, collector *plan.Collector, + counter *Counter) []*core.StoreInfo { + if len(filters) == 0 { + return stores + } + return filterStoresBy(stores, func(s *core.StoreInfo) bool { - targetID := strconv.FormatUint(s.GetID(), 10) return slice.AllOf(filters, func(i int) bool { filter := filters[i] status := filter.Target(opt, s) if !status.IsOK() { cfilter, ok := filter.(comparingFilter) - sourceID := "" + sourceID := uint64(0) if ok { - sourceID = strconv.FormatUint(cfilter.GetSourceStoreID(), 10) + sourceID = cfilter.GetSourceStoreID() + } + if counter != nil { + counter.inc(target, filter.Type(), sourceID, s.GetID()) + } else { + targetIDStr := strconv.FormatUint(s.GetID(), 10) + sourceIDStr := strconv.FormatUint(sourceID, 10) + filterCounter.WithLabelValues(target.String(), filter.Scope(), filter.Type().String(), sourceIDStr, targetIDStr).Inc() } - filterCounter.WithLabelValues(filterTarget, filters[i].Scope(), filters[i].Type(), sourceID, targetID).Inc() if collector != nil { collector.Collect(plan.SetResource(s), plan.SetStatus(status)) } + return false } return true @@ -111,7 +128,7 @@ func filterStoresBy(stores []*core.StoreInfo, keepPred func(*core.StoreInfo) boo type Filter interface { // Scope is used to indicate where the filter will act on. 
Scope() string - Type() string + Type() filterType // Return plan.Status to show whether be filtered as source Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status // Return plan.Status to show whether be filtered as target @@ -138,7 +155,7 @@ func Target(opt *config.PersistOptions, store *core.StoreInfo, filters []Filter) if ok { sourceID = strconv.FormatUint(cfilter.GetSourceStoreID(), 10) } - filterCounter.WithLabelValues(filterTarget, filter.Scope(), filter.Type(), sourceID, targetID).Inc() + filterCounter.WithLabelValues(target.String(), filter.Scope(), filter.Type().String(), sourceID, targetID).Inc() } return false } @@ -165,8 +182,8 @@ func (f *excludedFilter) Scope() string { return f.scope } -func (f *excludedFilter) Type() string { - return "exclude-filter" +func (f *excludedFilter) Type() filterType { + return excluded } func (f *excludedFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { @@ -195,8 +212,8 @@ func (f *storageThresholdFilter) Scope() string { return f.scope } -func (f *storageThresholdFilter) Type() string { - return "storage-threshold-filter" +func (f *storageThresholdFilter) Type() filterType { + return storageThreshold } func (f *storageThresholdFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { @@ -263,15 +280,15 @@ func (f *distinctScoreFilter) Scope() string { return f.scope } -func (f *distinctScoreFilter) Type() string { - return "distinct-filter" +func (f *distinctScoreFilter) Type() filterType { + return distinctScore } -func (f *distinctScoreFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *distinctScoreFilter) Source(_ *config.PersistOptions, _ *core.StoreInfo) *plan.Status { return statusOK } -func (f *distinctScoreFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *distinctScoreFilter) Target(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { score := core.DistinctScore(f.labels, f.stores, store) switch f.policy { case locationSafeguard: @@ -305,7 +322,7 @@ type StoreStateFilter struct { // Set true if allows temporary states. AllowTemporaryStates bool // Reason is used to distinguish the reason of store state filter - Reason string + Reason filterType } // Scope returns the scheduler or the checker which the filter acts on. @@ -314,103 +331,103 @@ func (f *StoreStateFilter) Scope() string { } // Type returns the type of the Filter. -func (f *StoreStateFilter) Type() string { - return fmt.Sprintf("store-state-%s-filter", f.Reason) +func (f *StoreStateFilter) Type() filterType { + return f.Reason } // conditionFunc defines condition to determine a store should be selected. // It should consider if the filter allows temporary states. 
type conditionFunc func(*config.PersistOptions, *core.StoreInfo) *plan.Status -func (f *StoreStateFilter) isRemoved(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) isRemoved(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if store.IsRemoved() { - f.Reason = "tombstone" + f.Reason = storeStateTombstone return statusStoreRemoved } - f.Reason = "" + f.Reason = storeStateOK return statusOK } func (f *StoreStateFilter) isDown(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { if store.DownTime() > opt.GetMaxStoreDownTime() { - f.Reason = "down" + f.Reason = storeStateDown return statusStoreDown } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) isRemoving(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) isRemoving(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if store.IsRemoving() { - f.Reason = "offline" + f.Reason = storeStateOffline return statusStoresRemoving } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) pauseLeaderTransfer(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) pauseLeaderTransfer(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !store.AllowLeaderTransfer() { - f.Reason = "pause-leader" + f.Reason = storeStatePauseLeader return statusStoreRejectLeader } - f.Reason = "" + f.Reason = storeStateOK return statusOK } func (f *StoreStateFilter) slowStoreEvicted(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { if store.EvictedAsSlowStore() { - f.Reason = "slow-store" + f.Reason = storeStateSlow return statusStoreRejectLeader } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) isDisconnected(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) isDisconnected(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !f.AllowTemporaryStates && store.IsDisconnected() { - f.Reason = "disconnected" + f.Reason = storeStateDisconnected return statusStoreDisconnected } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) isBusy(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) isBusy(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !f.AllowTemporaryStates && store.IsBusy() { - f.Reason = "busy" + f.Reason = storeStateBusy return statusStoreBusy } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) exceedRemoveLimit(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) exceedRemoveLimit(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer) { - f.Reason = "exceed-remove-limit" + f.Reason = storeStateExceedRemoveLimit return statusStoreRemoveLimit } - f.Reason = "" + f.Reason = storeStateOK return statusOK } -func (f *StoreStateFilter) exceedAddLimit(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *StoreStateFilter) exceedAddLimit(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.AddPeer) { - f.Reason = "exceed-add-limit" + f.Reason = storeStateExceedAddLimit return statusStoreAddLimit } - f.Reason = "" + f.Reason = storeStateOK return statusOK } func (f 
*StoreStateFilter) tooManySnapshots(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { if !f.AllowTemporaryStates && (uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() || uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount()) { - f.Reason = "too-many-snapshot" + f.Reason = storeStateTooManySnapshot return statusStoreSnapshotThrottled } - f.Reason = "" + f.Reason = storeStateOK return statusOK } @@ -418,19 +435,19 @@ func (f *StoreStateFilter) tooManyPendingPeers(opt *config.PersistOptions, store if !f.AllowTemporaryStates && opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) { - f.Reason = "too-many-pending-peer" + f.Reason = storeStateTooManyPendingPeer return statusStorePendingPeerThrottled } - f.Reason = "" + f.Reason = storeStateOK return statusOK } func (f *StoreStateFilter) hasRejectLeaderProperty(opts *config.PersistOptions, store *core.StoreInfo) *plan.Status { if opts.CheckLabelProperty(config.RejectLeader, store.GetLabels()) { - f.Reason = "reject-leader" + f.Reason = storeStateRejectLeader return statusStoreRejectLeader } - f.Reason = "" + f.Reason = storeStateOK return statusOK } @@ -533,8 +550,8 @@ func (f labelConstraintFilter) Scope() string { } // Type returns the name of the filter. -func (f labelConstraintFilter) Type() string { - return "label-constraint-filter" +func (f labelConstraintFilter) Type() filterType { + return labelConstraint } // Source filters stores when select them as schedule source. @@ -546,7 +563,7 @@ func (f labelConstraintFilter) Source(opt *config.PersistOptions, store *core.St } // Target filters stores when select them as schedule target. -func (f labelConstraintFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f labelConstraintFilter) Target(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if placement.MatchLabelConstraints(store, f.constraints) { return statusOK } @@ -580,11 +597,11 @@ func (f *ruleFitFilter) Scope() string { return f.scope } -func (f *ruleFitFilter) Type() string { - return "rule-fit-filter" +func (f *ruleFitFilter) Type() filterType { + return ruleFit } -func (f *ruleFitFilter) Source(options *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *ruleFitFilter) Source(_ *config.PersistOptions, _ *core.StoreInfo) *plan.Status { return statusOK } @@ -633,11 +650,11 @@ func (f *ruleLeaderFitFilter) Scope() string { return f.scope } -func (f *ruleLeaderFitFilter) Type() string { - return "rule-fit-leader-filter" +func (f *ruleLeaderFitFilter) Type() filterType { + return ruleLeader } -func (f *ruleLeaderFitFilter) Source(options *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *ruleLeaderFitFilter) Source(_ *config.PersistOptions, _ *core.StoreInfo) *plan.Status { return statusOK } @@ -688,18 +705,18 @@ func (f *engineFilter) Scope() string { return f.scope } -func (f *engineFilter) Type() string { - return "engine-filter" +func (f *engineFilter) Type() filterType { + return engine } -func (f *engineFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *engineFilter) Source(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if f.constraint.MatchStore(store) { return statusOK } return statusStoreNotMatchRule } -func (f *engineFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *engineFilter) Target(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { if 
f.constraint.MatchStore(store) { return statusOK } @@ -731,8 +748,8 @@ func (f *specialUseFilter) Scope() string { return f.scope } -func (f *specialUseFilter) Type() string { - return "special-use-filter" +func (f *specialUseFilter) Type() filterType { + return specialUse } func (f *specialUseFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { @@ -805,15 +822,15 @@ func (f *isolationFilter) Scope() string { return f.scope } -func (f *isolationFilter) Type() string { - return "isolation-filter" +func (f *isolationFilter) Type() filterType { + return isolation } func (f *isolationFilter) Source(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { return statusOK } -func (f *isolationFilter) Target(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status { +func (f *isolationFilter) Target(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status { // No isolation constraint to fit if len(f.constraintSet) == 0 { return statusStoreNotMatchIsolation diff --git a/server/schedule/filter/filters_test.go b/server/schedule/filter/filters_test.go index b9d225d802c..adb7d8aeb69 100644 --- a/server/schedule/filter/filters_test.go +++ b/server/schedule/filter/filters_test.go @@ -210,26 +210,26 @@ func TestStoreStateFilterReason(t *testing.T) { check := func(store *core.StoreInfo, testCases []testCase) { for _, testCase := range testCases { filters[testCase.filterIdx].Source(opt, store) - re.Equal(testCase.sourceReason, filters[testCase.filterIdx].(*StoreStateFilter).Reason) + re.Equal(testCase.sourceReason, filters[testCase.filterIdx].(*StoreStateFilter).Reason.String()) filters[testCase.filterIdx].Source(opt, store) - re.Equal(testCase.targetReason, filters[testCase.filterIdx].(*StoreStateFilter).Reason) + re.Equal(testCase.targetReason, filters[testCase.filterIdx].(*StoreStateFilter).Reason.String()) } } // No reason catched store = store.Clone(core.SetLastHeartbeatTS(time.Now())) testCases := []testCase{ - {2, "", ""}, + {2, "store-state-ok-filter", "store-state-ok-filter"}, } check(store, testCases) // Disconnected store = store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-5 * time.Minute))) testCases = []testCase{ - {0, "disconnected", "disconnected"}, - {1, "", ""}, - {2, "disconnected", "disconnected"}, - {3, "", ""}, + {0, "store-state-disconnect-filter", "store-state-disconnect-filter"}, + {1, "store-state-ok-filter", "store-state-ok-filter"}, + {2, "store-state-disconnect-filter", "store-state-disconnect-filter"}, + {3, "store-state-ok-filter", "store-state-ok-filter"}, } check(store, testCases) @@ -237,10 +237,10 @@ func TestStoreStateFilterReason(t *testing.T) { store = store.Clone(core.SetLastHeartbeatTS(time.Now())). 
Clone(core.SetStoreStats(&pdpb.StoreStats{IsBusy: true})) testCases = []testCase{ - {0, "", ""}, - {1, "busy", "busy"}, - {2, "busy", "busy"}, - {3, "", ""}, + {0, "store-state-ok-filter", "store-state-ok-filter"}, + {1, "store-state-busy-filter", "store-state-busy-filter"}, + {2, "store-state-busy-filter", "store-state-busy-filter"}, + {3, "store-state-ok-filter", "store-state-ok-filter"}, } check(store, testCases) } diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index c1632d48abe..209b94feb82 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -177,12 +177,13 @@ func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, r *http.R type balanceLeaderScheduler struct { *BaseScheduler *retryQuota - name string - conf *balanceLeaderSchedulerConfig - handler http.Handler - opController *schedule.OperatorController - filters []filter.Filter - counter *prometheus.CounterVec + name string + conf *balanceLeaderSchedulerConfig + handler http.Handler + opController *schedule.OperatorController + filters []filter.Filter + counter *prometheus.CounterVec + filterCounter *filter.Counter } // newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on @@ -197,6 +198,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf * handler: newBalanceLeaderHandler(conf), opController: opController, counter: balanceLeaderCounter, + filterCounter: filter.NewCounter(filter.BalanceLeader.String()), } for _, option := range options { option(s) @@ -363,8 +365,8 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) scoreFunc := func(store *core.StoreInfo) float64 { return store.LeaderScore(solver.kind.Policy, solver.GetOpInfluence(store.GetID())) } - sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts(), collector), false, scoreFunc) - targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts(), nil), true, scoreFunc) + sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts(), collector, l.filterCounter), false, scoreFunc) + targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts(), nil, l.filterCounter), true, scoreFunc) usedRegions := make(map[uint64]struct{}) result := make([]*operator.Operator, 0, batch) @@ -392,6 +394,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) } } } + l.filterCounter.Flush() l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...)) return result, collector.GetPlans() } @@ -472,7 +475,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), opts, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil { finalFilters = append(l.filters, leaderFilter) } - targets = filter.SelectTargetStores(targets, finalFilters, opts, collector) + targets = filter.SelectTargetStores(targets, finalFilters, opts, collector, l.filterCounter) leaderSchedulePolicy := opts.GetLeaderSchedulePolicy() sort.Slice(targets, func(i, j int) bool { iOp := solver.GetOpInfluence(targets[i].GetID()) @@ -522,7 +525,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla finalFilters = append(l.filters, leaderFilter) } target := 
filter.NewCandidates([]*core.StoreInfo{solver.target}). - FilterTarget(opts, nil, finalFilters...). + FilterTarget(opts, nil, l.filterCounter, finalFilters...). PickFirst() if target == nil { log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID())) diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 8f291f09e84..8209214691f 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -73,10 +73,11 @@ type balanceRegionSchedulerConfig struct { type balanceRegionScheduler struct { *BaseScheduler *retryQuota - conf *balanceRegionSchedulerConfig - opController *schedule.OperatorController - filters []filter.Filter - counter *prometheus.CounterVec + conf *balanceRegionSchedulerConfig + opController *schedule.OperatorController + filters []filter.Filter + counter *prometheus.CounterVec + filterCounter *filter.Counter } // newBalanceRegionScheduler creates a scheduler that tends to keep regions on @@ -89,6 +90,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf * conf: conf, opController: opController, counter: balanceRegionCounter, + filterCounter: filter.NewCounter(filter.BalanceRegion.String()), } for _, setOption := range opts { setOption(scheduler) @@ -146,8 +148,8 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() stores := cluster.GetStores() opts := cluster.GetOpts() - faultTargets := filter.SelectUnavailableTargetStores(stores, s.filters, opts, collector) - sourceStores := filter.SelectSourceStores(stores, s.filters, opts, collector) + faultTargets := filter.SelectUnavailableTargetStores(stores, s.filters, opts, collector, s.filterCounter) + sourceStores := filter.SelectSourceStores(stores, s.filters, opts, collector, s.filterCounter) opInfluence := s.opController.GetOpInfluence(cluster) s.OpController.GetFastOpInfluence(cluster, opInfluence) kind := core.NewScheduleKind(core.RegionKind, core.BySize) @@ -183,7 +185,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) } for i := 0; i < retryLimit; i++ { // Priority pick the region that has a pending peer. - // Pending region may means the disk is overload, remove the pending region firstly. + // Pending region may mean the disk is overload, remove the pending region firstly. solver.region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.SourceStoreID(), s.conf.Ranges), collector, baseRegionFilters...) if solver.region == nil { @@ -197,7 +199,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) append(baseRegionFilters, pendingFilter)...) } if solver.region == nil { - // Finally pick learner. + // Finally, pick learner. solver.region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.SourceStoreID(), s.conf.Ranges), collector, append(baseRegionFilters, pendingFilter)...) 
} @@ -215,7 +217,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) schedulerCounter.WithLabelValues(s.GetName(), "region-hot").Inc() continue } - // Check region whether have leader + // Check region leader if solver.region.GetLeader() == nil { log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.region.GetID())) if collector != nil { @@ -234,6 +236,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) } s.retryQuota.Attenuate(solver.source) } + s.filterCounter.Flush() s.retryQuota.GC(stores) return nil, collector.GetPlans() } @@ -250,8 +253,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co filter.NewExcludedFilter(s.GetName(), nil, excludeTargets), filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source), } - - candidates := filter.NewCandidates(dstStores).FilterTarget(solver.GetOpts(), collector, filters...) + candidates := filter.NewCandidates(dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...) if len(candidates.Stores) != 0 { solver.step++ } diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index 2e807f18a9a..c13c4eb4c70 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -330,7 +330,7 @@ func scheduleEvictLeaderOnce(name, typ string, cluster schedule.Cluster, conf ev filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true}) candidates := filter.NewCandidates(cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetOpts(), nil, filters...) + FilterTarget(cluster.GetOpts(), nil, nil, filters...) // Compatible with old TiKV transfer leader logic. target := candidates.RandomPick() targets := candidates.PickAll() diff --git a/server/schedulers/label.go b/server/schedulers/label.go index b2d8423dd17..e566b317d7e 100644 --- a/server/schedulers/label.go +++ b/server/schedulers/label.go @@ -127,7 +127,7 @@ func (s *labelScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*ope f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores) target := filter.NewCandidates(cluster.GetFollowerStores(region)). - FilterTarget(cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true}, f). + FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true}, f). RandomPick() if target == nil { log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID())) diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go index 53c6eb1cf33..c1b0be3bac4 100644 --- a/server/schedulers/random_merge.go +++ b/server/schedulers/random_merge.go @@ -104,7 +104,7 @@ func (s *randomMergeScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ( schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() store := filter.NewCandidates(cluster.GetStores()). - FilterSource(cluster.GetOpts(), nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true}). + FilterSource(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true}). 
RandomPick() if store == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-source-store").Inc() diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go index da9c9777f60..c07f1abb320 100644 --- a/server/schedulers/shuffle_leader.go +++ b/server/schedulers/shuffle_leader.go @@ -110,7 +110,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) // 2. transfer a leader to the store. schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() targetStore := filter.NewCandidates(cluster.GetStores()). - FilterTarget(cluster.GetOpts(), nil, s.filters...). + FilterTarget(cluster.GetOpts(), nil, nil, s.filters...). RandomPick() if targetStore == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc() diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index eeba44ef6af..acd822a3902 100644 --- a/server/schedulers/shuffle_region.go +++ b/server/schedulers/shuffle_region.go @@ -130,7 +130,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster schedule.Cluster) (*core.RegionInfo, *metapb.Peer) { candidates := filter.NewCandidates(cluster.GetStores()). - FilterSource(cluster.GetOpts(), nil, s.filters...). + FilterSource(cluster.GetOpts(), nil, nil, s.filters...). Shuffle() pendingFilter := filter.NewRegionPendingFilter() @@ -169,7 +169,7 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster schedule.Cluster, regio excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs()) target := filter.NewCandidates(cluster.GetStores()). - FilterTarget(cluster.GetOpts(), nil, append(s.filters, scoreGuard, excludedFilter)...). + FilterTarget(cluster.GetOpts(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...). RandomPick() if target == nil { return nil diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index acd1fbe84f9..2228c8af934 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -146,7 +146,7 @@ func (p *solver) shouldBalance(scheduleName string) bool { // Make sure after move, source score is still greater than target score. 
shouldBalance := p.sourceScore > p.targetScore - if !shouldBalance { + if !shouldBalance && log.GetLevel() <= zap.DebugLevel { log.Debug("skip balance "+p.kind.Resource.String(), zap.String("scheduler", scheduleName), zap.Uint64("region-id", p.region.GetID()), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID), zap.Int64("source-size", p.source.GetRegionSize()), zap.Float64("source-score", p.sourceScore), From d033fbf11a5daadb1961d4c2d5a4d9befbfa820d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 27 Oct 2022 12:01:57 +0800 Subject: [PATCH 3/4] *: reduce call when comparing key (#5628) ref tikv/pd#5606 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/rangetree/range_tree.go | 3 +- server/core/region.go | 26 ++++++++--------- server/core/region_test.go | 4 +-- server/core/region_tree.go | 52 ++++++++++++++++----------------- server/core/region_tree_test.go | 26 ++++++++--------- 5 files changed, 56 insertions(+), 55 deletions(-) diff --git a/pkg/rangetree/range_tree.go b/pkg/rangetree/range_tree.go index 47d7e960a0e..174e338f775 100644 --- a/pkg/rangetree/range_tree.go +++ b/pkg/rangetree/range_tree.go @@ -76,9 +76,10 @@ func (r *RangeTree) GetOverlaps(item RangeItem) []RangeItem { } var overlaps []RangeItem + endKey := item.GetEndKey() r.tree.AscendGreaterOrEqual(result, func(i btree.Item) bool { over := i.(RangeItem) - if len(item.GetEndKey()) > 0 && bytes.Compare(item.GetEndKey(), over.GetStartKey()) <= 0 { + if len(endKey) > 0 && bytes.Compare(endKey, over.GetStartKey()) <= 0 { return false } overlaps = append(overlaps, over) diff --git a/server/core/region.go b/server/core/region.go index d041ede2cb9..cdf961749c4 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -702,7 +702,7 @@ func (rm regionMap) Get(id uint64) *regionItem { // If the regionItem already exists, it will be overwritten. // Note: Do not use this function when you only need to update the RegionInfo and do not need a new regionItem. func (rm regionMap) AddNew(region *RegionInfo) *regionItem { - item := ®ionItem{region: region} + item := ®ionItem{RegionInfo: region} rm[region.GetID()] = item return item } @@ -738,7 +738,7 @@ func NewRegionsInfo() *RegionsInfo { // GetRegion returns the RegionInfo with regionID func (r *RegionsInfo) GetRegion(regionID uint64) *RegionInfo { if item := r.regions.Get(regionID); item != nil { - return item.region + return item.RegionInfo } return nil } @@ -750,7 +750,7 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) { rangeChanged := true // This Region is new, or its range has changed. if item = r.regions.Get(region.GetID()); item != nil { // If this ID already exists, use the existing regionItem and pick out the origin. - origin := item.region + origin := item.RegionInfo rangeChanged = !origin.rangeEqualsTo(region) if rangeChanged { // Delete itself in regionTree so that overlaps will not contain itself. @@ -765,14 +765,14 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) { // If the peers are not changed, only the statistical on the sub regionTree needs to be updated. r.updateSubTreeStat(origin, region) // Update the RegionInfo in the regionItem. - item.region = region + item.RegionInfo = region return } // If the range or peers have changed, the sub regionTree needs to be cleaned up. // TODO: Improve performance by deleting only the different peers. r.removeRegionFromSubTree(origin) // Update the RegionInfo in the regionItem. 
- item.region = region + item.RegionInfo = region } else { // If this ID does not exist, generate a new regionItem and save it in the regionMap. item = r.regions.AddNew(region) @@ -963,7 +963,7 @@ func (r *RegionsInfo) GetPrevRegionByKey(regionKey []byte) *RegionInfo { func (r *RegionsInfo) GetRegions() []*RegionInfo { regions := make([]*RegionInfo, 0, r.regions.Len()) for _, item := range r.regions { - regions = append(regions, item.region) + regions = append(regions, item.RegionInfo) } return regions } @@ -1027,7 +1027,7 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo func (r *RegionsInfo) GetMetaRegions() []*metapb.Region { regions := make([]*metapb.Region, 0, r.regions.Len()) for _, item := range r.regions { - regions = append(regions, typeutil.DeepClone(item.region.meta, RegionFactory)) + regions = append(regions, typeutil.DeepClone(item.meta, RegionFactory)) } return regions } @@ -1110,7 +1110,7 @@ func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange, n in // GetLeader returns leader RegionInfo by storeID and regionID (now only used in test) func (r *RegionsInfo) GetLeader(storeID uint64, region *RegionInfo) *RegionInfo { if leaders, ok := r.leaders[storeID]; ok { - return leaders.find(region).region + return leaders.find(region).RegionInfo } return nil } @@ -1118,7 +1118,7 @@ func (r *RegionsInfo) GetLeader(storeID uint64, region *RegionInfo) *RegionInfo // GetFollower returns follower RegionInfo by storeID and regionID (now only used in test) func (r *RegionsInfo) GetFollower(storeID uint64, region *RegionInfo) *RegionInfo { if followers, ok := r.followers[storeID]; ok { - return followers.find(region).region + return followers.find(region).RegionInfo } return nil } @@ -1215,11 +1215,11 @@ func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *Regi p, n := r.tree.getAdjacentRegions(region) var prev, next *RegionInfo // check key to avoid key range hole - if p != nil && bytes.Equal(p.region.GetEndKey(), region.GetStartKey()) { - prev = r.GetRegion(p.region.GetID()) + if p != nil && bytes.Equal(p.GetEndKey(), region.GetStartKey()) { + prev = r.GetRegion(p.GetID()) } - if n != nil && bytes.Equal(region.GetEndKey(), n.region.GetStartKey()) { - next = r.GetRegion(n.region.GetID()) + if n != nil && bytes.Equal(region.GetEndKey(), n.GetStartKey()) { + next = r.GetRegion(n.GetID()) } return prev, next } diff --git a/server/core/region_test.go b/server/core/region_test.go index f3f0c917a04..93367f8dbbc 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -401,7 +401,7 @@ func regionInfo(id uint64) *RegionInfo { func check(re *require.Assertions, rm regionMap, ids ...uint64) { // Check Get. for _, id := range ids { - re.Equal(id, rm.Get(id).region.GetID()) + re.Equal(id, rm.Get(id).GetID()) } // Check Len. re.Equal(len(ids), rm.Len()) @@ -412,7 +412,7 @@ func check(re *require.Assertions, rm regionMap, ids ...uint64) { } set1 := make(map[uint64]struct{}) for _, r := range rm { - set1[r.region.GetID()] = struct{}{} + set1[r.GetID()] = struct{}{} } re.Equal(expect, set1) } diff --git a/server/core/region_tree.go b/server/core/region_tree.go index a5d717a93e9..69056675837 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -30,28 +30,28 @@ import ( var _ rangetree.RangeItem = ®ionItem{} type regionItem struct { - region *RegionInfo + *RegionInfo } // GetStartKey returns the start key of the region. 
func (r *regionItem) GetStartKey() []byte { - return r.region.GetStartKey() + return r.meta.StartKey } // GetEndKey returns the end key of the region. func (r *regionItem) GetEndKey() []byte { - return r.region.GetEndKey() + return r.meta.EndKey } // Less returns true if the region start key is less than the other. func (r *regionItem) Less(other btree.Item) bool { - left := r.region.GetStartKey() - right := other.(rangetree.RangeItem).GetStartKey() + left := r.meta.StartKey + right := other.(*regionItem).meta.StartKey return bytes.Compare(left, right) < 0 } func (r *regionItem) Contains(key []byte) bool { - start, end := r.region.GetStartKey(), r.region.GetEndKey() + start, end := r.GetStartKey(), r.GetEndKey() return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) } @@ -88,11 +88,11 @@ func (t *regionTree) length() int { // getOverlaps gets the regions which are overlapped with the specified region range. func (t *regionTree) getOverlaps(region *RegionInfo) []*RegionInfo { - item := ®ionItem{region: region} + item := ®ionItem{RegionInfo: region} result := t.tree.GetOverlaps(item) overlaps := make([]*RegionInfo, len(result)) for i, r := range result { - overlaps[i] = r.(*regionItem).region + overlaps[i] = r.(*regionItem).RegionInfo } return overlaps } @@ -101,7 +101,7 @@ func (t *regionTree) getOverlaps(region *RegionInfo) []*RegionInfo { // It finds and deletes all the overlapped regions first, and then // insert the region. func (t *regionTree) update(item *regionItem) []*RegionInfo { - region := item.region + region := item.RegionInfo t.totalSize += region.approximateSize regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate() t.totalWriteBytesRate += regionWriteBytesRate @@ -110,7 +110,7 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo { overlaps := t.tree.Update(item) result := make([]*RegionInfo, len(overlaps)) for i, overlap := range overlaps { - old := overlap.(*regionItem).region + old := overlap.(*regionItem).RegionInfo result[i] = old log.Debug("overlapping region", zap.Uint64("region-id", old.GetID()), @@ -125,7 +125,7 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo { return result } -// updateStat is used to update statistics when regionItem.region is directly replaced. +// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced. 
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) { t.totalSize += region.approximateSize regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate() @@ -145,14 +145,14 @@ func (t *regionTree) remove(region *RegionInfo) { if t.length() == 0 { return } - item := ®ionItem{region: region} + item := ®ionItem{RegionInfo: region} result := t.tree.Find(item) - if result == nil || result.(*regionItem).region.GetID() != region.GetID() { + if result == nil || result.(*regionItem).GetID() != region.GetID() { return } - t.totalSize -= result.(*regionItem).region.GetApproximateSize() - regionWriteBytesRate, regionWriteKeysRate := result.(*regionItem).region.GetWriteRate() + t.totalSize -= result.(*regionItem).GetApproximateSize() + regionWriteBytesRate, regionWriteKeysRate := result.(*regionItem).GetWriteRate() t.totalWriteBytesRate -= regionWriteBytesRate t.totalWriteKeysRate -= regionWriteKeysRate t.tree.Remove(result) @@ -165,7 +165,7 @@ func (t *regionTree) search(regionKey []byte) *RegionInfo { if result == nil { return nil } - return result.region + return result.RegionInfo } // searchPrev returns the previous region of the region where the regionKey is located. @@ -175,20 +175,20 @@ func (t *regionTree) searchPrev(regionKey []byte) *RegionInfo { if curRegionItem == nil { return nil } - prevRegionItem, _ := t.getAdjacentRegions(curRegionItem.region) + prevRegionItem, _ := t.getAdjacentRegions(curRegionItem.RegionInfo) if prevRegionItem == nil { return nil } - if !bytes.Equal(prevRegionItem.region.GetEndKey(), curRegionItem.region.GetStartKey()) { + if !bytes.Equal(prevRegionItem.GetEndKey(), curRegionItem.GetStartKey()) { return nil } - return prevRegionItem.region + return prevRegionItem.RegionInfo } // find is a helper function to find an item that contains the regions start // key. 
func (t *regionTree) find(region *RegionInfo) *regionItem { - item := t.tree.Find(®ionItem{region: region}) + item := t.tree.Find(®ionItem{RegionInfo: region}) if item == nil { return nil } @@ -205,9 +205,9 @@ func (t *regionTree) scanRange(startKey []byte, f func(*RegionInfo) bool) { // find if there is a region with key range [s, d), s < startKey < d fn := func(item rangetree.RangeItem) bool { r := item.(*regionItem) - return f(r.region) + return f(r.RegionInfo) } - t.tree.ScanRange(®ionItem{region: region}, fn) + t.tree.ScanRange(®ionItem{RegionInfo: region}, fn) } func (t *regionTree) scanRanges() []*RegionInfo { @@ -223,7 +223,7 @@ func (t *regionTree) scanRanges() []*RegionInfo { } func (t *regionTree) getAdjacentRegions(region *RegionInfo) (*regionItem, *regionItem) { - item := ®ionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: region.GetStartKey()}}} + item := ®ionItem{RegionInfo: &RegionInfo{meta: &metapb.Region{StartKey: region.GetStartKey()}}} prevItem, nextItem := t.tree.GetAdjacentItem(item) var prev, next *regionItem if prevItem != nil { @@ -248,10 +248,10 @@ func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { for _, i := range rand.Perm(len(ranges)) { var endIndex int startKey, endKey := ranges[i].StartKey, ranges[i].EndKey - startRegion, startIndex := t.tree.GetWithIndex(®ionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: startKey}}}) + startRegion, startIndex := t.tree.GetWithIndex(®ionItem{RegionInfo: &RegionInfo{meta: &metapb.Region{StartKey: startKey}}}) if len(endKey) != 0 { - _, endIndex = t.tree.GetWithIndex(®ionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: endKey}}}) + _, endIndex = t.tree.GetWithIndex(®ionItem{RegionInfo: &RegionInfo{meta: &metapb.Region{StartKey: endKey}}}) } else { endIndex = t.tree.Len() } @@ -272,7 +272,7 @@ func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { continue } index := rand.Intn(endIndex-startIndex) + startIndex - region := t.tree.GetAt(index).(*regionItem).region + region := t.tree.GetAt(index).(*regionItem).RegionInfo if region.isInvolved(startKey, endKey) { return region } diff --git a/server/core/region_tree_test.go b/server/core/region_tree_test.go index a8a8d6a3ba7..3237a5daf09 100644 --- a/server/core/region_tree_test.go +++ b/server/core/region_tree_test.go @@ -185,28 +185,28 @@ func TestRegionTree(t *testing.T) { // check get adjacent regions prev, next := tree.getAdjacentRegions(regionA) re.Nil(prev) - re.Equal(regionB, next.region) + re.Equal(regionB, next.RegionInfo) prev, next = tree.getAdjacentRegions(regionB) - re.Equal(regionA, prev.region) - re.Equal(regionD, next.region) + re.Equal(regionA, prev.RegionInfo) + re.Equal(regionD, next.RegionInfo) prev, next = tree.getAdjacentRegions(regionC) - re.Equal(regionB, prev.region) - re.Equal(regionD, next.region) + re.Equal(regionB, prev.RegionInfo) + re.Equal(regionD, next.RegionInfo) prev, next = tree.getAdjacentRegions(regionD) - re.Equal(regionB, prev.region) + re.Equal(regionB, prev.RegionInfo) re.Nil(next) // region with the same range and different region id will not be delete. 
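// Illustrative note (not part of this patch): remove is guarded by the region
// ID (see regionTree.remove above), so a stale descriptor covering the same key
// range but carrying a different ID is a no-op, roughly
//
//	tree.remove(staleRegionSameRange) // different GetID(), tree unchanged
//	tree.search(startKey)             // still returns the original region
//
// staleRegionSameRange and startKey are placeholder names; the assertions that
// follow exercise exactly this behavior.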
- region0 := newRegionItem([]byte{}, []byte("a")).region + region0 := newRegionItem([]byte{}, []byte("a")).RegionInfo updateNewItem(tree, region0) re.Equal(region0, tree.search([]byte{})) - anotherRegion0 := newRegionItem([]byte{}, []byte("a")).region + anotherRegion0 := newRegionItem([]byte{}, []byte("a")).RegionInfo anotherRegion0.meta.Id = 123 tree.remove(anotherRegion0) re.Equal(region0, tree.search([]byte{})) // overlaps with 0, A, B, C. - region0D := newRegionItem([]byte(""), []byte("d")).region + region0D := newRegionItem([]byte(""), []byte("d")).RegionInfo updateNewItem(tree, region0D) re.Equal(region0D, tree.search([]byte{})) re.Equal(region0D, tree.search([]byte("a"))) @@ -215,7 +215,7 @@ func TestRegionTree(t *testing.T) { re.Equal(regionD, tree.search([]byte("d"))) // overlaps with D. - regionE := newRegionItem([]byte("e"), []byte{}).region + regionE := newRegionItem([]byte("e"), []byte{}).RegionInfo updateNewItem(tree, regionE) re.Equal(region0D, tree.search([]byte{})) re.Equal(region0D, tree.search([]byte("a"))) @@ -240,7 +240,7 @@ func updateRegions(re *require.Assertions, tree *regionTree, regions []*RegionIn func TestRegionTreeSplitAndMerge(t *testing.T) { re := require.New(t) tree := newRegionTree() - regions := []*RegionInfo{newRegionItem([]byte{}, []byte{}).region} + regions := []*RegionInfo{newRegionItem([]byte{}, []byte{}).RegionInfo} // Byte will underflow/overflow if n > 7. n := 7 @@ -355,7 +355,7 @@ func TestRandomRegionDiscontinuous(t *testing.T) { } func updateNewItem(tree *regionTree, region *RegionInfo) { - item := ®ionItem{region: region} + item := ®ionItem{RegionInfo: region} tree.update(item) } @@ -379,7 +379,7 @@ func checkRandomRegion(re *require.Assertions, tree *regionTree, regions []*Regi } func newRegionItem(start, end []byte) *regionItem { - return ®ionItem{region: NewTestRegionInfo(start, end)} + return ®ionItem{RegionInfo: NewTestRegionInfo(start, end)} } type mockRegionTreeData struct { From 7aba282ff2aa30c6e5e264ab9b3de3a4f932302b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Thu, 27 Oct 2022 12:23:57 +0800 Subject: [PATCH 4/4] simulator: support change peer v2 (#5609) close tikv/pd#5469 Signed-off-by: HunDunDM Co-authored-by: Ti Chi Robot --- server/api/trend_test.go | 4 +- server/core/region_option.go | 6 +- server/schedule/operator_controller_test.go | 2 +- tools/pd-simulator/simulator/node.go | 11 +- tools/pd-simulator/simulator/statistics.go | 40 +- tools/pd-simulator/simulator/task.go | 654 +++++++++++--------- 6 files changed, 404 insertions(+), 313 deletions(-) diff --git a/server/api/trend_test.go b/server/api/trend_test.go index cdf27206e8a..cf74a187f87 100644 --- a/server/api/trend_test.go +++ b/server/api/trend_test.go @@ -62,7 +62,7 @@ func TestTrend(t *testing.T) { newPeerID := op.Step(0).(operator.AddLearner).PeerID region5 = region5.Clone(core.WithAddPeer(&metapb.Peer{Id: newPeerID, StoreId: 3, Role: metapb.PeerRole_Learner}), core.WithIncConfVer()) mustRegionHeartbeat(re, svr, region5) - region5 = region5.Clone(core.WithPromoteLearner(newPeerID), core.WithRemoveStorePeer(2), core.WithIncConfVer()) + region5 = region5.Clone(core.WithRole(newPeerID, metapb.PeerRole_Voter), core.WithRemoveStorePeer(2), core.WithIncConfVer()) mustRegionHeartbeat(re, svr, region5) op, err = svr.GetHandler().GetOperator(6) @@ -71,7 +71,7 @@ func TestTrend(t *testing.T) { newPeerID = op.Step(0).(operator.AddLearner).PeerID region6 = region6.Clone(core.WithAddPeer(&metapb.Peer{Id: newPeerID, StoreId: 3, Role: 
metapb.PeerRole_Learner}), core.WithIncConfVer()) mustRegionHeartbeat(re, svr, region6) - region6 = region6.Clone(core.WithPromoteLearner(newPeerID), core.WithLeader(region6.GetStorePeer(2)), core.WithRemoveStorePeer(1), core.WithIncConfVer()) + region6 = region6.Clone(core.WithRole(newPeerID, metapb.PeerRole_Voter), core.WithLeader(region6.GetStorePeer(2)), core.WithRemoveStorePeer(1), core.WithIncConfVer()) mustRegionHeartbeat(re, svr, region6) var trend Trend diff --git a/server/core/region_option.go b/server/core/region_option.go index 99ec2a8d876..64e329fd3b3 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -325,12 +325,12 @@ func WithAddPeer(peer *metapb.Peer) RegionCreateOption { } } -// WithPromoteLearner promotes the learner. -func WithPromoteLearner(peerID uint64) RegionCreateOption { +// WithRole changes the role. +func WithRole(peerID uint64, role metapb.PeerRole) RegionCreateOption { return func(region *RegionInfo) { for _, p := range region.GetPeers() { if p.GetId() == peerID { - p.Role = metapb.PeerRole_Voter + p.Role = role } } } diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index e3b3610af72..7ab638e3e06 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -638,7 +638,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { suite.Equal(2, stream.MsgLength()) region4 := region3.Clone( - core.WithPromoteLearner(3), + core.WithRole(3, metapb.PeerRole_Voter), core.WithIncConfVer(), ) suite.True(steps[1].IsFinish(region4)) diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index a6a9c478735..7cf84c4e941 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -43,7 +43,7 @@ type Node struct { stats *info.StoreStats tick uint64 wg sync.WaitGroup - tasks map[uint64]Task + tasks map[uint64]*Task client Client receiveRegionHeartbeatCh <-chan *pdpb.RegionHeartbeatResponse ctx context.Context @@ -99,7 +99,7 @@ func NewNode(s *cases.Store, pdAddr string, config *SimConfig) (*Node, error) { client: client, ctx: ctx, cancel: cancel, - tasks: make(map[uint64]Task), + tasks: make(map[uint64]*Task), receiveRegionHeartbeatCh: receiveRegionHeartbeatCh, limiter: ratelimit.NewRateLimiter(float64(speed), int(speed)), tick: uint64(rand.Intn(storeHeartBeatPeriod)), @@ -125,7 +125,7 @@ func (n *Node) receiveRegionHeartbeat() { for { select { case resp := <-n.receiveRegionHeartbeatCh: - task := responseToTask(resp, n.raftEngine) + task := responseToTask(n.raftEngine, resp) if task != nil { n.AddTask(task) } @@ -156,8 +156,7 @@ func (n *Node) stepTask() { n.Lock() defer n.Unlock() for _, task := range n.tasks { - task.Step(n.raftEngine) - if task.IsFinished() { + if isFinished := task.Step(n.raftEngine); isFinished { simutil.Logger.Debug("task status", zap.Uint64("node-id", n.Id), zap.Uint64("region-id", task.RegionID()), @@ -246,7 +245,7 @@ func (n *Node) reportRegionChange() { } // AddTask adds task in this node. 
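// Illustrative sketch (not part of this patch): with Task as a concrete struct
// wrapping an operator, a node drives it roughly the way stepTask above does:
//
//	if finished := task.Step(n.raftEngine); finished {
//	    delete(n.tasks, task.RegionID())
//	}
//
// the delete of a finished task is assumed for illustration; only Step and
// RegionID are taken from the surrounding diff.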
-func (n *Node) AddTask(task Task) { +func (n *Node) AddTask(task *Task) { n.Lock() defer n.Unlock() if t, ok := n.tasks[task.RegionID()]; ok { diff --git a/tools/pd-simulator/simulator/statistics.go b/tools/pd-simulator/simulator/statistics.go index 37e666f05bc..2dadd78020d 100644 --- a/tools/pd-simulator/simulator/statistics.go +++ b/tools/pd-simulator/simulator/statistics.go @@ -23,20 +23,22 @@ import ( type taskStatistics struct { syncutil.RWMutex - addPeer map[uint64]int + addVoter map[uint64]int removePeer map[uint64]int addLearner map[uint64]int promoteLeaner map[uint64]int + demoteVoter map[uint64]int transferLeader map[uint64]map[uint64]int mergeRegion int } func newTaskStatistics() *taskStatistics { return &taskStatistics{ - addPeer: make(map[uint64]int), + addVoter: make(map[uint64]int), removePeer: make(map[uint64]int), addLearner: make(map[uint64]int), promoteLeaner: make(map[uint64]int), + demoteVoter: make(map[uint64]int), transferLeader: make(map[uint64]map[uint64]int), } } @@ -45,10 +47,11 @@ func (t *taskStatistics) getStatistics() map[string]int { t.RLock() defer t.RUnlock() stats := make(map[string]int) - addPeer := getSum(t.addPeer) + addVoter := getSum(t.addVoter) removePeer := getSum(t.removePeer) addLearner := getSum(t.addLearner) - promoteLeaner := getSum(t.promoteLeaner) + promoteLearner := getSum(t.promoteLeaner) + demoteVoter := getSum(t.demoteVoter) var transferLeader int for _, to := range t.transferLeader { @@ -57,34 +60,41 @@ func (t *taskStatistics) getStatistics() map[string]int { } } - stats["Add Peer (task)"] = addPeer + stats["Add Voter (task)"] = addVoter stats["Remove Peer (task)"] = removePeer stats["Add Learner (task)"] = addLearner - stats["Promote Learner (task)"] = promoteLeaner + stats["Promote Learner (task)"] = promoteLearner + stats["Demote Voter (task)"] = demoteVoter stats["Transfer Leader (task)"] = transferLeader stats["Merge Region (task)"] = t.mergeRegion return stats } -func (t *taskStatistics) incAddPeer(regionID uint64) { +func (t *taskStatistics) incAddVoter(regionID uint64) { t.Lock() defer t.Unlock() - t.addPeer[regionID]++ + t.addVoter[regionID]++ } -func (t *taskStatistics) incAddLeaner(regionID uint64) { +func (t *taskStatistics) incAddLearner(regionID uint64) { t.Lock() defer t.Unlock() t.addLearner[regionID]++ } -func (t *taskStatistics) incPromoteLeaner(regionID uint64) { +func (t *taskStatistics) incPromoteLearner(regionID uint64) { t.Lock() defer t.Unlock() t.promoteLeaner[regionID]++ } +func (t *taskStatistics) incDemoteVoter(regionID uint64) { + t.Lock() + defer t.Unlock() + t.demoteVoter[regionID]++ +} + func (t *taskStatistics) incRemovePeer(regionID uint64) { t.Lock() defer t.Unlock() @@ -97,16 +107,16 @@ func (t *taskStatistics) incMergeRegion() { t.mergeRegion++ } -func (t *taskStatistics) incTransferLeader(fromPeerID, toPeerID uint64) { +func (t *taskStatistics) incTransferLeader(fromPeerStoreID, toPeerStoreID uint64) { t.Lock() defer t.Unlock() - _, ok := t.transferLeader[fromPeerID] + _, ok := t.transferLeader[fromPeerStoreID] if ok { - t.transferLeader[fromPeerID][toPeerID]++ + t.transferLeader[fromPeerStoreID][toPeerStoreID]++ } else { m := make(map[uint64]int) - m[toPeerID]++ - t.transferLeader[fromPeerID] = m + m[toPeerStoreID]++ + t.transferLeader[fromPeerStoreID] = m } } diff --git a/tools/pd-simulator/simulator/task.go b/tools/pd-simulator/simulator/task.go index 083d8b6774c..3ad9c0af600 100644 --- a/tools/pd-simulator/simulator/task.go +++ b/tools/pd-simulator/simulator/task.go @@ -17,6 +17,7 @@ package 
simulator import ( "bytes" "fmt" + "strings" "time" "github.com/docker/go-units" @@ -25,19 +26,22 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" "github.com/tikv/pd/tools/pd-analysis/analysis" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" + "go.uber.org/zap" ) -var ( - chunkSize = int64(4 * units.KiB) - maxSnapGeneratorPoolSize = uint32(2) - maxSnapReceivePoolSize = uint32(4) - compressionRatio = int64(2) +const ( + removeSpeed = 100 * units.MiB + chunkSize = 4 * units.KiB + maxSnapGeneratorPoolSize = 2 + maxSnapReceivePoolSize = 4 + compressionRatio = 2 ) type snapAction int const ( - generate = iota + generate snapAction = iota receive ) @@ -49,115 +53,180 @@ const ( finished ) -// Task running in node. -type Task interface { - Desc() string - RegionID() uint64 - Step(r *RaftEngine) - IsFinished() bool -} +func responseToTask(engine *RaftEngine, resp *pdpb.RegionHeartbeatResponse) *Task { + var ( + regionID = resp.GetRegionId() + region = engine.GetRegion(regionID) + op operator + desc string + ) -func responseToTask(resp *pdpb.RegionHeartbeatResponse, r *RaftEngine) Task { - regionID := resp.GetRegionId() - region := r.GetRegion(regionID) - epoch := resp.GetRegionEpoch() - - // change peer - if resp.GetChangePeer() != nil { - changePeer := resp.GetChangePeer() - switch changePeer.GetChangeType() { - case eraftpb.ConfChangeType_AddNode: - return &addPeer{ - regionID: regionID, - epoch: epoch, - peer: changePeer.GetPeer(), - } - case eraftpb.ConfChangeType_RemoveNode: - return &removePeer{ - regionID: regionID, - size: region.GetApproximateSize(), - keys: region.GetApproximateKeys(), - speed: 100 * 1000 * 1000, - epoch: epoch, - peer: changePeer.GetPeer(), - } - case eraftpb.ConfChangeType_AddLearnerNode: - return &addLearner{ - regionID: regionID, - size: region.GetApproximateSize(), - keys: region.GetApproximateKeys(), - epoch: epoch, - peer: changePeer.GetPeer(), - // This two variables are used to simulate sending and receiving snapshot processes. - sendingStat: newSnapshotState(region.GetApproximateSize(), generate), - receivingStat: newSnapshotState(region.GetApproximateSize(), receive), + switch { + case resp.GetChangePeer() != nil: + op, desc = changePeerToOperator(region, resp.GetChangePeer()) + case resp.GetChangePeerV2() != nil: + cps := resp.GetChangePeerV2().GetChanges() + if len(cps) == 0 { + // leave joint state + desc = fmt.Sprintf("leave joint state for region %d", regionID) + op = &changePeerV2Leave{} + } else if len(cps) == 1 { + // original ChangePeer + op, desc = changePeerToOperator(region, cps[0]) + } else { + // enter joint state, it can only contain PromoteLearner and DemoteVoter. 
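// Illustrative example (not part of this patch): a joint-state request pairs
// both change kinds in one ChangePeerV2, e.g. (pseudo-protobuf, ids made up)
//
//	changes: [
//	  { change_type: AddNode,        peer: { id: 4, store_id: 3 } }, // existing learner 4 -> promote
//	  { change_type: AddLearnerNode, peer: { id: 2, store_id: 1 } }, // existing voter 2   -> demote
//	]
//
// changePeerToOperator below maps each entry to promoteLearner or demoteVoter
// depending on whether the peer already exists in the region.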
+ subDesc := make([]string, 0, len(cps)) + cp2 := &changePeerV2Enter{} + for _, cp := range cps { + peer := cp.GetPeer() + subOp, _ := changePeerToOperator(region, cp) + switch subOp.(type) { + case *promoteLearner: + subDesc = append(subDesc, fmt.Sprintf("promote peer %+v", peer)) + cp2.promoteLearners = append(cp2.promoteLearners, peer) + case *demoteVoter: + subDesc = append(subDesc, fmt.Sprintf("demote peer %+v", peer)) + cp2.demoteVoters = append(cp2.demoteVoters, peer) + default: + simutil.Logger.Error("cannot exec AddPeer or RemovePeer when using joint state") + return nil + } } + desc = fmt.Sprintf("%s for region %d", strings.Join(subDesc, ", "), regionID) + op = cp2 } - } else if resp.GetTransferLeader() != nil { - changePeer := resp.GetTransferLeader().GetPeer() - fromPeer := region.GetLeader() - return &transferLeader{ - regionID: regionID, - epoch: epoch, - fromPeer: fromPeer, - peer: changePeer, + case resp.GetTransferLeader() != nil: + fromPeerStoreID := region.GetLeader().GetStoreId() + // When this field is included, it means that TiKV needs to decide the optimal Leader by itself. + toPeers := resp.GetTransferLeader().GetPeers() + // When no Peers are included, use Peer to build Peers of length 1. + if len(toPeers) == 0 { + toPeers = []*metapb.Peer{resp.GetTransferLeader().GetPeer()} } - } else if resp.GetMerge() != nil { + desc = fmt.Sprintf("transfer leader from store %d to store %d", fromPeerStoreID, toPeers[0].GetStoreId()) + op = &transferLeader{ + fromPeerStoreID: fromPeerStoreID, + toPeers: toPeers, + } + case resp.GetMerge() != nil: targetRegion := resp.GetMerge().GetTarget() - return &mergeRegion{ - regionID: regionID, - epoch: epoch, - targetRegion: targetRegion, + desc = fmt.Sprintf("merge region %d into %d", regionID, targetRegion.GetId()) + op = &mergeRegion{targetRegion: targetRegion} + case resp.GetSplitRegion() != nil: + // TODO: support split region + simutil.Logger.Error("split region scheduling is currently not supported") + return nil + default: + return nil + } + + if op == nil { + return nil + } + + return &Task{ + operator: op, + desc: desc, + regionID: regionID, + epoch: resp.GetRegionEpoch(), + isFinished: false, + } +} + +func changePeerToOperator(region *core.RegionInfo, cp *pdpb.ChangePeer) (operator, string) { + regionID := region.GetID() + peer := cp.GetPeer() + switch cp.GetChangeType() { + case eraftpb.ConfChangeType_AddNode: + if region.GetPeer(peer.GetId()) != nil { + return &promoteLearner{peer: peer}, fmt.Sprintf("promote learner %+v for region %d", peer, regionID) + } + return &addPeer{ + peer: peer, + size: region.GetApproximateSize(), + keys: region.GetApproximateKeys(), + sendingStat: newSnapshotState(region.GetApproximateSize(), generate), + receivingStat: newSnapshotState(region.GetApproximateSize(), receive), + }, fmt.Sprintf("add voter %+v for region %d", peer, regionID) + case eraftpb.ConfChangeType_AddLearnerNode: + if region.GetPeer(peer.GetId()) != nil { + return &demoteVoter{peer: peer}, fmt.Sprintf("demote voter %+v for region %d", peer, regionID) } + return &addPeer{ + peer: peer, + size: region.GetApproximateSize(), + keys: region.GetApproximateKeys(), + sendingStat: newSnapshotState(region.GetApproximateSize(), generate), + receivingStat: newSnapshotState(region.GetApproximateSize(), receive), + }, fmt.Sprintf("add learner %+v for region %d", peer, regionID) + case eraftpb.ConfChangeType_RemoveNode: + return &removePeer{ + peer: peer, + size: region.GetApproximateSize(), + speed: removeSpeed, + }, fmt.Sprintf("remove peer 
%+v for region %d", peer, regionID) + default: + return nil, "" } - return nil } -type snapshotStat struct { - action snapAction - remainSize int64 - status snapStatus - start time.Time +// Simulate the execution of the Operator. +type operator interface { + // Returns new region if the execution is finished, otherwise returns nil. + tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) } -func newSnapshotState(size int64, action snapAction) *snapshotStat { - if action == receive { - size /= compressionRatio - } - return &snapshotStat{ - remainSize: size, - action: action, - status: pending, - start: time.Now(), - } +// Task running in node. +type Task struct { + operator + desc string + regionID uint64 + epoch *metapb.RegionEpoch + isFinished bool } -type mergeRegion struct { - regionID uint64 - epoch *metapb.RegionEpoch - targetRegion *metapb.Region - finished bool +// Desc returns the description of the Task. +func (t *Task) Desc() string { + return t.desc } -func (m *mergeRegion) Desc() string { - return fmt.Sprintf("merge region %d into %d", m.regionID, m.targetRegion.GetId()) +// RegionID returns the region-id of the Task. +func (t *Task) RegionID() uint64 { + return t.regionID } -func (m *mergeRegion) Step(r *RaftEngine) { - if m.finished { - return +// Step execute once on the Task. +func (t *Task) Step(engine *RaftEngine) (isFinished bool) { + if t.isFinished { + return true } - region := r.GetRegion(m.regionID) - // If region equals to nil, it means that the region has already been merged. - if region == nil || region.GetRegionEpoch().GetConfVer() > m.epoch.ConfVer || region.GetRegionEpoch().GetVersion() > m.epoch.Version { - m.finished = true + region := engine.GetRegion(t.regionID) + if region == nil || region.GetRegionEpoch().GetConfVer() > t.epoch.ConfVer || region.GetRegionEpoch().GetVersion() > t.epoch.Version { + t.isFinished = true return } - targetRegion := r.GetRegion(m.targetRegion.Id) + var newRegion *core.RegionInfo + newRegion, t.isFinished = t.tick(engine, region) + + if newRegion != nil { + engine.SetRegion(newRegion) + engine.recordRegionChange(newRegion) + } + + return t.isFinished +} + +type mergeRegion struct { + targetRegion *metapb.Region +} + +func (m *mergeRegion) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + targetRegion := engine.GetRegion(m.targetRegion.Id) + var startKey, endKey []byte - if bytes.Equal(m.targetRegion.EndKey, region.GetStartKey()) { + if bytes.Equal(m.targetRegion.GetEndKey(), region.GetStartKey()) { startKey = targetRegion.GetStartKey() endKey = region.GetEndKey() } else { @@ -165,16 +234,16 @@ func (m *mergeRegion) Step(r *RaftEngine) { endKey = targetRegion.GetEndKey() } - epoch := targetRegion.GetRegionEpoch() - if m.epoch.ConfVer > m.targetRegion.RegionEpoch.ConfVer { - epoch.ConfVer = m.epoch.ConfVer + epoch := targetRegion.Clone().GetRegionEpoch() + if region.GetRegionEpoch().GetConfVer() > epoch.GetConfVer() { + epoch.ConfVer = region.GetRegionEpoch().GetConfVer() } - - if m.epoch.Version > m.targetRegion.RegionEpoch.Version { - epoch.Version = m.epoch.Version + if region.GetRegionEpoch().GetVersion() > epoch.GetVersion() { + epoch.Version = region.GetRegionEpoch().GetVersion() } epoch.Version++ - mergeRegion := targetRegion.Clone( + + newRegion = targetRegion.Clone( core.WithStartKey(startKey), core.WithEndKey(endKey), core.SetRegionConfVer(epoch.ConfVer), @@ -182,247 +251,260 @@ func (m *mergeRegion) Step(r *RaftEngine) { 
core.SetApproximateSize(targetRegion.GetApproximateSize()+region.GetApproximateSize()), core.SetApproximateKeys(targetRegion.GetApproximateKeys()+region.GetApproximateKeys()), ) - r.SetRegion(mergeRegion) - r.recordRegionChange(mergeRegion) - r.schedulerStats.taskStats.incMergeRegion() - m.finished = true -} - -func (m *mergeRegion) RegionID() uint64 { - return m.regionID -} - -func (m *mergeRegion) IsFinished() bool { - return m.finished + engine.schedulerStats.taskStats.incMergeRegion() + return newRegion, true } type transferLeader struct { - regionID uint64 - epoch *metapb.RegionEpoch - fromPeer *metapb.Peer - peer *metapb.Peer - finished bool -} - -func (t *transferLeader) Desc() string { - return fmt.Sprintf("transfer leader from store %d to store %d", t.fromPeer.GetStoreId(), t.peer.GetStoreId()) + fromPeerStoreID uint64 + toPeers []*metapb.Peer } -func (t *transferLeader) Step(r *RaftEngine) { - if t.finished { +func (t *transferLeader) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + isFinished = true + toPeer := t.toPeers[0] // TODO: Support selection logic + if peer := region.GetPeer(toPeer.GetId()); peer == nil || peer.GetRole() != toPeer.GetRole() || core.IsLearner(peer) { return } - region := r.GetRegion(t.regionID) - if region.GetRegionEpoch().GetVersion() > t.epoch.Version || region.GetRegionEpoch().GetConfVer() > t.epoch.ConfVer { - t.finished = true - return - } - var newRegion *core.RegionInfo - if region.GetPeer(t.peer.GetId()) != nil { - newRegion = region.Clone(core.WithLeader(t.peer)) - } else { - // This branch will be executed - t.finished = true - return + if toPeer.GetRole() == metapb.PeerRole_DemotingVoter { + simutil.Logger.Error("set demoting-voter as leader", + zap.Uint64("region-id", region.GetID()), + zap.String("peer", toPeer.String())) } - t.finished = true - r.SetRegion(newRegion) - r.recordRegionChange(newRegion) - fromPeerID := t.fromPeer.GetId() - toPeerID := t.peer.GetId() - r.schedulerStats.taskStats.incTransferLeader(fromPeerID, toPeerID) -} -func (t *transferLeader) RegionID() uint64 { - return t.regionID + newRegion = region.Clone(core.WithLeader(toPeer)) + engine.schedulerStats.taskStats.incTransferLeader(t.fromPeerStoreID, toPeer.GetStoreId()) + return } -func (t *transferLeader) IsFinished() bool { - return t.finished -} - -type addPeer struct { - regionID uint64 - epoch *metapb.RegionEpoch - peer *metapb.Peer - finished bool -} - -func (a *addPeer) Desc() string { - return fmt.Sprintf("add peer %+v for region %d", a.peer, a.regionID) -} - -func (a *addPeer) Step(r *RaftEngine) { - if a.finished { - return +func checkAndCreateChangePeerOption(engine *RaftEngine, region *core.RegionInfo, + peer *metapb.Peer, from, to metapb.PeerRole) []core.RegionCreateOption { + // `from` and `to` need to satisfy the combination in switch. 
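// Illustrative summary (not part of this patch) of the role transitions the
// switch below supports:
//
//	Learner       -> Voter          direct promote
//	Voter         -> Learner        direct demote
//	Learner       -> IncomingVoter  enter joint state
//	Voter         -> DemotingVoter  enter joint state
//	IncomingVoter -> Voter          leave joint state
//	DemotingVoter -> Learner        leave joint state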
+ + // check `from` Role + if peer.GetRole() != from { + simutil.Logger.Error( + "unexpected role", + zap.String("role", peer.GetRole().String()), + zap.String("expected", from.String())) + return nil } - region := r.GetRegion(a.regionID) - if region.GetRegionEpoch().GetVersion() > a.epoch.Version || region.GetRegionEpoch().GetConfVer() > a.epoch.ConfVer { - a.finished = true - return + // Leader cannot be demoted + if (to == metapb.PeerRole_DemotingVoter || to == metapb.PeerRole_Learner) && region.GetLeader().GetId() == peer.GetId() { + simutil.Logger.Error("demote leader", zap.String("region", region.GetMeta().String())) + return nil } - - var opts []core.RegionCreateOption - if region.GetPeer(a.peer.GetId()) == nil { - opts = append(opts, core.WithAddPeer(a.peer)) - r.schedulerStats.taskStats.incAddPeer(region.GetID()) - } else { - opts = append(opts, core.WithPromoteLearner(a.peer.GetId())) - r.schedulerStats.taskStats.incPromoteLeaner(region.GetID()) - } - opts = append(opts, core.WithIncConfVer()) - newRegion := region.Clone(opts...) - r.SetRegion(newRegion) - r.recordRegionChange(newRegion) - a.finished = true + // create option + switch to { + case metapb.PeerRole_Voter: // Learner/IncomingVoter -> Voter + engine.schedulerStats.taskStats.incPromoteLearner(region.GetID()) + case metapb.PeerRole_Learner: // Voter/DemotingVoter -> Learner + engine.schedulerStats.taskStats.incDemoteVoter(region.GetID()) + case metapb.PeerRole_IncomingVoter: // Learner -> IncomingVoter, only in joint state + case metapb.PeerRole_DemotingVoter: // Voter -> DemotingVoter, only in joint state + default: + return nil + } + return []core.RegionCreateOption{core.WithRole(peer.GetId(), to), core.WithIncConfVer()} } -func (a *addPeer) RegionID() uint64 { - return a.regionID +type promoteLearner struct { + peer *metapb.Peer } -func (a *addPeer) IsFinished() bool { - return a.finished +func (pl *promoteLearner) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + isFinished = true + peer := region.GetPeer(pl.peer.GetId()) + opts := checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_Learner, metapb.PeerRole_Voter) + if len(opts) > 0 { + newRegion = region.Clone(opts...) + } + return } -type removePeer struct { - regionID uint64 - size int64 - keys int64 - speed int64 - epoch *metapb.RegionEpoch - peer *metapb.Peer - finished bool +type demoteVoter struct { + peer *metapb.Peer } -func (a *removePeer) Desc() string { - return fmt.Sprintf("remove peer %+v for region %d", a.peer, a.regionID) +func (dv *demoteVoter) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + isFinished = true + peer := region.GetPeer(dv.peer.GetId()) + opts := checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_Voter, metapb.PeerRole_Learner) + if len(opts) > 0 { + newRegion = region.Clone(opts...) 
+ } + return } -func (a *removePeer) Step(r *RaftEngine) { - if a.finished { - return - } - region := r.GetRegion(a.regionID) - if region.GetRegionEpoch().GetVersion() > a.epoch.Version || region.GetRegionEpoch().GetConfVer() > a.epoch.ConfVer { - a.finished = true - return - } +type changePeerV2Enter struct { + promoteLearners []*metapb.Peer + demoteVoters []*metapb.Peer +} - regionSize := uint64(region.GetApproximateSize()) - a.size -= a.speed - if a.size < 0 { - for _, peer := range region.GetPeers() { - if peer.GetId() == a.peer.GetId() { - storeID := peer.GetStoreId() - var downPeers []*pdpb.PeerStats - if r.conn.Nodes[storeID] == nil { - for _, downPeer := range region.GetDownPeers() { - if downPeer.Peer.StoreId != storeID { - downPeers = append(downPeers, downPeer) - } - } - } - newRegion := region.Clone( - core.WithRemoveStorePeer(storeID), - core.WithIncConfVer(), - core.WithDownPeers(downPeers), - ) - r.SetRegion(newRegion) - r.recordRegionChange(newRegion) - r.schedulerStats.taskStats.incRemovePeer(region.GetID()) - if r.conn.Nodes[storeID] == nil { - a.finished = true - return - } - r.conn.Nodes[storeID].decUsedSize(regionSize) - break - } +func (ce *changePeerV2Enter) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + isFinished = true + var opts []core.RegionCreateOption + for _, pl := range ce.promoteLearners { + peer := region.GetPeer(pl.GetId()) + subOpts := checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_Learner, metapb.PeerRole_IncomingVoter) + if len(subOpts) == 0 { + return } - a.finished = true - if analysis.GetTransferCounter().IsValid { - analysis.GetTransferCounter().AddSource(a.regionID, a.peer.StoreId) + opts = append(opts, subOpts...) + } + for _, dv := range ce.demoteVoters { + peer := region.GetPeer(dv.GetId()) + subOpts := checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_Voter, metapb.PeerRole_DemotingVoter) + if len(subOpts) == 0 { + return } + opts = append(opts, subOpts...) } + newRegion = region.Clone(opts...) + return } -func (a *removePeer) RegionID() uint64 { - return a.regionID -} +type changePeerV2Leave struct{} -func (a *removePeer) IsFinished() bool { - return a.finished +func (cl *changePeerV2Leave) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + isFinished = true + var opts []core.RegionCreateOption + for _, peer := range region.GetPeers() { + switch peer.GetRole() { + case metapb.PeerRole_IncomingVoter: + opts = append(opts, checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_IncomingVoter, metapb.PeerRole_Voter)...) + case metapb.PeerRole_DemotingVoter: + opts = append(opts, checkAndCreateChangePeerOption(engine, region, peer, metapb.PeerRole_IncomingVoter, metapb.PeerRole_Voter)...) + } + } + if len(opts) < 4 { + simutil.Logger.Error("fewer than two peers should not need to leave the joint state") + return + } + newRegion = region.Clone(opts...) 
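	// Illustrative note (not part of this patch): each peer leaving the joint
	// state contributes two options above (core.WithRole plus core.WithIncConfVer),
	// so the len(opts) < 4 guard is equivalent to requiring at least two peers in
	// the IncomingVoter/DemotingVoter roles.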
+ return } -type addLearner struct { - regionID uint64 +type addPeer struct { + peer *metapb.Peer size int64 keys int64 - epoch *metapb.RegionEpoch - peer *metapb.Peer - finished bool sendingStat *snapshotStat receivingStat *snapshotStat } -func (a *addLearner) Desc() string { - return fmt.Sprintf("add learner %+v for region %d", a.peer, a.regionID) -} - -func (a *addLearner) Step(r *RaftEngine) { - if a.finished { - return +func (a *addPeer) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + // Check + sendNode := engine.conn.Nodes[region.GetLeader().GetStoreId()] + if sendNode == nil { + return nil, true } - region := r.GetRegion(a.regionID) - if region.GetRegionEpoch().GetVersion() > a.epoch.Version || region.GetRegionEpoch().GetConfVer() > a.epoch.ConfVer { - a.finished = true - return + recvNode := engine.conn.Nodes[a.peer.GetStoreId()] + if recvNode == nil { + return nil, true } - - snapshotSize := region.GetApproximateSize() - sendNode := r.conn.Nodes[region.GetLeader().GetStoreId()] - if sendNode == nil { - a.finished = true - return + // Step 1: Generate Pending Peers + if region.GetPeer(a.peer.GetId()) == nil { + switch a.peer.GetRole() { + case metapb.PeerRole_Voter: + engine.schedulerStats.taskStats.incAddVoter(region.GetID()) + case metapb.PeerRole_Learner: + engine.schedulerStats.taskStats.incAddLearner(region.GetID()) + } + pendingPeers := append(region.GetPendingPeers(), a.peer) + return region.Clone(core.WithAddPeer(a.peer), core.WithIncConfVer(), core.WithPendingPeers(pendingPeers)), false } + // Step 2: Process Snapshot if !processSnapshot(sendNode, a.sendingStat) { - return + return nil, false } - r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id) + engine.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id) + if !processSnapshot(recvNode, a.receivingStat) { + return nil, false + } + engine.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id) + recvNode.incUsedSize(uint64(region.GetApproximateSize())) + // Step 3: Remove the Pending state + newRegion = region.Clone(removePendingPeer(region, a.peer)) + isFinished = true - recvNode := r.conn.Nodes[a.peer.GetStoreId()] - if recvNode == nil { - a.finished = true - return + // analysis + if analysis.GetTransferCounter().IsValid { + analysis.GetTransferCounter().AddTarget(region.GetID(), a.peer.GetStoreId()) } - if !processSnapshot(recvNode, a.receivingStat) { - return + + return +} + +type removePeer struct { + peer *metapb.Peer + size int64 + speed int64 +} + +func (r *removePeer) tick(engine *RaftEngine, region *core.RegionInfo) (newRegion *core.RegionInfo, isFinished bool) { + // Step 1: Delete data + r.size -= r.speed + if r.size > 0 { + return nil, false + } + // Step 2: Remove Peer + engine.schedulerStats.taskStats.incRemovePeer(region.GetID()) + newRegion = region.Clone( + core.WithIncConfVer(), + core.WithRemoveStorePeer(r.peer.GetStoreId()), + removePendingPeer(region, r.peer), + removeDownPeers(region, r.peer)) + isFinished = true + + if store := engine.conn.Nodes[r.peer.GetStoreId()]; store != nil { + store.decUsedSize(uint64(region.GetApproximateSize())) + // analysis + if analysis.GetTransferCounter().IsValid { + analysis.GetTransferCounter().AddSource(region.GetID(), r.peer.GetStoreId()) + } } - r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id) - if region.GetPeer(a.peer.GetId()) == nil { - newRegion := region.Clone( - core.WithAddPeer(a.peer), - core.WithIncConfVer(), - ) - r.SetRegion(newRegion) - 
r.recordRegionChange(newRegion) - r.schedulerStats.taskStats.incAddLeaner(region.GetID()) - recvNode.incUsedSize(uint64(snapshotSize)) - a.finished = true + return +} + +func removePendingPeer(region *core.RegionInfo, removePeer *metapb.Peer) core.RegionCreateOption { + pendingPeers := make([]*metapb.Peer, 0, len(region.GetPendingPeers())) + for _, peer := range region.GetPendingPeers() { + if peer.GetId() != removePeer.GetId() { + pendingPeers = append(pendingPeers, peer) + } } + return core.WithPendingPeers(pendingPeers) +} - if analysis.GetTransferCounter().IsValid { - analysis.GetTransferCounter().AddTarget(a.regionID, a.peer.StoreId) +func removeDownPeers(region *core.RegionInfo, removePeer *metapb.Peer) core.RegionCreateOption { + downPeers := make([]*pdpb.PeerStats, 0, len(region.GetDownPeers())) + for _, peer := range region.GetDownPeers() { + if peer.GetPeer().GetId() != removePeer.GetId() { + downPeers = append(downPeers, peer) + } } + return core.WithDownPeers(downPeers) } -func (a *addLearner) RegionID() uint64 { - return a.regionID +type snapshotStat struct { + action snapAction + remainSize int64 + status snapStatus + start time.Time } -func (a *addLearner) IsFinished() bool { - return a.finished +func newSnapshotState(size int64, action snapAction) *snapshotStat { + if action == receive { + size /= compressionRatio + } + return &snapshotStat{ + remainSize: size, + action: action, + status: pending, + start: time.Now(), + } } func processSnapshot(n *Node, stat *snapshotStat) bool {
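	// Illustrative note (not part of this patch): newSnapshotState above halves
	// remainSize on the receiving side (size /= compressionRatio, with
	// compressionRatio == 2), so for a 100 MiB region the generating store tracks
	// roughly 100 MiB of remaining work while the receiving store tracks roughly
	// 50 MiB.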