Skip to content

Commit

Permalink
store/tikv: retry TSO RPC (#24682) (#24733)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored May 31, 2021
1 parent d34be8a commit e0e4d62
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
14 changes: 5 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,15 +731,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
}
bo := NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars)
now, err := c.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
now, err := c.store.getTimestampWithRetry(bo, oracle.GlobalTxnScope)
if err != nil {
err1 := bo.Backoff(BoPDRPC, err)
if err1 != nil {
logutil.Logger(bo.ctx).Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
Expand Down Expand Up @@ -993,7 +989,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// from PD as our MinCommitTS.
if commitTSMayBeCalculated && c.needLinearizability() {
failpoint.Inject("getMinCommitTSFromTSO", nil)
minCommitTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
minCommitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string))
// If we fail to get a timestamp from PD, we just propagate the failure
// instead of falling back to the normal 2PC because a normal 2PC will
// also be likely to fail due to the same timestamp issue.
Expand Down
9 changes: 2 additions & 7 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// locks have been cleaned before GC.
expiredLocks := locks

callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return false, errors.Trace(err)
}

txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
Expand All @@ -241,7 +236,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}
Expand All @@ -255,7 +250,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
if err != nil {
return false, err
}
Expand Down

0 comments on commit e0e4d62

Please sign in to comment.