Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: support range for schedulers #1791

Merged
merged 7 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,23 +619,23 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *RaftCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, opts...)
func (c *RaftCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, ranges, opts...)
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *RaftCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, opts...)
func (c *RaftCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, ranges, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *RaftCluster) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, opts...)
func (c *RaftCluster) RandPendingRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, ranges, opts...)
}

// RandLearnerRegion returns a random region that has a learner peer on the store.
func (c *RaftCluster) RandLearnerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLearnerRegion(storeID, opts...)
func (c *RaftCluster) RandLearnerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLearnerRegion(storeID, ranges, opts...)
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
Expand Down
10 changes: 5 additions & 5 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,10 +833,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := cluster.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
rleungx marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = cluster.RandFollowerRegion(i, core.HealthRegion())
region = cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
Expand All @@ -852,14 +852,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := cluster.RandLeaderRegion(i, core.HealthRegion())
region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(cluster.RandLeaderRegion(i, core.HealthRegion()), IsNil)
c.Assert(cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cluster.RandFollowerRegion(i, core.HealthRegion()), IsNil)
c.Assert(cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
}

Expand Down
14 changes: 7 additions & 7 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,13 +708,14 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
co.run()
storage = tc.RaftCluster.storage
c.Assert(co.schedulers, HasLen, 3)
bls, err := schedule.CreateScheduler("balance-leader", oc, storage, nil)
bls, err := schedule.CreateScheduler("balance-leader", oc, storage, schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(bls), IsNil)
brs, err := schedule.CreateScheduler("balance-region", oc, storage, nil)
brs, err := schedule.CreateScheduler("balance-region", oc, storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(brs), IsNil)
c.Assert(co.schedulers, HasLen, 5)

// the scheduler option should contain 7 items
// the `hot scheduler` and `label scheduler` are disabled
c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 7)
Expand All @@ -725,7 +726,6 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
c.Assert(co.cluster.opt.Persist(co.cluster.storage), IsNil)
co.stop()
co.wg.Wait()

_, newOpt, err = newTestScheduleConfig()
c.Assert(err, IsNil)
c.Assert(newOpt.Reload(co.cluster.storage), IsNil)
Expand Down Expand Up @@ -924,7 +924,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
}, nil, nil, c)
defer cleanup()
oc := co.opController
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(tc.addRegionStore(4, 100), IsNil)
c.Assert(tc.addRegionStore(3, 100), IsNil)
Expand Down Expand Up @@ -964,7 +964,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
}, nil, nil, c)
defer cleanup()
oc := co.opController
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 100), IsNil)
Expand Down Expand Up @@ -1024,7 +1024,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {

c.Assert(tc.addLeaderRegion(1, 1), IsNil)
c.Assert(tc.addLeaderRegion(2, 2), IsNil)
scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil)
scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
lb := &mockLimitScheduler{
Scheduler: scheduler,
Expand Down Expand Up @@ -1094,7 +1094,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
_, co, cleanup := prepare(nil, nil, nil, c)
defer cleanup()

lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), nil)
lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
sc := newScheduleController(co, lb)

Expand Down
41 changes: 29 additions & 12 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,39 +155,42 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regio
const randomRegionMaxRetry = 10

// RandFollowerRegion returns a random region that has a follower on the store.
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandFollowerRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandFollowerRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandLeaderRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandLeaderRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandPendingRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandPendingRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandLearnerRegion returns a random region that has a learner peer on the store.
func (bc *BasicCluster) RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandLearnerRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandLearnerRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

func (bc *BasicCluster) selectRegion(regions []*RegionInfo, opts ...RegionOption) *RegionInfo {
for _, r := range regions {
if r == nil {
break
}
if slice.AllOf(opts, func(i int) bool { return opts[i](r) }) {
return r
}
Expand Down Expand Up @@ -331,10 +334,10 @@ func (bc *BasicCluster) Length() int {

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
Expand All @@ -359,3 +362,17 @@ type StoreSetController interface {

AttachAvailableFunc(id uint64, f func() bool)
}

// KeyRange is a key range.
type KeyRange struct {
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"`
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
StartKey: []byte(startKey),
EndKey: []byte(endKey),
}
}
58 changes: 41 additions & 17 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"reflect"
"strings"

Expand Down Expand Up @@ -504,9 +505,14 @@ func (rst *regionSubTree) RandomRegions(n int, startKey, endKey []byte) []*Regio
if rst.length() == 0 {
return nil
}

regions := make([]*RegionInfo, 0, n)
for i := 0; i < n; i++ {
regions = append(regions, rst.regionTree.RandomRegion(startKey, endKey))
region := rst.regionTree.RandomRegion(startKey, endKey)
if region == nil || !isInvolved(region, startKey, endKey) {
continue
}
regions = append(regions, region)
}
return regions
}
Expand Down Expand Up @@ -739,43 +745,57 @@ func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int {
}

// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64) *RegionInfo {
return r.pendingPeers[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.pendingPeers[storeID].RandomRegion(startKey, endKey)
}

// RandPendingRegions randomly gets a store's n regions with a pending peer.
func (r *RegionsInfo) RandPendingRegions(storeID uint64, n int) []*RegionInfo {
return r.pendingPeers[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandPendingRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.pendingPeers[storeID].RandomRegions(n, startKey, endKey)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64) *RegionInfo {
return r.leaders[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.leaders[storeID].RandomRegion(startKey, endKey)
}

// RandLeaderRegions randomly gets a store's n leader regions.
func (r *RegionsInfo) RandLeaderRegions(storeID uint64, n int) []*RegionInfo {
return r.leaders[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandLeaderRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.leaders[storeID].RandomRegions(n, startKey, endKey)
}

// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64) *RegionInfo {
return r.followers[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.followers[storeID].RandomRegion(startKey, endKey)
}

// RandFollowerRegions randomly gets a store's n follower regions.
func (r *RegionsInfo) RandFollowerRegions(storeID uint64, n int) []*RegionInfo {
return r.followers[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandFollowerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.followers[storeID].RandomRegions(n, startKey, endKey)
}

// RandLearnerRegion randomly gets a store's learner region.
func (r *RegionsInfo) RandLearnerRegion(storeID uint64) *RegionInfo {
return r.learners[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandLearnerRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.learners[storeID].RandomRegion(startKey, endKey)
}

// RandLearnerRegions randomly gets a store's n learner regions.
func (r *RegionsInfo) RandLearnerRegions(storeID uint64, n int) []*RegionInfo {
return r.learners[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.learners[storeID].RandomRegions(n, startKey, endKey)
}

// GetKeys gets the start key and end key from random key range.
func (r *RegionsInfo) GetKeys(ranges []KeyRange) ([]byte, []byte) {
idx := rand.Intn(len(ranges))
return ranges[idx].StartKey, ranges[idx].EndKey
}

// GetLeader return leader RegionInfo by storeID and regionID(now only used in test)
Expand Down Expand Up @@ -880,6 +900,10 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
return strings.Join(ret, ", ")
}

func isInvolved(region *RegionInfo, startKey, endKey []byte) bool {
return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0))
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}

// HexRegionKey converts region key to hex format. Used for formating region in
// logs.
func HexRegionKey(key []byte) []byte {
Expand Down
2 changes: 1 addition & 1 deletion server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.RandLeaderRegion(1)
regions.RandLeaderRegion(1, []KeyRange{NewKeyRange("", "")})
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (r *RangeCluster) GetTolerantSizeRatio() float64 {
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (r *RangeCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandFollowerRegion(storeID, opts...)
func (r *RangeCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandFollowerRegion(storeID, ranges, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (r *RangeCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandLeaderRegion(storeID, opts...)
func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandLeaderRegion(storeID, ranges, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
Expand Down
4 changes: 3 additions & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
}
conf.LeaderLimit = defaultAdjacentLeaderLimit
conf.PeerLimit = defaultAdjacentPeerLimit
conf.Name = balanceAdjacentRegionName
return nil
}
})
Expand All @@ -77,6 +78,7 @@ func init() {
}

type balanceAdjacentRegionConfig struct {
Name string `json:"name"`
LeaderLimit uint64 `json:"leader-limit"`
PeerLimit uint64 `json:"peer-limit"`
}
Expand Down Expand Up @@ -128,7 +130,7 @@ func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController
}

func (l *balanceAdjacentRegionScheduler) GetName() string {
return balanceAdjacentRegionName
return l.conf.Name
}

func (l *balanceAdjacentRegionScheduler) GetType() string {
Expand Down
Loading