Skip to content

Commit

Permalink
schedule: actively push operator (#1536)
Browse files Browse the repository at this point in the history
* schedule: actively push operator

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored May 19, 2019
1 parent 65eb45a commit b6150ca
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 26 deletions.
3 changes: 2 additions & 1 deletion server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand All @@ -39,7 +40,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
}

c.coordinator.opController.Dispatch(region)
c.coordinator.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
return nil
}

Expand Down
22 changes: 21 additions & 1 deletion server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ func (c *coordinator) patrolRegions() {
}
}

// drivePushOperator is used to push the unfinished operator to the excutor.
func (c *coordinator) drivePushOperator() {
defer logutil.LogPanic()

defer c.wg.Done()
log.Info("coordinator begins to actively drive push operator")
ticker := time.NewTicker(schedule.PushOperatorTickInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("drive push operator has been stopped")
return
case <-ticker.C:
c.opController.PushOperators()
}
}
}

func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
// If PD has restarted, it need to check learners added before and promote them.
// Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote.
Expand Down Expand Up @@ -228,9 +247,10 @@ func (c *coordinator) run() {
log.Error("cannot persist schedule config", zap.Error(err))
}

c.wg.Add(1)
c.wg.Add(2)
// Starts to patrol regions.
go c.patrolRegions()
go c.drivePushOperator()
}

func (c *coordinator) stop() {
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *m
if err := co.cluster.putRegion(region.Clone()); err != nil {
return err
}
co.opController.Dispatch(region)
co.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
return nil
}

Expand Down
116 changes: 96 additions & 20 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package schedule

import (
"container/heap"
"container/list"
"fmt"
"sync"
Expand All @@ -28,7 +29,20 @@ import (
"go.uber.org/zap"
)

var historyKeepTime = 5 * time.Minute
// The source of dispatched region.
const (
DispatchFromHeartBeat = "heartbeat"
DispatchFromNotifierQueue = "active push"
DispatchFromCreate = "create"
)

var (
historyKeepTime = 5 * time.Minute
slowNotifyInterval = 5 * time.Second
fastNotifyInterval = 2 * time.Second
// PushOperatorTickInterval is the interval try to push the operator.
PushOperatorTickInterval = 500 * time.Millisecond
)

// HeartbeatStreams is an interface of async region heartbeat.
type HeartbeatStreams interface {
Expand All @@ -38,34 +52,36 @@ type HeartbeatStreams interface {
// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
sync.RWMutex
cluster Cluster
operators map[uint64]*Operator
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
cluster Cluster
operators map[uint64]*Operator
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
opNotifierQueue operatorQueue
}

// NewOperatorController creates a OperatorController.
func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *OperatorController {
return &OperatorController{
cluster: cluster,
operators: make(map[uint64]*Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
cluster: cluster,
operators: make(map[uint64]*Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
opNotifierQueue: make(operatorQueue, 0),
}
}

// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
oc.SendScheduleCommand(region, step)
oc.SendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
Expand All @@ -83,6 +99,65 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
}
}

func (oc *OperatorController) getNextPushOperatorTime(step OperatorStep, now time.Time) time.Time {
nextTime := slowNotifyInterval
switch step.(type) {
case TransferLeader, PromoteLearner:
nextTime = fastNotifyInterval
}
return now.Add(nextTime)
}

// pollNeedDispatchRegion returns the region need to dispatch,
// "next" is true to indicate that it may exist in next attempt,
// and false is the end for the poll.
func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
oc.Lock()
defer oc.Unlock()
if oc.opNotifierQueue.Len() == 0 {
return nil, false
}
item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime)
regionID := item.op.regionID
op, ok := oc.operators[regionID]
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
return nil, true
}
step := op.Check(r)
if step == nil {
return nil, true
}
now := time.Now()
if now.Before(item.time) {
heap.Push(&oc.opNotifierQueue, item)
return nil, false
}

// pushes with new notify time.
item.time = oc.getNextPushOperatorTime(step, now)
heap.Push(&oc.opNotifierQueue, item)
return r, true
}

// PushOperators periodically pushes the unfinished operator to the executor(TiKV).
func (oc *OperatorController) PushOperators() {
for {
r, next := oc.pollNeedDispatchRegion()
if !next {
break
}
if r == nil {
continue
}

oc.Dispatch(r, DispatchFromNotifierQueue)
}
}

// AddOperator adds operators to the running operators.
func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
oc.Lock()
Expand All @@ -98,7 +173,6 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
for _, op := range ops {
oc.addOperatorLocked(op)
}

return true
}

Expand Down Expand Up @@ -145,12 +219,14 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
oc.operators[regionID] = op
oc.updateCounts(oc.operators)

var step OperatorStep
if region := oc.cluster.GetRegion(op.RegionID()); region != nil {
if step := op.Check(region); step != nil {
oc.SendScheduleCommand(region, step)
if step = op.Check(region); step != nil {
oc.SendScheduleCommand(region, step, DispatchFromCreate)
}
}

heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op, time: oc.getNextPushOperatorTime(step, time.Now())})
operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
}
Expand Down Expand Up @@ -203,8 +279,8 @@ func (oc *OperatorController) GetOperators() []*Operator {
}

// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step))
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep, source string) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source))
switch st := step.(type) {
case TransferLeader:
cmd := &pdpb.RegionHeartbeatResponse{
Expand Down
54 changes: 51 additions & 3 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package schedule

import (
"container/heap"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -84,11 +85,58 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
op1.createTime = time.Now().Add(-10 * time.Minute)
region2 = tc.ApplyOperatorStep(region2, op2)
tc.PutRegion(region2)
oc.Dispatch(region1)
oc.Dispatch(region2)
oc.Dispatch(region1, "test")
oc.Dispatch(region2, "test")
c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT)
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING)
tc.ApplyOperator(op2)
oc.Dispatch(region2)
oc.Dispatch(region2, "test")
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
oc.hbStreams = mockHeadbeatStream{}
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
steps := []OperatorStep{
RemovePeer{FromStore: 2},
AddPeer{ToStore: 2, PeerID: 4},
}
op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
op2 := NewOperator("test", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
region1 := tc.GetRegion(1)
region2 := tc.GetRegion(2)
// Adds operator and pushes to the notifier queue.
{
oc.SetOperator(op1)
oc.SetOperator(op2)
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)})
}
// fisrt poll got nil
r, next := oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)

// after wait 100 millisecond, the region1 need to dispatch, but not region2.
time.Sleep(100 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region1.GetID())
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)

// after waiting 500 millseconds, the region2 need to dispatch
time.Sleep(400 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region2.GetID())
}
49 changes: 49 additions & 0 deletions server/schedule/operator_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import "time"

type operatorWithTime struct {
op *Operator
time time.Time
}

type operatorQueue []*operatorWithTime

func (opn operatorQueue) Len() int { return len(opn) }

func (opn operatorQueue) Less(i, j int) bool {
return opn[i].time.Before(opn[j].time)
}

func (opn operatorQueue) Swap(i, j int) {
opn[i], opn[j] = opn[j], opn[i]
}

func (opn *operatorQueue) Push(x interface{}) {
item := x.(*operatorWithTime)
*opn = append(*opn, item)
}

func (opn *operatorQueue) Pop() interface{} {
old := *opn
n := len(old)
if n == 0 {
return nil
}
item := old[n-1]
*opn = old[0 : n-1]
return item
}

0 comments on commit b6150ca

Please sign in to comment.