Skip to content

Commit

Permalink
cherry-pick commits to release-2.0 (#1096)
Browse files Browse the repository at this point in the history
* server: fix the issue that panic when collecting hot-cache metrics (#1091)

* server: fix the issue that panic when collecting hot-cache metrics

* server, schedule: check region epoch before adding operators. (#1095)

* server, schedule: check region epoch before adding operators.

* add test.
  • Loading branch information
nolouch authored and siddontang committed May 31, 2018
1 parent 7e98269 commit f388273
Show file tree
Hide file tree
Showing 15 changed files with 125 additions and 70 deletions.
2 changes: 2 additions & 0 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ func (c *clusterInfo) collectMetrics() {
defer c.RUnlock()
c.regionStats.Collect()
c.labelLevelStats.Collect()
// collect hot cache metrics
c.HotCache.CollectMetrics(c.Stores)
}

func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo {
Expand Down
50 changes: 24 additions & 26 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
ToStore: p.GetStoreId(),
PeerID: p.GetId(),
}
op := schedule.NewOperator("promoteLearner", region.GetId(), schedule.OpRegion, step)
op := schedule.NewOperator("promoteLearner", region.GetId(), region.GetRegionEpoch(), schedule.OpRegion, step)
if c.addOperator(op) {
return true
}
Expand All @@ -188,7 +188,7 @@ func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
if c.limiter.OperatorCount(schedule.OpMerge) < c.cluster.GetMergeScheduleLimit() {
if op1, op2 := c.mergeChecker.Check(region); op1 != nil && op2 != nil {
// make sure two operators can add successfully altogether
if c.addOperators(op1, op2) {
if c.addOperator(op1, op2) {
return true
}
}
Expand Down Expand Up @@ -365,8 +365,6 @@ func (c *coordinator) collectHotSpotMetrics() {
}
}

// collect hot cache metrics
c.cluster.HotCache.CollectMetrics(c.cluster.Stores)
}

func (c *coordinator) shouldRun() bool {
Expand Down Expand Up @@ -430,11 +428,7 @@ func (c *coordinator) runScheduler(s *scheduleController) {
}
opInfluence := schedule.NewOpInfluence(c.getOperators(), c.cluster)
if op := s.Schedule(c.cluster, opInfluence); op != nil {
if len(op) == 1 {
c.addOperator(op[0])
} else {
c.addOperators(op...)
}
c.addOperator(op...)
}

case <-s.Ctx().Done():
Expand All @@ -449,14 +443,9 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {

log.Infof("[region %v] add operator: %s", regionID, op)

// If the new operator passed in has higher priorities than the old one,
// then replace the old operator.
// If there is an old operator, replace it. The priority should be checked
// already.
if old, ok := c.operators[regionID]; ok {
if !isHigherPriorityOperator(op, old) {
log.Infof("[region %v] cancel add operator, old: %s", regionID, old)
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
return false
}
log.Infof("[region %v] replace old operator: %s", regionID, old)
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
c.removeOperatorLocked(old)
Expand All @@ -475,20 +464,12 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
return true
}

func (c *coordinator) addOperator(op *schedule.Operator) bool {
c.Lock()
defer c.Unlock()

return c.addOperatorLocked(op)
}

func (c *coordinator) addOperators(ops ...*schedule.Operator) bool {
func (c *coordinator) addOperator(ops ...*schedule.Operator) bool {
c.Lock()
defer c.Unlock()

for _, op := range ops {
if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Infof("[region %v] cancel add operators, old: %s", op.RegionID(), old)
if !c.checkAddOperator(op) {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
return false
}
Expand All @@ -500,6 +481,23 @@ func (c *coordinator) addOperators(ops ...*schedule.Operator) bool {
return true
}

func (c *coordinator) checkAddOperator(op *schedule.Operator) bool {
region := c.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debugf("[region %v] region not found, cancel add operator", op.RegionID())
return false
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
log.Debugf("[region %v] region epoch not match, %v vs %v, cancel add operator", op.RegionID(), region.GetRegionEpoch(), op.RegionEpoch())
return false
}
if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debugf("[region %v] already have operator %s, cancel add operator", op.RegionID(), old)
return false
}
return true
}

func isHigherPriorityOperator(new, old *schedule.Operator) bool {
return new.GetPriorityLevel() < old.GetPriorityLevel()
}
Expand Down
78 changes: 63 additions & 15 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"fmt"
"math/rand"
"time"

. "github.com/pingcap/check"
Expand All @@ -28,8 +29,8 @@ import (
"github.com/pingcap/pd/server/schedulers"
)

func newTestOperator(regionID uint64, kind schedule.OperatorKind) *schedule.Operator {
return schedule.NewOperator("test", regionID, kind)
func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator {
return schedule.NewOperator("test", regionID, regionEpoch, kind)
}

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) {
Expand All @@ -53,9 +54,10 @@ func newTestClusterInfo(opt *scheduleOption) *testClusterInfo {

func newTestRegionMeta(regionID uint64) *metapb.Region {
return &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1},
}
}

Expand Down Expand Up @@ -134,13 +136,15 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
l := co.limiter

op1 := newTestOperator(1, schedule.OpLeader)
tc.addLeaderRegion(1, 1)

op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader)
co.addOperator(op1)
c.Assert(l.OperatorCount(op1.Kind()), Equals, uint64(1))
c.Assert(co.getOperator(1).RegionID(), Equals, op1.RegionID())

// Region 1 already has an operator, cannot add another one.
op2 := newTestOperator(1, schedule.OpRegion)
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion)
co.addOperator(op2)
c.Assert(l.OperatorCount(op2.Kind()), Equals, uint64(0))

Expand Down Expand Up @@ -234,6 +238,29 @@ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *m
co.dispatch(region)
}

func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
_, opt := newTestScheduleConfig()
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
co.run()
// Make sure there are no problem when concurrent write and read
for i := 0; i <= 10; i++ {
go func(i int) {
for j := 0; j < 10000; j++ {
tc.addRegionStore(uint64(i%5), rand.Intn(200))
}
}(i)
}
for i := 0; i < 1000; i++ {
co.collectHotSpotMetrics()
co.collectSchedulerMetrics()
co.cluster.collectMetrics()
}
}

func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
cfg, opt := newTestScheduleConfig()
cfg.EnableRaftLearner = true
Expand Down Expand Up @@ -615,21 +642,21 @@ func (s *testScheduleLimiterSuite) TestOperatorCount(c *C) {

operators := make(map[uint64]*schedule.Operator)

operators[1] = newTestOperator(1, schedule.OpLeader)
operators[1] = newTestOperator(1, nil, schedule.OpLeader)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 1:leader
operators[2] = newTestOperator(2, schedule.OpLeader)
operators[2] = newTestOperator(2, nil, schedule.OpLeader)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
delete(operators, 1)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 2:leader

operators[1] = newTestOperator(1, schedule.OpRegion)
operators[1] = newTestOperator(1, nil, schedule.OpRegion)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(1)) // 1:region 2:leader
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1))
operators[2] = newTestOperator(2, schedule.OpRegion)
operators[2] = newTestOperator(2, nil, schedule.OpRegion)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(2)) // 1:region 2:region
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(0))
Expand Down Expand Up @@ -657,6 +684,9 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
hbStreams := newHeartbeatStreams(tc.getClusterID())
defer hbStreams.Close()

tc.addLeaderRegion(1, 1)
tc.addLeaderRegion(2, 2)

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
scheduler, err := schedule.CreateScheduler("balance-leader", co.limiter)
c.Assert(err, IsNil)
Expand All @@ -676,11 +706,11 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
lb.limit = 2
// count = 0
c.Assert(sc.AllowSchedule(), IsTrue)
op1 := newTestOperator(1, schedule.OpLeader)
op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader)
c.Assert(co.addOperator(op1), IsTrue)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)
op2 := newTestOperator(2, schedule.OpLeader)
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpLeader)
c.Assert(co.addOperator(op2), IsTrue)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
Expand All @@ -689,7 +719,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(sc.AllowSchedule(), IsTrue)

// add a PriorityKind operator will remove old operator
op3 := newTestOperator(2, schedule.OpHotRegion)
op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpHotRegion)
op3.SetPriorityLevel(core.HighPriority)
c.Assert(co.addOperator(op1), IsTrue)
c.Assert(sc.AllowSchedule(), IsFalse)
Expand All @@ -700,10 +730,28 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
// add a admin operator will remove old operator
c.Assert(co.addOperator(op2), IsTrue)
c.Assert(sc.AllowSchedule(), IsFalse)
op4 := newTestOperator(2, schedule.OpAdmin)
op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpAdmin)
op4.SetPriorityLevel(core.HighPriority)
c.Assert(co.addOperator(op4), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
co.removeOperator(op4)

// test wrong region id.
op5 := newTestOperator(3, &metapb.RegionEpoch{}, schedule.OpHotRegion)
c.Assert(co.addOperator(op5), IsFalse)

// test wrong region epoch.
co.removeOperator(op1)
epoch := &metapb.RegionEpoch{
Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
}
op6 := newTestOperator(1, epoch, schedule.OpLeader)
c.Assert(co.addOperator(op6), IsFalse)
epoch.Version--
op6 = newTestOperator(1, epoch, schedule.OpLeader)
c.Assert(co.addOperator(op6), IsTrue)
co.removeOperator(op6)
}

func (s *testScheduleControllerSuite) TestInterval(c *C) {
Expand Down
10 changes: 5 additions & 5 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
}

step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: newLeader.GetStoreId()}
op := schedule.NewOperator("adminTransferLeader", regionID, schedule.OpAdmin|schedule.OpLeader, step)
op := schedule.NewOperator("adminTransferLeader", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpLeader, step)
if ok := c.addOperator(op); !ok {
return errors.Trace(errAddOperator)
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64
steps = append(steps, schedule.RemovePeer{FromStore: peer.GetStoreId()})
}

op := schedule.NewOperator("adminMoveRegion", regionID, schedule.OpAdmin|schedule.OpRegion, steps...)
op := schedule.NewOperator("adminMoveRegion", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpRegion, steps...)
if ok := c.addOperator(op); !ok {
return errors.Trace(errAddOperator)
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error {
schedule.AddPeer{ToStore: toStoreID, PeerID: newPeer.GetId()},
}
}
op := schedule.NewOperator("adminAddPeer", regionID, schedule.OpAdmin|schedule.OpRegion, steps...)
op := schedule.NewOperator("adminAddPeer", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpRegion, steps...)
if ok := c.addOperator(op); !ok {
return errors.Trace(errAddOperator)
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error
if err != nil {
return errors.Trace(err)
}
if ok := c.addOperators(op1, op2); !ok {
if ok := c.addOperator(op1, op2); !ok {
return errors.Trace(ErrAddOperator)
}
return nil
Expand All @@ -518,7 +518,7 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64) error {
}

step := schedule.SplitRegion{StartKey: region.StartKey, EndKey: region.EndKey}
op := schedule.NewOperator("adminSplitRegion", regionID, schedule.OpAdmin, step)
op := schedule.NewOperator("adminSplitRegion", regionID, region.GetRegionEpoch(), schedule.OpAdmin, step)
if ok := c.addOperator(op); !ok {
return errors.Trace(errAddOperator)
}
Expand Down
Loading

0 comments on commit f388273

Please sign in to comment.