Skip to content

Commit

Permalink
Merge branch 'master' into cop-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
SpadeA-Tang authored Jul 4, 2023
2 parents b341d39 + c0cf773 commit 4dc05c3
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 23 deletions.
61 changes: 45 additions & 16 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,52 +1472,81 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {

s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
s.NoError(failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
s.NoError(failpoint.Disable("tikvclient/onRollback"))
}()

k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
k1, k2, k3, k4 := []byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")
v2, v3 := []byte("v2"), []byte("v3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
txn1, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)
txn1.SetPessimistic(true)

{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
helperTxn, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
s.NoError(helperTxn.Set(k2, []byte("v0")))
s.NoError(helperTxn.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
lockCtx := kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn1.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// k1 has txn's stale pessimistic lock now.
// k1 has txn1's stale pessimistic lock now.

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))
s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3))

s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))
s.NoError(txn1.Set(k2, v2))
s.NoError(txn1.Set(k3, v3))
s.NoError(txn1.Commit(ctx))

// k3 has txn's stale prewrite lock now.
// k3 has txn1's stale prewrite lock now.

// Perform ScanLock - BatchResolveLock.
txn2, err := s.store.Begin()
txn2.SetPessimistic(true)
s.NoError(err)
lockCtx = kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn2.LockKeys(ctx, lockCtx, k4)
s.NoError(err)
s.NoError(txn2.Rollback())

// k4 has txn2's stale primary pessimistic lock now.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)

remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)

s.Len(remainingLocks, 3)
s.Equal(remainingLocks[0].Key, k1)
s.Equal(remainingLocks[0].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[1].Key, k3)
s.Equal(remainingLocks[1].LockType, kvrpcpb.Op_Put)
s.Equal(remainingLocks[2].Key, k4)
s.Equal(remainingLocks[2].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[2].Primary, k4)

// Perform ScanLock - BatchResolveLock.
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))

// Do ScanLock again to make sure no locks are left.
remainingLocks, err = s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)
s.Empty(remainingLocks)

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
Expand Down
37 changes: 37 additions & 0 deletions tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package tikv

import (
"bytes"
"context"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -114,6 +115,42 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co
return s.resolveLocks(ctx, safepoint, concurrency)
}

func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) {
bo := NewGcResolveLockMaxBackoffer(ctx)
const limit = 1024

var result []*txnlock.Lock

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
for _, l := range locks {
if bytes.Compare(endKey, l.Key) <= 0 {
// Finished scanning the given range.
break outerLoop
}
result = append(result, l)
}

if len(locks) < limit {
if len(loc.EndKey) == 0 {
// Scanned to the very end.
break outerLoop
}
// The current region is completely scanned.
startKey = loc.EndKey
} else {
// The current region may still have more locks.
startKey = append(locks[len(locks)-1].Key, 0)
}
}

return result, nil
}

// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
Expand Down
19 changes: 18 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,27 @@ func (txn *KVTxn) Rollback() error {
txn.CancelAggressiveLocking(context.Background())
}

// `skipPessimisticRollback` may be true only when set by failpoint in tests.
skipPessimisticRollback := false
if val, err := util.EvalFailpoint("onRollback"); err == nil {
if s, ok := val.(string); ok {
if s == "skipRollbackPessimisticLock" {
logutil.BgLogger().Info("[failpoint] injected skip pessimistic rollback on explicit rollback",
zap.Uint64("txnStartTS", txn.startTS))
skipPessimisticRollback = true
} else {
panic(fmt.Sprintf("unknown instruction %s for failpoint \"onRollback\"", s))
}
}
}

start := time.Now()
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
var err error
if !skipPessimisticRollback {
err = txn.rollbackPessimisticLocks()
}
txn.committer.ttlManager.close()
if err != nil {
logutil.BgLogger().Error(err.Error())
Expand Down
17 changes: 11 additions & 6 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,29 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks ignoring whether whey are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll back them.
// Pessimistic locks needs special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
//
// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
// See: https://github.com/pingcap/tidb/issues/45134
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
Expand Down Expand Up @@ -1173,6 +1176,8 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
}
}

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
Expand Down

0 comments on commit 4dc05c3

Please sign in to comment.