diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index 583b6eb8239..e6ae4a3fb12 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -34,6 +34,16 @@ const ( OperatorExpireTime = 3 * time.Second ) +// CancelReasonType is the type of cancel reason. +type CancelReasonType string + +var ( + // RegionNotFound is the cancel reason when the region is not found. + RegionNotFound CancelReasonType = "region not found" + // EpochNotMatch is the cancel reason when the region epoch does not match. + EpochNotMatch CancelReasonType = "epoch not match" +) + // Operator contains execution steps generated by scheduler. // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type Operator struct { diff --git a/pkg/schedule/operator_controller.go b/pkg/schedule/operator_controller.go index 692584cc14b..68ad13bf64e 100644 --- a/pkg/schedule/operator_controller.go +++ b/pkg/schedule/operator_controller.go @@ -213,8 +213,10 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next if !ok || op == nil { return nil, true } - r = oc.cluster.GetRegion(regionID) - if r == nil { + // Check the operator lightly. It can't dispatch the op in some scenarios. + var reason operator.CancelReasonType + r, reason = oc.checkOperatorLightly(op) + if len(reason) != 0 { _ = oc.removeOperatorLocked(op) if op.Cancel() { log.Warn("remove operator because region disappeared", @@ -297,6 +299,7 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int if isMerge { // count two merge operators as one, so wopStatus.ops[desc] should // not be updated here + // TODO: call checkAddOperator ... i++ added++ oc.wop.PutOperator(ops[i]) @@ -434,6 +437,27 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato return !expired } +// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion. 
+// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ... +// `region` is the target region of `op`. +func (oc *OperatorController) checkOperatorLightly(op *operator.Operator) (*core.RegionInfo, operator.CancelReasonType) { + region := oc.cluster.GetRegion(op.RegionID()) + if region == nil { + operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc() + return nil, operator.RegionNotFound + } + + // It may be suitable for all kinds of operators, not only merge-region. + // But to be cautious, it only takes effect on merge-region currently. + // If the version of epoch is changed, the region has been split or merged, and the key range has been changed. + // A change to the conf_version of the epoch doesn't modify the region key range, so skip it. + if (op.Kind()&operator.OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() { + operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() + return nil, operator.EpochNotMatch + } + return region, "" +} + func isHigherPriorityOperator(new, old *operator.Operator) bool { return new.GetPriorityLevel() > old.GetPriorityLevel() } diff --git a/pkg/schedule/operator_controller_test.go b/pkg/schedule/operator_controller_test.go index aa65d74d284..3d2f01cab82 100644 --- a/pkg/schedule/operator_controller_test.go +++ b/pkg/schedule/operator_controller_test.go @@ -398,6 +398,134 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { suite.False(next) } +// issue #7992 +func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewOperatorController(suite.ctx, cluster, stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, 
map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, 2) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // Change next push time to now, it's used to make test case faster. + controller.opNotifierQueue[0].time = time.Now() + + // first poll gets source region op. + r, next := controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + // second poll gets target region op. + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, target) + + // third poll removes the two merge-region ops. + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Len(controller.operators, 1) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.opRecords.Get(101)) + + // fourth poll removes target region op from opNotifierQueue + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, target) + re.Len(controller.opNotifierQueue, 1) + delete(controller.operators, 101) + delete(controller.operators, 102) + _ = heap.Pop(&controller.opNotifierQueue).(*operatorWithTime) + re.Len(controller.opNotifierQueue, 0) + + // Add the two ops to waiting operators again. 
+ source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} + controller.opRecords.ttl.Remove(101) + controller.opRecords.ttl.Remove(102) + ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) + re.NoError(err) + re.Equal(2, controller.AddWaitingOperator(ops...)) + re.Len(controller.opNotifierQueue, 2) + // change the target RegionEpoch + // first poll gets source region from opNotifierQueue + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Len(controller.operators, 1) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.opRecords.Get(102)) + + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + re.Len(controller.opNotifierQueue, 1) +} + +func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewOperatorController(suite.ctx, cluster, stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + 
+ ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, 2) + + // check successfully + r, reason := controller.checkOperatorLightly(ops[0]) + re.Empty(reason) + re.Equal(r, source) + + // check failed because of region disappeared + cluster.RemoveRegion(target) + r, reason = controller.checkOperatorLightly(ops[1]) + re.Nil(r) + re.Equal(reason, operator.RegionNotFound) + + // check failed because the version of the region epoch changed + cluster.PutRegion(target) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, reason = controller.checkOperatorLightly(ops[0]) + re.Nil(r) + re.Equal(reason, operator.EpochNotMatch) +} + func (suite *operatorControllerTestSuite) TestStoreLimit() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt)