Skip to content

Commit

Permalink
*: scatter the leader distribution in the specified range (#1037)
Browse files Browse the repository at this point in the history
*: add scatter range leader scheduler
  • Loading branch information
nolouch authored May 3, 2018
1 parent 00a31f0 commit aa811bf
Show file tree
Hide file tree
Showing 13 changed files with 441 additions and 27 deletions.
26 changes: 26 additions & 0 deletions pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package command
import (
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewAddSchedulerCommand() *cobra.Command {
c.AddCommand(NewEvictLeaderSchedulerCommand())
c.AddCommand(NewShuffleLeaderSchedulerCommand())
c.AddCommand(NewShuffleRegionSchedulerCommand())
c.AddCommand(NewScatterRangeSchedulerCommand())
return c
}

Expand Down Expand Up @@ -143,6 +145,30 @@ func addSchedulerCommandFunc(cmd *cobra.Command, args []string) {
postJSON(cmd, schedulersPrefix, input)
}

// NewScatterRangeSchedulerCommand returns a command to add a scatter-range-scheduler.
func NewScatterRangeSchedulerCommand() *cobra.Command {
c := &cobra.Command{
Use: "scatter-range <start_key> <end_key> <range_name>",
Short: "add a scheduler to scatter range",
Run: addSchedulerForScatterRangeCommandFunc,
}
return c
}

func addSchedulerForScatterRangeCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 3 {
fmt.Println(cmd.UsageString())
return
}

input := make(map[string]interface{})
input["name"] = cmd.Name()
input["start_key"] = url.QueryEscape(args[0])
input["end_key"] = url.QueryEscape(args[1])
input["range_name"] = args[2]
postJSON(cmd, schedulersPrefix, input)
}

// NewRemoveSchedulerCommand returns a command to remove scheduler.
func NewRemoveSchedulerCommand() *cobra.Command {
c := &cobra.Command{
Expand Down
19 changes: 19 additions & 0 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "scatter-range":
var args []string
startKey, ok := input["start_key"].(string)
if ok {
args = append(args, startKey)
}
endKey, ok := input["end_key"].(string)
if ok {
args = append(args, endKey)
}
name, ok := input["range_name"].(string)
if ok {
args = append(args, name)
}
if err := h.AddScatterRangeScheduler(args...); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}

case "balance-adjacent-region-scheduler":
var args []string
leaderLimit, ok := input["leader_limit"].(string)
Expand Down
5 changes: 5 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ func (h *Handler) AddLabelScheduler() error {
return h.AddScheduler("label")
}

// AddScatterRangeScheduler adds a balance-range-leader-scheduler
func (h *Handler) AddScatterRangeScheduler(args ...string) error {
return h.AddScheduler("scatter-range", args...)
}

// AddAdjacentRegionScheduler adds a balance-adjacent-region-scheduler.
func (h *Handler) AddAdjacentRegionScheduler(args ...string) error {
return h.AddScheduler("adjacent-region", args...)
Expand Down
26 changes: 19 additions & 7 deletions server/schedule/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,45 @@ type BasicCluster struct {

// NewOpInfluence creates a OpInfluence.
func NewOpInfluence(operators []*Operator, cluster Cluster) OpInfluence {
m := make(map[uint64]*StoreInfluence)
influence := OpInfluence{
storesInfluence: make(map[uint64]*StoreInfluence),
regionsInfluence: make(map[uint64]*Operator),
}

for _, op := range operators {
if !op.IsTimeout() && !op.IsFinish() {
region := cluster.GetRegion(op.RegionID())
if region != nil {
op.Influence(m, region)
op.Influence(influence, region)
}
}
influence.regionsInfluence[op.RegionID()] = op
}

return m
return influence
}

// OpInfluence is a map of StoreInfluence.
type OpInfluence map[uint64]*StoreInfluence
// OpInfluence records the influence of the cluster.
type OpInfluence struct {
storesInfluence map[uint64]*StoreInfluence
regionsInfluence map[uint64]*Operator
}

// GetStoreInfluence get storeInfluence of specific store.
func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence {
storeInfluence, ok := m[id]
storeInfluence, ok := m.storesInfluence[id]
if !ok {
storeInfluence = &StoreInfluence{}
m[id] = storeInfluence
m.storesInfluence[id] = storeInfluence
}
return storeInfluence
}

// GetRegionsInfluence gets regionInfluence of specific region.
func (m OpInfluence) GetRegionsInfluence() map[uint64]*Operator {
return m.regionsInfluence
}

// StoreInfluence records influences that pending operators will make.
type StoreInfluence struct {
RegionSize int64
Expand Down
22 changes: 20 additions & 2 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ func (mc *MockCluster) UpdateStorageReadBytes(storeID uint64, BytesRead uint64)
mc.PutStore(store)
}

// UpdateStoreStatus updates store status.
func (mc *MockCluster) UpdateStoreStatus(id uint64) {
mc.Stores.SetLeaderCount(id, mc.Regions.GetStoreLeaderCount(id))
mc.Stores.SetRegionCount(id, mc.Regions.GetStoreRegionCount(id))
mc.Stores.SetPendingPeerCount(id, mc.Regions.GetStorePendingPeerCount(id))
mc.Stores.SetLeaderSize(id, mc.Regions.GetStoreLeaderRegionSize(id))
mc.Stores.SetRegionSize(id, mc.Regions.GetStoreRegionSize(id))
store := mc.Stores.GetStore(id)
store.Stats = &pdpb.StoreStats{}
store.Stats.Capacity = 1000 * (1 << 20)
store.Stats.Available = store.Stats.Capacity - uint64(store.RegionSize)
store.Stats.UsedSize = uint64(store.RegionSize)
mc.PutStore(store)
}

func (mc *MockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
Expand Down Expand Up @@ -303,7 +318,7 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
Id: s.PeerID,
StoreId: s.ToStore,
}
region.Peers = append(region.Peers, peer)
region.AddPeer(peer)
case RemovePeer:
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
Expand All @@ -318,13 +333,16 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
StoreId: s.ToStore,
IsLearner: true,
}
region.Peers = append(region.Peers, peer)
region.AddPeer(peer)
default:
panic("Unknown operator step")
}
}
}
mc.PutRegion(region)
for id := range region.GetStoreIds() {
mc.UpdateStoreStatus(id)
}
}

// GetOpt mocks method.
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ func (o *Operator) Desc() string {
return o.desc
}

// SetDesc sets the description for the operator.
func (o *Operator) SetDesc(desc string) {
o.desc = desc
}

// AttachKind attaches an operator kind for the operator.
func (o *Operator) AttachKind(kind OperatorKind) {
o.kind |= kind
}

// RegionID returns the region that operator is targeted.
func (o *Operator) RegionID() uint64 {
return o.regionID
Expand Down
3 changes: 3 additions & 0 deletions server/schedule/operator_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
OpReplica // Initiated by replica checkers.
OpBalance // Initiated by balancers.
OpMerge // Initiated by merge checkers.
OpRange // Initiated by range scheduler.
opMax
)

Expand All @@ -44,6 +45,7 @@ var flagToName = map[OperatorKind]string{
OpReplica: "replica",
OpBalance: "balance",
OpMerge: "merge",
OpRange: "range",
}

var nameToFlag = map[string]OperatorKind{
Expand All @@ -55,6 +57,7 @@ var nameToFlag = map[string]OperatorKind{
"replica": OpReplica,
"balance": OpBalance,
"merge": OpMerge,
"range": OpRange,
}

func (k OperatorKind) String() string {
Expand Down
25 changes: 13 additions & 12 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,68 +103,69 @@ func (s *testOperatorSuite) TestOperator(c *C) {

func (s *testOperatorSuite) TestInfluence(c *C) {
region := s.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2})
opInfluence := make(map[uint64]*StoreInfluence)
opInfluence[1] = &StoreInfluence{}
opInfluence[2] = &StoreInfluence{}
opInfluence := OpInfluence{storesInfluence: make(map[uint64]*StoreInfluence)}
storeOpInfluence := opInfluence.storesInfluence
storeOpInfluence[1] = &StoreInfluence{}
storeOpInfluence[2] = &StoreInfluence{}

AddPeer{ToStore: 2, PeerID: 2}.Influence(opInfluence, region)
c.Assert(*opInfluence[2], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionCount: 1,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*opInfluence[1], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
})
c.Assert(*opInfluence[2], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderCount: 1,
RegionSize: 10,
RegionCount: 1,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*opInfluence[1], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderCount: -1,
RegionSize: -10,
RegionCount: -1,
})
c.Assert(*opInfluence[2], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderCount: 1,
RegionSize: 10,
RegionCount: 1,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*opInfluence[1], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderCount: -1,
RegionSize: -10,
RegionCount: -1,
})
c.Assert(*opInfluence[2], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderCount: 1,
RegionSize: 10,
RegionCount: 1,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*opInfluence[1], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderCount: -2,
RegionSize: -10,
RegionCount: -2,
})
c.Assert(*opInfluence[2], DeepEquals, StoreInfluence{
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderCount: 1,
RegionSize: 10,
Expand Down
Loading

0 comments on commit aa811bf

Please sign in to comment.