From c1601ca3f2fbcc37b268f16a84b364050da1ed07 Mon Sep 17 00:00:00 2001
From: buffer <1045931706@qq.com>
Date: Wed, 28 Jun 2023 13:29:35 +0800
Subject: [PATCH 1/2] operator: log the cancel reason (#6676)

ref tikv/pd#6605, close tikv/pd#6677

log the cancel reason

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/schedule/checker/merge_checker_test.go   |  4 +-
 pkg/schedule/operator/operator.go            | 38 +++++++-
 pkg/schedule/operator/operator_controller.go | 89 ++++++++++++-------
 .../operator/operator_controller_test.go     |  2 +-
 pkg/schedule/scatter/region_scatterer.go     |  2 +-
 pkg/schedule/schedulers/hot_region_test.go   |  8 +-
 pkg/schedule/schedulers/split_bucket.go      |  3 -
 server/handler.go                            |  2 +-
 8 files changed, 101 insertions(+), 47 deletions(-)

diff --git a/pkg/schedule/checker/merge_checker_test.go b/pkg/schedule/checker/merge_checker_test.go
index f6f2dd69868..9c38d677619 100644
--- a/pkg/schedule/checker/merge_checker_test.go
+++ b/pkg/schedule/checker/merge_checker_test.go
@@ -484,7 +484,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
 		suite.NotNil(ops)
 		suite.True(oc.AddOperator(ops...))
 		for _, op := range ops {
-			oc.RemoveOperator(op)
+			oc.RemoveOperator(op, operator.ExceedStoreLimit)
 		}
 	}
 	regions[2] = regions[2].Clone(
@@ -498,7 +498,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
 		suite.NotNil(ops)
 		suite.True(oc.AddOperator(ops...))
 		for _, op := range ops {
-			oc.RemoveOperator(op)
+			oc.RemoveOperator(op, operator.ExceedStoreLimit)
 		}
 	}
 	{
diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go
index 4a9f9e8cbbd..02a2f1a8a5f 100644
--- a/pkg/schedule/operator/operator.go
+++ b/pkg/schedule/operator/operator.go
@@ -32,6 +32,39 @@ const (
 	// OperatorExpireTime is the duration that when an operator is not started
 	// after it, the operator will be considered expired.
 	OperatorExpireTime = 3 * time.Second
+	cancelReason       = "cancel-reason"
+)
+
+// CancelReasonType is the type of cancel reason.
+type CancelReasonType string
+
+var (
+	// RegionNotFound is the cancel reason when the region is not found.
+	RegionNotFound CancelReasonType = "region not found"
+	// EpochNotMatch is the cancel reason when the region epoch does not match.
+	EpochNotMatch CancelReasonType = "epoch not match"
+	// AlreadyExist is the cancel reason when the region already has a running operator.
+	AlreadyExist CancelReasonType = "already exist"
+	// AdminStop is the cancel reason when the operator is stopped by the admin.
+	AdminStop CancelReasonType = "admin stop"
+	// NotInRunningState is the cancel reason when the operator is not in running state.
+	NotInRunningState CancelReasonType = "not in running state"
+	// Succeed is the cancel reason when the operator finishes successfully.
+	Succeed CancelReasonType = "succeed"
+	// Timeout is the cancel reason when the operator times out.
+	Timeout CancelReasonType = "timeout"
+	// Expired is the cancel reason when the operator has expired.
+	Expired CancelReasonType = "expired"
+	// NotInCreateStatus is the cancel reason when the operator is not in create status.
+	NotInCreateStatus CancelReasonType = "not in create status"
+	// StaleStatus is the cancel reason when the operator is in a stale status.
+	StaleStatus CancelReasonType = "stale status"
+	// ExceedStoreLimit is the cancel reason when the operator exceeds the store limit.
+	ExceedStoreLimit CancelReasonType = "exceed store limit"
+	// ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit.
+ ExceedWaitLimit CancelReasonType = "exceed wait limit" + // Unknown is the cancel reason when the operator is cancelled by an unknown reason. + Unknown CancelReasonType = "unknown" ) // Operator contains execution steps generated by scheduler. @@ -227,7 +260,10 @@ func (o *Operator) CheckSuccess() bool { } // Cancel marks the operator canceled. -func (o *Operator) Cancel() bool { +func (o *Operator) Cancel(reason CancelReasonType) bool { + if _, ok := o.AdditionalInfos[cancelReason]; !ok { + o.AdditionalInfos[cancelReason] = string(reason) + } return o.status.To(CANCELED) } diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index cbda045130a..b1e40a35e58 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -125,7 +125,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS if op.ContainNonWitnessStep() { recordOpStepWithTTL(op.RegionID()) } - if oc.RemoveOperator(op) { + if oc.RemoveOperator(op, Succeed) { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc() oc.PromoteWaitingOperator() } @@ -134,7 +134,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS oc.pushFastOperator(op) } case TIMEOUT: - if oc.RemoveOperator(op) { + if oc.RemoveOperator(op, Timeout) { operatorCounter.WithLabelValues(op.Desc(), "promote-timeout").Inc() oc.PromoteWaitingOperator() } @@ -150,7 +150,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS failpoint.Inject("unexpectedOperator", func() { panic(op) }) - _ = op.Cancel() + _ = op.Cancel(NotInRunningState) oc.buryOperator(op) operatorWaitCounter.WithLabelValues(op.Desc(), "promote-unexpected").Inc() oc.PromoteWaitingOperator() @@ -162,7 +162,8 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool { err := step.CheckInProgress(oc.cluster, oc.config, region) if err != nil { - if oc.RemoveOperator(op, zap.String("reason", err.Error())) { + log.Info("operator is stale", zap.Uint64("region-id", op.RegionID()), errs.ZapError(err)) + if oc.RemoveOperator(op, StaleStatus) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc() oc.PromoteWaitingOperator() @@ -177,11 +178,13 @@ func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core latest := region.GetRegionEpoch() changes := latest.GetConfVer() - origin.GetConfVer() if changes > op.ConfVerChanged(region) { + log.Info("operator is stale", + zap.Uint64("region-id", op.RegionID()), + zap.Uint64("diff", changes), + zap.Reflect("latest-epoch", region.GetRegionEpoch())) if oc.RemoveOperator( op, - zap.String("reason", "stale operator, confver does not meet expectations"), - zap.Reflect("latest-epoch", region.GetRegionEpoch()), - zap.Uint64("diff", changes), + EpochNotMatch, ) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc() @@ -220,7 +223,7 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { r = oc.cluster.GetRegion(regionID) if r == nil { _ = oc.removeOperatorLocked(op) - if op.Cancel() { + if op.Cancel(RegionNotFound) { log.Warn("remove operator because region disappeared", zap.Uint64("region-id", op.RegionID()), zap.Stringer("operator", op)) @@ -285,14 +288,14 @@ 
func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { } isMerge = true } - if !oc.checkAddOperator(false, op) { - _ = op.Cancel() + if pass, reason := oc.checkAddOperator(false, op); !pass { + _ = op.Cancel(reason) oc.buryOperator(op) if isMerge { // Merge operation have two operators, cancel them all i++ next := ops[i] - _ = next.Cancel() + _ = next.Cancel(reason) oc.buryOperator(next) } continue @@ -327,9 +330,16 @@ func (oc *Controller) AddOperator(ops ...*Operator) bool { // note: checkAddOperator uses false param for `isPromoting`. // This is used to keep check logic before fixing issue #4946, // but maybe user want to add operator when waiting queue is busy - if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) { + if oc.exceedStoreLimitLocked(ops...) { for _, op := range ops { - _ = op.Cancel() + _ = op.Cancel(ExceedStoreLimit) + oc.buryOperator(op) + } + return false + } + if pass, reason := oc.checkAddOperator(false, ops...); !pass { + for _, op := range ops { + _ = op.Cancel(reason) oc.buryOperator(op) } return false @@ -354,11 +364,20 @@ func (oc *Controller) PromoteWaitingOperator() { return } operatorWaitCounter.WithLabelValues(ops[0].Desc(), "get").Inc() + if oc.exceedStoreLimitLocked(ops...) { + for _, op := range ops { + operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc() + _ = op.Cancel(ExceedStoreLimit) + oc.buryOperator(op) + } + oc.wopStatus.ops[ops[0].Desc()]-- + continue + } - if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(true, ops...) { + if pass, reason := oc.checkAddOperator(true, ops...); !pass { for _, op := range ops { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc() - _ = op.Cancel() + _ = op.Cancel(reason) oc.buryOperator(op) } oc.wopStatus.ops[ops[0].Desc()]-- @@ -382,14 +401,14 @@ func (oc *Controller) PromoteWaitingOperator() { // - The region already has a higher priority or same priority // - Exceed the max number of waiting operators // - At least one operator is expired. 
-func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool { +func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool, CancelReasonType) { for _, op := range ops { region := oc.cluster.GetRegion(op.RegionID()) if region == nil { log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID())) operatorWaitCounter.WithLabelValues(op.Desc(), "not-found").Inc() - return false + return false, RegionNotFound } if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() { @@ -398,14 +417,14 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch())) operatorWaitCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() - return false + return false, EpochNotMatch } if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) { log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old)) operatorWaitCounter.WithLabelValues(op.Desc(), "already-have").Inc() - return false + return false, AlreadyExist } if op.Status() != CREATED { log.Error("trying to add operator with unexpected status", @@ -416,26 +435,26 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool panic(op) }) operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc() - return false + return false, NotInCreateStatus } if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() { log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc() - return false + return false, ExceedWaitLimit } if op.SchedulerKind() == OpAdmin || op.IsLeaveJointStateOperator() { continue } } - expired := false + var reason CancelReasonType for _, op := range ops { if op.CheckExpired() { - expired = true + reason = Expired operatorWaitCounter.WithLabelValues(op.Desc(), "expired").Inc() } } - return !expired + return reason != Expired, reason } func isHigherPriorityOperator(new, old *Operator) bool { @@ -521,18 +540,24 @@ func (oc *Controller) ack(op *Operator) { } // RemoveOperator removes an operator from the running operators. -func (oc *Controller) RemoveOperator(op *Operator, extraFields ...zap.Field) bool { +func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool { oc.Lock() removed := oc.removeOperatorLocked(op) oc.Unlock() + var cancelReason CancelReasonType + if len(reasons) > 0 { + cancelReason = reasons[0] + } else { + cancelReason = Unknown + } if removed { - if op.Cancel() { + if op.Cancel(cancelReason) { log.Info("operator removed", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op)) } - oc.buryOperator(op, extraFields...) 
+ oc.buryOperator(op) } return removed } @@ -555,7 +580,7 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool { return false } -func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { +func (oc *Controller) buryOperator(op *Operator) { st := op.Status() if !IsEndStatus(st) { @@ -567,7 +592,7 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { panic(op) }) operatorCounter.WithLabelValues(op.Desc(), "unexpected").Inc() - _ = op.Cancel() + _ = op.Cancel(Unknown) } switch st { @@ -603,15 +628,11 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { zap.String("additional-info", op.GetAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() case CANCELED: - fields := []zap.Field{ + log.Info("operator canceled", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), zap.String("additional-info", op.GetAdditionalInfo()), - } - fields = append(fields, extraFields...) - log.Info("operator canceled", - fields..., ) operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() } diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index eb2d69db944..112e5a11f9c 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -248,7 +248,7 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() { // finished op canceled op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) suite.True(oc.checkAddOperator(false, op)) - suite.True(op.Cancel()) + suite.True(op.Cancel(AdminStop)) suite.False(oc.checkAddOperator(false, op)) } { diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 54fc291b363..de90228f7f6 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -264,7 +264,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa } failpoint.Inject("scatterHbStreamsDrain", func() { r.opController.GetHBStreams().Drain(1) - r.opController.RemoveOperator(op) + r.opController.RemoveOperator(op, operator.AdminStop) }) } delete(failures, region.GetID()) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 362758d4ebf..e4cf6b121f8 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -145,7 +145,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { infl := notDoneOpInfluence(region, ty) - infl.op.Cancel() + infl.op.Cancel(operator.AdminStop) return infl } shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { @@ -957,7 +957,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4) cnt++ if cnt == 3 { - re.True(op.Cancel()) + re.True(op.Cancel(operator.AdminStop)) } default: re.FailNow("wrong op: " + op.String()) @@ -1367,14 +1367,14 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim op2 := ops[0] operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6) - re.True(op2.Cancel()) + 
re.True(op2.Cancel(operator.AdminStop)) ops, _ = hb.Schedule(tc, false) op2 = ops[0] operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5) - re.True(op1.Cancel()) + re.True(op1.Cancel(operator.AdminStop)) // store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5) ops, _ = hb.Schedule(tc, false) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index fef20974ef0..7452d2ceafa 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -23,7 +23,6 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/tikv/pd/pkg/core" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -251,8 +250,6 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op return nil } splitBucketNewOperatorCounter.Inc() - op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(region.GetStartKey()) - op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(region.GetEndKey()) op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10) return []*operator.Operator{op} } diff --git a/server/handler.go b/server/handler.go index 1fc543827b2..635901fb04a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -419,7 +419,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error { return ErrOperatorNotFound } - _ = c.RemoveOperator(op) + _ = c.RemoveOperator(op, operator.AdminStop) return nil } From 01015a616aaf416c3e80693f55a27b6e8a142887 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 28 Jun 2023 13:47:34 +0800 Subject: [PATCH 2/2] tools: add keepalive for pd-tso-bench (#6699) close tikv/pd#6681 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/grpcutil/grpcutil.go | 1 + tools/pd-tso-bench/main.go | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index 59b7224f29e..125f1125721 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -78,6 +78,7 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context { func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) { conn, ok := clientConns.Load(addr) if ok { + // TODO: check the connection state. 
return conn.(*grpc.ClientConn), nil } tlsConfig, err := tlsCfg.ToTLSConfig() diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index a0041b9ef3e..e82a9dcabba 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -33,6 +33,13 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +const ( + keepaliveTime = 10 * time.Second + keepaliveTimeout = 3 * time.Second ) var ( @@ -95,11 +102,18 @@ func bench(mainCtx context.Context) { err error ) + opt := pd.WithGRPCDialOptions( + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: keepaliveTime, + Timeout: keepaliveTimeout, + }), + ) + pdCli, err = pd.NewClientWithContext(mainCtx, []string{*pdAddrs}, pd.SecurityOption{ CAPath: *caPath, CertPath: *certPath, KeyPath: *keyPath, - }) + }, opt) pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval) pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
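
The first patch threads a CancelReasonType through Operator.Cancel and Controller.RemoveOperator so the reason is recorded under the "cancel-reason" key of the operator's AdditionalInfos and printed by the "operator canceled" log. A minimal caller-side sketch, assuming a *operator.Controller and a running *operator.Operator are in scope along with the pingcap log and zap packages; removeByAdmin and the log fields are illustrative names, not part of the patch:

// removeByAdmin cancels an operator on behalf of an admin request and logs
// the reason that Cancel recorded in AdditionalInfos.
func removeByAdmin(oc *operator.Controller, op *operator.Operator) {
	if oc.RemoveOperator(op, operator.AdminStop) {
		log.Info("operator removed by admin",
			zap.Uint64("region-id", op.RegionID()),
			zap.String("cancel-reason", op.AdditionalInfos["cancel-reason"]))
	}
	// Callers that pass no reason keep working: RemoveOperator falls back to
	// operator.Unknown, and Cancel keeps only the first reason it is given.
}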
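
The second patch turns on client-side gRPC keepalive in pd-tso-bench so dead connections are detected instead of letting TSO requests hang. A self-contained sketch of the same wiring, using the 10s/3s values from the patch's keepaliveTime/keepaliveTimeout constants; the PD address is a placeholder and TLS is left disabled:

package main

import (
	"context"
	"log"
	"time"

	pd "github.com/tikv/pd/client"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	ctx := context.Background()
	// Send a keepalive ping after 10s of inactivity and drop the connection
	// if it is not acknowledged within 3s, mirroring the patch.
	opt := pd.WithGRPCDialOptions(
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    10 * time.Second,
			Timeout: 3 * time.Second,
		}),
	)
	cli, err := pd.NewClientWithContext(ctx, []string{"127.0.0.1:2379"}, pd.SecurityOption{}, opt)
	if err != nil {
		log.Fatalf("create pd client: %v", err)
	}
	defer cli.Close()

	physical, logical, err := cli.GetTS(ctx)
	if err != nil {
		log.Fatalf("get tso: %v", err)
	}
	log.Printf("tso: physical=%d logical=%d", physical, logical)
}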