diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 5506c9d497ac2..ef3bcee5719ed 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -176,7 +176,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V if req.KeepOrder || req.Desc { return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")} } - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType) @@ -386,7 +386,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b return nil } - if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { + if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { return errors.Trace(err) } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index cb799edb16f70..f32146bea82ee 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -73,7 +73,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars) } - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req) @@ -830,11 +830,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti return nil, nil } - boRPCType := tikv.BoTiKVRPC + err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task) if task.storeType == kv.TiFlash { - boRPCType = tikv.BoTiFlashRPC + err1 = bo.Backoff(tikv.BoTiFlashRPC, err1) + } else { + err1 = bo.b.BackoffTiKVRPC(err1) } - if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil { + + if err1 != nil { return nil, errors.Trace(err) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 3ea07e744f9b9..812638411fd3d 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -56,7 +56,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) { - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil @@ -344,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques return } - if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil { + if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil { if errors.Cause(err) == context.Canceled { logutil.BgLogger().Info("stream recv timeout", zap.Error(err)) } else { diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 3bfd616929aec..bc09651e0d379 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/store/tikv/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" pd "github.com/tikv/pd/client" ) @@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) { func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { ctx := context.Background() - bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil) + bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil) loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte("")) c.Assert(err, IsNil) var regionErr *errorpb.Error @@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM mCluster.Merge(s.initRegion.regionID, region2) regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID) err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch( - tikv.NewNoopBackoff(context.Background()), + retry.NewNoopBackoff(context.Background()), &tikv.RPCContext{Region: regionID, Store: &tikv.Store{}}, []*metapb.Region{regionMeta}) c.Assert(err, IsNil) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c471ccc16167a..c622e21d2ee5d 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -29,7 +29,6 @@ type BackoffType = retry.BackoffType // Back off types. const ( BoRegionMiss = retry.BoRegionMiss - BoTiKVRPC = retry.BoTiKVRPC BoTiFlashRPC = retry.BoTiFlashRPC BoTxnLockFast = retry.BoTxnLockFast BoTxnLock = retry.BoTxnLock diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 70f1cf27ccacc..e5ec039fc6911 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b break } - err2 := b.Backoff(retry.BoTiKVRPC, err1) + err2 := b.BackoffTiKVRPC(err1) // As timeout is set to math.MaxUint32, err2 should always be nil. // This line is added to make the 'make errcheck' pass. terror.Log(err2) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 9aa55baa64cfb..ec5b92f81c5ad 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -580,7 +580,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { - err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) + err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) } return errors.Trace(err) } diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go index b2e0137902bce..a1a7c0619537f 100644 --- a/store/tikv/retry/backoff.go +++ b/store/tikv/retry/backoff.go @@ -49,7 +49,7 @@ const ( func (t BackoffType) metric() prometheus.Observer { switch t { // TODO: distinguish tikv and tiflash in metrics - case BoTiKVRPC, BoTiFlashRPC: + case boTiKVRPC, BoTiFlashRPC: return metrics.BackoffHistogramRPC case BoTxnLock: return metrics.BackoffHistogramLock @@ -121,7 +121,7 @@ type BackoffType int // Back off types. const ( - BoTiKVRPC BackoffType = iota + boTiKVRPC BackoffType = iota BoTiFlashRPC BoTxnLock BoTxnLockFast @@ -139,7 +139,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int vars.Hook(t.String(), vars) } switch t { - case BoTiKVRPC, BoTiFlashRPC: + case boTiKVRPC, BoTiFlashRPC: return NewBackoffFn(100, 2000, EqualJitter) case BoTxnLock: return NewBackoffFn(200, 3000, EqualJitter) @@ -164,7 +164,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int func (t BackoffType) String() string { switch t { - case BoTiKVRPC: + case boTiKVRPC: return "tikvRPC" case BoTiFlashRPC: return "tiflashRPC" @@ -193,7 +193,7 @@ func (t BackoffType) String() string { // TError returns pingcap/error of the backoff type. func (t BackoffType) TError() error { switch t { - case BoTiKVRPC: + case boTiKVRPC: return tikverr.ErrTiKVServerTimeout case BoTiFlashRPC: return tikverr.ErrTiFlashServerTimeout @@ -279,6 +279,11 @@ func (b *Backoffer) Backoff(typ BackoffType, err error) error { return b.BackoffWithMaxSleep(typ, -1, err) } +// BackoffTiKVRPC calls Backoff with boTiKVRPC. +func (b *Backoffer) BackoffTiKVRPC(err error) error { + return b.Backoff(boTiKVRPC, err) +} + // BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message // and never sleep more than maxSleepMs for each sleep. func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error {