From b6150ca12678f89b4f01f87e434d5b148c6c35d2 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Sun, 19 May 2019 15:16:47 +0800 Subject: [PATCH] schedule: actively push operator (#1536) * schedule: actively push operator Signed-off-by: nolouch --- server/cluster_worker.go | 3 +- server/coordinator.go | 22 +++- server/coordinator_test.go | 2 +- server/schedule/operator_controller.go | 116 ++++++++++++++++---- server/schedule/operator_controller_test.go | 54 ++++++++- server/schedule/operator_queue.go | 49 +++++++++ 6 files changed, 220 insertions(+), 26 deletions(-) create mode 100644 server/schedule/operator_queue.go diff --git a/server/cluster_worker.go b/server/cluster_worker.go index b1b5ba22c1f..175452697ba 100644 --- a/server/cluster_worker.go +++ b/server/cluster_worker.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/schedule" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -39,7 +40,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta())) } - c.coordinator.opController.Dispatch(region) + c.coordinator.opController.Dispatch(region, schedule.DispatchFromHeartBeat) return nil } diff --git a/server/coordinator.go b/server/coordinator.go index df2463eecd6..2e5d03e6c3a 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -131,6 +131,25 @@ func (c *coordinator) patrolRegions() { } } +// drivePushOperator is used to push the unfinished operator to the excutor. +func (c *coordinator) drivePushOperator() { + defer logutil.LogPanic() + + defer c.wg.Done() + log.Info("coordinator begins to actively drive push operator") + ticker := time.NewTicker(schedule.PushOperatorTickInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("drive push operator has been stopped") + return + case <-ticker.C: + c.opController.PushOperators() + } + } +} + func (c *coordinator) checkRegion(region *core.RegionInfo) bool { // If PD has restarted, it need to check learners added before and promote them. // Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote. @@ -228,9 +247,10 @@ func (c *coordinator) run() { log.Error("cannot persist schedule config", zap.Error(err)) } - c.wg.Add(1) + c.wg.Add(2) // Starts to patrol regions. go c.patrolRegions() + go c.drivePushOperator() } func (c *coordinator) stop() { diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 9e3ef73ce30..224eef41de8 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -259,7 +259,7 @@ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *m if err := co.cluster.putRegion(region.Clone()); err != nil { return err } - co.opController.Dispatch(region) + co.opController.Dispatch(region, schedule.DispatchFromHeartBeat) return nil } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 1c023e84c22..adb78be4f20 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -14,6 +14,7 @@ package schedule import ( + "container/heap" "container/list" "fmt" "sync" @@ -28,7 +29,20 @@ import ( "go.uber.org/zap" ) -var historyKeepTime = 5 * time.Minute +// The source of dispatched region. +const ( + DispatchFromHeartBeat = "heartbeat" + DispatchFromNotifierQueue = "active push" + DispatchFromCreate = "create" +) + +var ( + historyKeepTime = 5 * time.Minute + slowNotifyInterval = 5 * time.Second + fastNotifyInterval = 2 * time.Second + // PushOperatorTickInterval is the interval try to push the operator. + PushOperatorTickInterval = 500 * time.Millisecond +) // HeartbeatStreams is an interface of async region heartbeat. type HeartbeatStreams interface { @@ -38,34 +52,36 @@ type HeartbeatStreams interface { // OperatorController is used to limit the speed of scheduling. type OperatorController struct { sync.RWMutex - cluster Cluster - operators map[uint64]*Operator - hbStreams HeartbeatStreams - histories *list.List - counts map[OperatorKind]uint64 - opRecords *OperatorRecords + cluster Cluster + operators map[uint64]*Operator + hbStreams HeartbeatStreams + histories *list.List + counts map[OperatorKind]uint64 + opRecords *OperatorRecords + opNotifierQueue operatorQueue } // NewOperatorController creates a OperatorController. func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *OperatorController { return &OperatorController{ - cluster: cluster, - operators: make(map[uint64]*Operator), - hbStreams: hbStreams, - histories: list.New(), - counts: make(map[OperatorKind]uint64), - opRecords: NewOperatorRecords(), + cluster: cluster, + operators: make(map[uint64]*Operator), + hbStreams: hbStreams, + histories: list.New(), + counts: make(map[OperatorKind]uint64), + opRecords: NewOperatorRecords(), + opNotifierQueue: make(operatorQueue, 0), } } // Dispatch is used to dispatch the operator of a region. -func (oc *OperatorController) Dispatch(region *core.RegionInfo) { +func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { // Check existed operator. if op := oc.GetOperator(region.GetID()); op != nil { timeout := op.IsTimeout() if step := op.Check(region); step != nil && !timeout { operatorCounter.WithLabelValues(op.Desc(), "check").Inc() - oc.SendScheduleCommand(region, step) + oc.SendScheduleCommand(region, step, source) return } if op.IsFinish() { @@ -83,6 +99,65 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) { } } +func (oc *OperatorController) getNextPushOperatorTime(step OperatorStep, now time.Time) time.Time { + nextTime := slowNotifyInterval + switch step.(type) { + case TransferLeader, PromoteLearner: + nextTime = fastNotifyInterval + } + return now.Add(nextTime) +} + +// pollNeedDispatchRegion returns the region need to dispatch, +// "next" is true to indicate that it may exist in next attempt, +// and false is the end for the poll. +func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { + oc.Lock() + defer oc.Unlock() + if oc.opNotifierQueue.Len() == 0 { + return nil, false + } + item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime) + regionID := item.op.regionID + op, ok := oc.operators[regionID] + if !ok || op == nil { + return nil, true + } + r = oc.cluster.GetRegion(regionID) + if r == nil { + return nil, true + } + step := op.Check(r) + if step == nil { + return nil, true + } + now := time.Now() + if now.Before(item.time) { + heap.Push(&oc.opNotifierQueue, item) + return nil, false + } + + // pushes with new notify time. + item.time = oc.getNextPushOperatorTime(step, now) + heap.Push(&oc.opNotifierQueue, item) + return r, true +} + +// PushOperators periodically pushes the unfinished operator to the executor(TiKV). +func (oc *OperatorController) PushOperators() { + for { + r, next := oc.pollNeedDispatchRegion() + if !next { + break + } + if r == nil { + continue + } + + oc.Dispatch(r, DispatchFromNotifierQueue) + } +} + // AddOperator adds operators to the running operators. func (oc *OperatorController) AddOperator(ops ...*Operator) bool { oc.Lock() @@ -98,7 +173,6 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool { for _, op := range ops { oc.addOperatorLocked(op) } - return true } @@ -145,12 +219,14 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool { oc.operators[regionID] = op oc.updateCounts(oc.operators) + var step OperatorStep if region := oc.cluster.GetRegion(op.RegionID()); region != nil { - if step := op.Check(region); step != nil { - oc.SendScheduleCommand(region, step) + if step = op.Check(region); step != nil { + oc.SendScheduleCommand(region, step, DispatchFromCreate) } } + heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op, time: oc.getNextPushOperatorTime(step, time.Now())}) operatorCounter.WithLabelValues(op.Desc(), "create").Inc() return true } @@ -203,8 +279,8 @@ func (oc *OperatorController) GetOperators() []*Operator { } // SendScheduleCommand sends a command to the region. -func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep) { - log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step)) +func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep, source string) { + log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source)) switch st := step.(type) { case TransferLeader: cmd := &pdpb.RegionHeartbeatResponse{ diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 7010be46cde..56f52edb51a 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -14,6 +14,7 @@ package schedule import ( + "container/heap" "time" . "github.com/pingcap/check" @@ -84,11 +85,58 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { op1.createTime = time.Now().Add(-10 * time.Minute) region2 = tc.ApplyOperatorStep(region2, op2) tc.PutRegion(region2) - oc.Dispatch(region1) - oc.Dispatch(region2) + oc.Dispatch(region1, "test") + oc.Dispatch(region2, "test") c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT) c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING) tc.ApplyOperator(op2) - oc.Dispatch(region2) + oc.Dispatch(region2, "test") c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS) } + +func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { + opt := NewMockSchedulerOptions() + tc := NewMockCluster(opt) + oc := NewOperatorController(tc, nil) + oc.hbStreams = mockHeadbeatStream{} + tc.AddLeaderStore(1, 2) + tc.AddLeaderStore(2, 0) + tc.AddLeaderRegion(1, 1, 2) + tc.AddLeaderRegion(2, 1, 2) + steps := []OperatorStep{ + RemovePeer{FromStore: 2}, + AddPeer{ToStore: 2, PeerID: 4}, + } + op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) + op2 := NewOperator("test", 2, &metapb.RegionEpoch{}, OpRegion, steps...) + region1 := tc.GetRegion(1) + region2 := tc.GetRegion(2) + // Adds operator and pushes to the notifier queue. + { + oc.SetOperator(op1) + oc.SetOperator(op2) + heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)}) + heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)}) + } + // fisrt poll got nil + r, next := oc.pollNeedDispatchRegion() + c.Assert(r, IsNil) + c.Assert(next, IsFalse) + + // after wait 100 millisecond, the region1 need to dispatch, but not region2. + time.Sleep(100 * time.Millisecond) + r, next = oc.pollNeedDispatchRegion() + c.Assert(r, NotNil) + c.Assert(next, IsTrue) + c.Assert(r.GetID(), Equals, region1.GetID()) + r, next = oc.pollNeedDispatchRegion() + c.Assert(r, IsNil) + c.Assert(next, IsFalse) + + // after waiting 500 millseconds, the region2 need to dispatch + time.Sleep(400 * time.Millisecond) + r, next = oc.pollNeedDispatchRegion() + c.Assert(r, NotNil) + c.Assert(next, IsTrue) + c.Assert(r.GetID(), Equals, region2.GetID()) +} diff --git a/server/schedule/operator_queue.go b/server/schedule/operator_queue.go new file mode 100644 index 00000000000..22f8d5e0a10 --- /dev/null +++ b/server/schedule/operator_queue.go @@ -0,0 +1,49 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedule + +import "time" + +type operatorWithTime struct { + op *Operator + time time.Time +} + +type operatorQueue []*operatorWithTime + +func (opn operatorQueue) Len() int { return len(opn) } + +func (opn operatorQueue) Less(i, j int) bool { + return opn[i].time.Before(opn[j].time) +} + +func (opn operatorQueue) Swap(i, j int) { + opn[i], opn[j] = opn[j], opn[i] +} + +func (opn *operatorQueue) Push(x interface{}) { + item := x.(*operatorWithTime) + *opn = append(*opn, item) +} + +func (opn *operatorQueue) Pop() interface{} { + old := *opn + n := len(old) + if n == 0 { + return nil + } + item := old[n-1] + *opn = old[0 : n-1] + return item +}