schedule: improve the leader distribution after region scatter (#2659) #2684

Merged · 1 commit · Aug 3, 2020
26 changes: 13 additions & 13 deletions pkg/mock/mockcluster/mockcluster.go
@@ -264,16 +264,16 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
 }
 
 // AddLeaderRegion adds region with specified leader and followers.
-func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo {
-    origin := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
+    origin := mc.newMockRegionInfo(regionID, leaderStoreID, followerStoreIDs...)
     region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10))
     mc.PutRegion(region)
     return region
 }
 
 // AddRegionWithLearner adds region with specified leader, followers and learners.
-func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderID uint64, followerIDs, learnerIDs []uint64) *core.RegionInfo {
-    origin := mc.MockRegionInfo(regionID, leaderID, followerIDs, learnerIDs, nil)
+func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderStoreID uint64, followerStoreIDs, learnerStoreIDs []uint64) *core.RegionInfo {
+    origin := mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, learnerStoreIDs, nil)
     region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10))
     mc.PutRegion(region)
     return region
@@ -515,8 +515,8 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
     mc.PutStore(newStore)
 }
 
-func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIDs ...uint64) *core.RegionInfo {
-    return mc.MockRegionInfo(regionID, leaderID, followerIDs, []uint64{}, nil)
+func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
+    return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
 }
 
 // GetOpt mocks method.
@@ -595,23 +595,23 @@ func (mc *Cluster) RemoveScheduler(name string) error {
 }
 
 // MockRegionInfo returns a mock region
-func (mc *Cluster) MockRegionInfo(regionID uint64, leaderID uint64,
-    followerIDs, learnerIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
+func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
+    followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
 
     region := &metapb.Region{
         Id:          regionID,
         StartKey:    []byte(fmt.Sprintf("%20d", regionID)),
         EndKey:      []byte(fmt.Sprintf("%20d", regionID+1)),
         RegionEpoch: epoch,
     }
-    leader, _ := mc.AllocPeer(leaderID)
+    leader, _ := mc.AllocPeer(leaderStoreID)
     region.Peers = []*metapb.Peer{leader}
-    for _, id := range followerIDs {
-        peer, _ := mc.AllocPeer(id)
+    for _, storeID := range followerStoreIDs {
+        peer, _ := mc.AllocPeer(storeID)
         region.Peers = append(region.Peers, peer)
     }
-    for _, id := range learnerIDs {
-        peer, _ := mc.AllocPeer(id)
+    for _, storeID := range learnerStoreIDs {
+        peer, _ := mc.AllocPeer(storeID)
         peer.IsLearner = true
         region.Peers = append(region.Peers, peer)
     }
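The rename only clarifies that these arguments are store IDs; call sites are unchanged. As an illustration, this is how the tests below build regions with these helpers (assuming a mock cluster `tc` as constructed in the test file; the store numbers in the learner example are made up for illustration):

    // Region 1 with its leader peer on store 1 and follower peers on stores 2 and 3.
    tc.AddLeaderRegion(1, 1, 2, 3)

    // Region 2 with a leader on store 1, followers on stores 2-3, and learner
    // peers on stores 4-6 (store numbers chosen only for this example).
    tc.AddRegionWithLearner(2, 1, []uint64{2, 3}, []uint64{4, 5, 6})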
5 changes: 4 additions & 1 deletion server/schedule/operator/create_operator.go
@@ -147,7 +147,7 @@ func isRegionMatch(a, b *core.RegionInfo) bool {
 }
 
 // CreateScatterRegionOperator creates an operator that scatters the specified region.
-func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer) (*Operator, error) {
+func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) {
     // randomly pick a leader.
     var ids []uint64
     for id, peer := range targetPeers {
@@ -159,6 +159,9 @@ func CreateScatterRegionOperator(desc string, cluster Cluster, origin *core.Regi
     if len(ids) > 0 {
         leader = ids[rand.Intn(len(ids))]
     }
+    if targetLeader != 0 {
+        leader = targetLeader
+    }
     return NewBuilder(desc, cluster, origin).
         SetPeers(targetPeers).
         SetLeader(leader).
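The extra trailing parameter lets the caller pin the leader store, while passing 0 keeps the previous behaviour of picking a random peer. A sketch of a call site (the `cluster`, `region`, and `targetPeers` values and the store number 4 are placeholders, not taken from this PR):

    // Place the region's peers on the stores in targetPeers and hint store 4
    // as the leader; a targetLeader of 0 would keep the random choice above.
    op, err := operator.CreateScatterRegionOperator("scatter-region", cluster, region, targetPeers, 4)
    if err != nil {
        return nil, err
    }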
58 changes: 53 additions & 5 deletions server/schedule/region_scatterer.go
@@ -14,6 +14,7 @@
 package schedule
 
 import (
+    "math"
     "math/rand"
     "sync"
 
@@ -29,6 +30,29 @@ import (
 
 const regionScatterName = "region-scatter"
 
+type selectedLeaderStores struct {
+    mu     sync.Mutex
+    stores map[uint64]uint64 // storeID -> hintCount
+}
+
+func (s *selectedLeaderStores) put(id uint64) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    s.stores[id] = s.stores[id] + 1
+}
+
+func (s *selectedLeaderStores) get(id uint64) uint64 {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    return s.stores[id]
+}
+
+func newSelectedLeaderStores() *selectedLeaderStores {
+    return &selectedLeaderStores{
+        stores: make(map[uint64]uint64),
+    }
+}
+
 type selectedStores struct {
     mu     sync.Mutex
     stores map[uint64]struct{}
@@ -86,15 +110,17 @@ func NewRegionScatterer(cluster opt.Cluster) *RegionScatterer {
 }
 
 type engineContext struct {
-    filters  []filter.Filter
-    selected *selectedStores
+    filters        []filter.Filter
+    selected       *selectedStores
+    selectedLeader *selectedLeaderStores
 }
 
 func newEngineContext(filters ...filter.Filter) engineContext {
     filters = append(filters, filter.StoreStateFilter{ActionScope: regionScatterName})
     return engineContext{
-        filters:  filters,
-        selected: newSelectedStores(),
+        filters:        filters,
+        selected:       newSelectedStores(),
+        selectedLeader: newSelectedLeaderStores(),
     }
 }
 
@@ -153,6 +179,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Opera
     }
 
     scatterWithSameEngine(ordinaryPeers, r.ordinaryEngine)
+    // FIXME: the target leader only considers the ordinary stores; maybe we need to consider the
+    // special engine stores if the engine supports becoming a leader. But now there is only
+    // one special engine, TiFlash, which does not support the leader role, so don't consider it for now.
+    targetLeader := r.searchLeastleaderStore(targetPeers, r.ordinaryEngine)
+
     for engine, peers := range specialPeers {
         context, ok := r.specialEngines[engine]
         if !ok {
@@ -162,7 +193,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Opera
         scatterWithSameEngine(peers, context)
     }
 
-    op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers)
+    op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader)
     if err != nil {
         log.Debug("fail to create scatter region operator", zap.Error(err))
         return nil
@@ -209,6 +240,7 @@ func (r *RegionScatterer) collectAvailableStores(region *core.RegionInfo, contex
     filters := []filter.Filter{
         context.selected.newFilter(r.name),
         filter.NewExcludedFilter(r.name, nil, region.GetStoreIds()),
+        filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
     }
     filters = append(filters, context.filters...)
 
@@ -221,3 +253,19 @@
     }
     return targets
 }
+
+func (r *RegionScatterer) searchLeastleaderStore(peers map[uint64]*metapb.Peer, context engineContext) uint64 {
+    m := uint64(math.MaxUint64)
+    id := uint64(0)
+    for storeID := range peers {
+        count := context.selectedLeader.get(storeID)
+        if m > count {
+            m = count
+            id = storeID
+        }
+    }
+    if id != 0 {
+        context.selectedLeader.put(id)
+    }
+    return id
+}
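The scatterer keeps a per-store count of how often it has handed out the leader role and always hints the store with the fewest picks. A minimal, self-contained sketch of that idea follows; the names are illustrative only, and the mutex and peer-map plumbing of the real code above are dropped for brevity:

    package main

    import "fmt"

    // leaderHintCounter mimics the bookkeeping of selectedLeaderStores above:
    // it remembers how many times each store has been picked as leader.
    type leaderHintCounter map[uint64]uint64

    // pickLeastUsed returns the candidate store with the fewest leader hints so
    // far and records the pick, so later calls spread leaders across stores.
    func (c leaderHintCounter) pickLeastUsed(candidates []uint64) uint64 {
        best := uint64(0)
        bestCount := ^uint64(0) // max uint64, like math.MaxUint64 above
        for _, storeID := range candidates {
            if c[storeID] < bestCount {
                best = storeID
                bestCount = c[storeID]
            }
        }
        if best != 0 {
            c[best]++
        }
        return best
    }

    func main() {
        counter := leaderHintCounter{}
        candidates := []uint64{1, 2, 3}
        for i := 0; i < 6; i++ {
            fmt.Println("leader hint ->", counter.pickLeastUsed(candidates))
        }
        // Picks stores 1, 2, 3, 1, 2, 3 in turn: each store ends up with two leaders.
    }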
54 changes: 38 additions & 16 deletions server/schedule/region_scatterer_test.go
@@ -2,6 +2,7 @@ package schedule
 
 import (
     "context"
+
     . "github.com/pingcap/check"
     "github.com/pingcap/pd/v4/pkg/mock/mockcluster"
     "github.com/pingcap/pd/v4/pkg/mock/mockhbstream"
@@ -41,21 +42,21 @@ var _ = Suite(&testScatterRegionSuite{})
 type testScatterRegionSuite struct{}
 
 func (s *testScatterRegionSuite) TestSixStores(c *C) {
-    s.scatter(c, 6, 4, false)
-    s.scatter(c, 6, 4, true)
+    s.scatter(c, 6, 100, false)
+    s.scatter(c, 6, 100, true)
 }
 
 func (s *testScatterRegionSuite) TestFiveStores(c *C) {
-    s.scatter(c, 5, 5, false)
-    s.scatter(c, 5, 5, true)
+    s.scatter(c, 5, 100, false)
+    s.scatter(c, 5, 100, true)
 }
 
 func (s *testScatterRegionSuite) TestSixSpecialStores(c *C) {
-    s.scatterSpecial(c, 3, 6, 4)
+    s.scatterSpecial(c, 3, 6, 100)
 }
 
 func (s *testScatterRegionSuite) TestFiveSpecialStores(c *C) {
-    s.scatterSpecial(c, 5, 5, 5)
+    s.scatterSpecial(c, 5, 5, 100)
 }
 
 func (s *testScatterRegionSuite) checkOperator(op *operator.Operator, c *C) {
@@ -81,11 +82,11 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64, use
     }
     tc.EnablePlacementRules = useRules
 
-    seq := newSequencer(numStores)
     // Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace.
     tc.AddLeaderRegion(1, 1, 2, 3)
     for i := uint64(2); i <= numRegions; i++ {
-        tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
+        // region distributed in same stores.
+        tc.AddLeaderRegion(i, 1, 2, 3)
     }
 
     scatterer := NewRegionScatterer(tc)
@@ -99,16 +100,29 @@
     }
 
     countPeers := make(map[uint64]uint64)
+    countLeader := make(map[uint64]uint64)
     for i := uint64(1); i <= numRegions; i++ {
         region := tc.GetRegion(i)
         for _, peer := range region.GetPeers() {
             countPeers[peer.GetStoreId()]++
+            if peer.GetId() == region.GetLeader().GetId() {
+                countLeader[peer.GetStoreId()]++
+            }
         }
     }
 
     // Each store should have the same number of peers.
     for _, count := range countPeers {
-        c.Assert(count, Equals, numRegions*3/numStores)
+        c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numStores))
+        c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numStores))
     }
+
+    // Each store should have the same number of leaders.
+    c.Assert(len(countPeers), Equals, int(numStores))
+    c.Assert(len(countLeader), Equals, int(numStores))
+    for _, count := range countLeader {
+        c.Assert(float64(count), LessEqual, 1.1*float64(numRegions)/float64(numStores))
+        c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions)/float64(numStores))
+    }
 }
 
@@ -129,16 +143,14 @@ func (s *testScatterRegionSuite) scatterSpecial(c *C, numOrdinaryStores, numSpec
         GroupID: "pd", ID: "learner", Role: placement.Learner, Count: 3,
         LabelConstraints: []placement.LabelConstraint{{Key: "engine", Op: placement.In, Values: []string{"tiflash"}}}}), IsNil)
 
-    ordinarySeq := newSequencer(numOrdinaryStores)
-    specialSeq := newSequencerWithMinID(numOrdinaryStores+1, numOrdinaryStores+numSpecialStores)
     // Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace.
     tc.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3})
     for i := uint64(2); i <= numRegions; i++ {
         tc.AddRegionWithLearner(
             i,
-            ordinarySeq.next(),
-            []uint64{ordinarySeq.next(), ordinarySeq.next()},
-            []uint64{specialSeq.next(), specialSeq.next(), specialSeq.next()},
+            1,
+            []uint64{2, 3},
+            []uint64{numOrdinaryStores + 1, numOrdinaryStores + 2, numOrdinaryStores + 3},
         )
     }
 
@@ -154,6 +166,7 @@
 
     countOrdinaryPeers := make(map[uint64]uint64)
     countSpecialPeers := make(map[uint64]uint64)
+    countOrdinaryLeaders := make(map[uint64]uint64)
     for i := uint64(1); i <= numRegions; i++ {
         region := tc.GetRegion(i)
         for _, peer := range region.GetPeers() {
@@ -164,15 +177,24 @@
             } else {
                 countOrdinaryPeers[storeID]++
             }
+            if peer.GetId() == region.GetLeader().GetId() {
+                countOrdinaryLeaders[storeID]++
+            }
         }
     }
 
     // Each store should have the same number of peers.
     for _, count := range countOrdinaryPeers {
-        c.Assert(count, Equals, numRegions*3/numOrdinaryStores)
+        c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numOrdinaryStores))
+        c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numOrdinaryStores))
     }
     for _, count := range countSpecialPeers {
-        c.Assert(count, Equals, numRegions*3/numSpecialStores)
+        c.Assert(float64(count), LessEqual, 1.1*float64(numRegions*3)/float64(numSpecialStores))
+        c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions*3)/float64(numSpecialStores))
     }
+    for _, count := range countOrdinaryLeaders {
+        c.Assert(float64(count), LessEqual, 1.1*float64(numRegions)/float64(numOrdinaryStores))
+        c.Assert(float64(count), GreaterEqual, 0.9*float64(numRegions)/float64(numOrdinaryStores))
+    }
 }