Skip to content

Commit

Permalink
Merge branch 'master' into add-more-info-to-ts-fallback-log
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jun 28, 2023
2 parents 3a48751 + 01015a6 commit f70e174
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 48 deletions.
1 change: 1 addition & 0 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
suite.NotNil(ops)
suite.True(oc.AddOperator(ops...))
for _, op := range ops {
oc.RemoveOperator(op)
oc.RemoveOperator(op, operator.ExceedStoreLimit)
}
}
regions[2] = regions[2].Clone(
Expand All @@ -498,7 +498,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
suite.NotNil(ops)
suite.True(oc.AddOperator(ops...))
for _, op := range ops {
oc.RemoveOperator(op)
oc.RemoveOperator(op, operator.ExceedStoreLimit)
}
}
{
Expand Down
38 changes: 37 additions & 1 deletion pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ const (
// OperatorExpireTime is the duration that when an operator is not started
// after it, the operator will be considered expired.
OperatorExpireTime = 3 * time.Second
cancelReason = "cancel-reason"
)

// CancelReasonType is the type of cancel reason.
type CancelReasonType string

var (
// RegionNotFound is the cancel reason when the region is not found.
RegionNotFound CancelReasonType = "region not found"
// EpochNotMatch is the cancel reason when the region epoch is not match.
EpochNotMatch CancelReasonType = "epoch not match"
// AlreadyExist is the cancel reason when the operator is running.
AlreadyExist CancelReasonType = "already exist"
// AdminStop is the cancel reason when the operator is stopped by adminer.
AdminStop CancelReasonType = "admin stop"
// NotInRunningState is the cancel reason when the operator is not in running state.
NotInRunningState CancelReasonType = "not in running state"
// Succeed is the cancel reason when the operator is finished successfully.
Succeed CancelReasonType = "succeed"
// Timeout is the cancel reason when the operator is timeout.
Timeout CancelReasonType = "timeout"
// Expired is the cancel reason when the operator is expired.
Expired CancelReasonType = "expired"
// NotInCreateStatus is the cancel reason when the operator is not in create status.
NotInCreateStatus CancelReasonType = "not in create status"
// StaleStatus is the cancel reason when the operator is in a stale status.
StaleStatus CancelReasonType = "stale status"
// ExceedStoreLimit is the cancel reason when the operator exceeds the store limit.
ExceedStoreLimit CancelReasonType = "exceed store limit"
// ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit.
ExceedWaitLimit CancelReasonType = "exceed wait limit"
// Unknown is the cancel reason when the operator is cancelled by an unknown reason.
Unknown CancelReasonType = "unknown"
)

// Operator contains execution steps generated by scheduler.
Expand Down Expand Up @@ -227,7 +260,10 @@ func (o *Operator) CheckSuccess() bool {
}

// Cancel marks the operator canceled.
func (o *Operator) Cancel() bool {
func (o *Operator) Cancel(reason CancelReasonType) bool {
if _, ok := o.AdditionalInfos[cancelReason]; !ok {
o.AdditionalInfos[cancelReason] = string(reason)
}
return o.status.To(CANCELED)
}

Expand Down
89 changes: 55 additions & 34 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
if op.ContainNonWitnessStep() {
recordOpStepWithTTL(op.RegionID())
}
if oc.RemoveOperator(op) {
if oc.RemoveOperator(op, Succeed) {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc()
oc.PromoteWaitingOperator()
}
Expand All @@ -134,7 +134,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
oc.pushFastOperator(op)
}
case TIMEOUT:
if oc.RemoveOperator(op) {
if oc.RemoveOperator(op, Timeout) {
operatorCounter.WithLabelValues(op.Desc(), "promote-timeout").Inc()
oc.PromoteWaitingOperator()
}
Expand All @@ -150,7 +150,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
failpoint.Inject("unexpectedOperator", func() {
panic(op)
})
_ = op.Cancel()
_ = op.Cancel(NotInRunningState)
oc.buryOperator(op)
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-unexpected").Inc()
oc.PromoteWaitingOperator()
Expand All @@ -162,7 +162,8 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool {
err := step.CheckInProgress(oc.cluster, oc.config, region)
if err != nil {
if oc.RemoveOperator(op, zap.String("reason", err.Error())) {
log.Info("operator is stale", zap.Uint64("region-id", op.RegionID()), errs.ZapError(err))
if oc.RemoveOperator(op, StaleStatus) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc()
oc.PromoteWaitingOperator()
Expand All @@ -177,11 +178,13 @@ func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if changes > op.ConfVerChanged(region) {
log.Info("operator is stale",
zap.Uint64("region-id", op.RegionID()),
zap.Uint64("diff", changes),
zap.Reflect("latest-epoch", region.GetRegionEpoch()))
if oc.RemoveOperator(
op,
zap.String("reason", "stale operator, confver does not meet expectations"),
zap.Reflect("latest-epoch", region.GetRegionEpoch()),
zap.Uint64("diff", changes),
EpochNotMatch,
) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc()
Expand Down Expand Up @@ -220,7 +223,7 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
r = oc.cluster.GetRegion(regionID)
if r == nil {
_ = oc.removeOperatorLocked(op)
if op.Cancel() {
if op.Cancel(RegionNotFound) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -285,14 +288,14 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
}
isMerge = true
}
if !oc.checkAddOperator(false, op) {
_ = op.Cancel()
if pass, reason := oc.checkAddOperator(false, op); !pass {
_ = op.Cancel(reason)
oc.buryOperator(op)
if isMerge {
// Merge operation have two operators, cancel them all
i++
next := ops[i]
_ = next.Cancel()
_ = next.Cancel(reason)
oc.buryOperator(next)
}
continue
Expand Down Expand Up @@ -327,9 +330,16 @@ func (oc *Controller) AddOperator(ops ...*Operator) bool {
// note: checkAddOperator uses false param for `isPromoting`.
// This is used to keep check logic before fixing issue #4946,
// but maybe user want to add operator when waiting queue is busy
if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) {
if oc.exceedStoreLimitLocked(ops...) {
for _, op := range ops {
_ = op.Cancel()
_ = op.Cancel(ExceedStoreLimit)
oc.buryOperator(op)
}
return false
}
if pass, reason := oc.checkAddOperator(false, ops...); !pass {
for _, op := range ops {
_ = op.Cancel(reason)
oc.buryOperator(op)
}
return false
Expand All @@ -354,11 +364,20 @@ func (oc *Controller) PromoteWaitingOperator() {
return
}
operatorWaitCounter.WithLabelValues(ops[0].Desc(), "get").Inc()
if oc.exceedStoreLimitLocked(ops...) {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc()
_ = op.Cancel(ExceedStoreLimit)
oc.buryOperator(op)
}
oc.wopStatus.ops[ops[0].Desc()]--
continue
}

if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(true, ops...) {
if pass, reason := oc.checkAddOperator(true, ops...); !pass {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc()
_ = op.Cancel()
_ = op.Cancel(reason)
oc.buryOperator(op)
}
oc.wopStatus.ops[ops[0].Desc()]--
Expand All @@ -382,14 +401,14 @@ func (oc *Controller) PromoteWaitingOperator() {
// - The region already has a higher priority or same priority
// - Exceed the max number of waiting operators
// - At least one operator is expired.
func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool {
func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool, CancelReasonType) {
for _, op := range ops {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debug("region not found, cancel add operator",
zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return false
return false, RegionNotFound
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() ||
region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
Expand All @@ -398,14 +417,14 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool
zap.Reflect("old", region.GetRegionEpoch()),
zap.Reflect("new", op.RegionEpoch()))
operatorWaitCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return false
return false, EpochNotMatch
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debug("already have operator, cancel add operator",
zap.Uint64("region-id", op.RegionID()),
zap.Reflect("old", old))
operatorWaitCounter.WithLabelValues(op.Desc(), "already-have").Inc()
return false
return false, AlreadyExist
}
if op.Status() != CREATED {
log.Error("trying to add operator with unexpected status",
Expand All @@ -416,26 +435,26 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool
panic(op)
})
operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc()
return false
return false, NotInCreateStatus
}
if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() {
log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator()))
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc()
return false
return false, ExceedWaitLimit
}

if op.SchedulerKind() == OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
}
expired := false
var reason CancelReasonType
for _, op := range ops {
if op.CheckExpired() {
expired = true
reason = Expired
operatorWaitCounter.WithLabelValues(op.Desc(), "expired").Inc()
}
}
return !expired
return reason != Expired, reason
}

func isHigherPriorityOperator(new, old *Operator) bool {
Expand Down Expand Up @@ -521,18 +540,24 @@ func (oc *Controller) ack(op *Operator) {
}

// RemoveOperator removes an operator from the running operators.
func (oc *Controller) RemoveOperator(op *Operator, extraFields ...zap.Field) bool {
func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool {
oc.Lock()
removed := oc.removeOperatorLocked(op)
oc.Unlock()
var cancelReason CancelReasonType
if len(reasons) > 0 {
cancelReason = reasons[0]
} else {
cancelReason = Unknown
}
if removed {
if op.Cancel() {
if op.Cancel(cancelReason) {
log.Info("operator removed",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
}
oc.buryOperator(op, extraFields...)
oc.buryOperator(op)
}
return removed
}
Expand All @@ -555,7 +580,7 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool {
return false
}

func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) {
func (oc *Controller) buryOperator(op *Operator) {
st := op.Status()

if !IsEndStatus(st) {
Expand All @@ -567,7 +592,7 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) {
panic(op)
})
operatorCounter.WithLabelValues(op.Desc(), "unexpected").Inc()
_ = op.Cancel()
_ = op.Cancel(Unknown)
}

switch st {
Expand Down Expand Up @@ -603,15 +628,11 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) {
zap.String("additional-info", op.GetAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
case CANCELED:
fields := []zap.Field{
log.Info("operator canceled",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()),
}
fields = append(fields, extraFields...)
log.Info("operator canceled",
fields...,
)
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() {
// finished op canceled
op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
suite.True(oc.checkAddOperator(false, op))
suite.True(op.Cancel())
suite.True(op.Cancel(AdminStop))
suite.False(oc.checkAddOperator(false, op))
}
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa
}
failpoint.Inject("scatterHbStreamsDrain", func() {
r.opController.GetHBStreams().Drain(1)
r.opController.RemoveOperator(op)
r.opController.RemoveOperator(op, operator.AdminStop)
})
}
delete(failures, region.GetID())
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
infl.op.Cancel()
infl.op.Cancel(operator.AdminStop)
return infl
}
shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
Expand Down Expand Up @@ -957,7 +957,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
cnt++
if cnt == 3 {
re.True(op.Cancel())
re.True(op.Cancel(operator.AdminStop))
}
default:
re.FailNow("wrong op: " + op.String())
Expand Down Expand Up @@ -1367,14 +1367,14 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim
op2 := ops[0]
operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
re.True(op2.Cancel())
re.True(op2.Cancel(operator.AdminStop))

ops, _ = hb.Schedule(tc, false)
op2 = ops[0]
operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)

re.True(op1.Cancel())
re.True(op1.Cancel(operator.AdminStop))
// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)

ops, _ = hb.Schedule(tc, false)
Expand Down
3 changes: 0 additions & 3 deletions pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -251,8 +250,6 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op
return nil
}
splitBucketNewOperatorCounter.Inc()
op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(region.GetStartKey())
op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(region.GetEndKey())
op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10)
return []*operator.Operator{op}
}
Expand Down
Loading

0 comments on commit f70e174

Please sign in to comment.