diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index 796d85695d..9dc5aad4a4 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -112,6 +112,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
 	}
 	lockWaitStartTime := action.WaitStartTime
+	var resolvingRecordToken *int
 	for {
 		// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
 		if action.LockWaitTime() > 0 && action.LockWaitTime() != kv.LockAlwaysWait {
@@ -226,6 +227,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		// Because we already waited on tikv, no need to Backoff here.
 		// tikv default will wait 3s(also the maximum wait value) when lock error occurs
 		startTime = time.Now()
+		if resolvingRecordToken == nil {
+			token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
+			resolvingRecordToken = &token
+			defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
+		} else {
+			c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
+		}
 		msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
 		if err != nil {
 			return err
diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index 4760dfcb1f..6caeab9a7c 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -225,6 +225,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 
 	req := c.buildPrewriteRequest(batch, txnSize)
 	sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
+	var resolvingRecordToken *int
 	defer func() {
 		if err != nil {
 			// If we fail to receive response for async commit prewrite, it will be undetermined whether this
@@ -379,6 +380,13 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 			locks = append(locks, lock)
 		}
 		start := time.Now()
+		if resolvingRecordToken == nil {
+			token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
+			resolvingRecordToken = &token
+			defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
+		} else {
+			c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
+		}
 		msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
 		if err != nil {
 			return err
diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go
index 14c14629a6..ba1beb9812 100644
--- a/txnkv/txnlock/lock_resolver.go
+++ b/txnkv/txnlock/lock_resolver.go
@@ -62,6 +62,13 @@ type LockResolver struct {
 	resolveLockLiteThreshold uint64
 	mu                       struct {
 		sync.RWMutex
+		// These two fields are used to track lock resolving information:
+		// currentStartTS -> caller token -> resolving locks
+		resolving map[uint64][][]Lock
+		// currentStartTS -> number of lock resolving processes in progress.
+		// Concurrency counting is used here to speed up checking
+		// whether the resources used in `resolving` can be freed.
+		resolvingConcurrency map[uint64]int
 		// resolved caches resolved txns (FIFO, txn id -> txnStatus).
 		resolved       map[uint64]TxnStatus
 		recentResolved *list.List
@@ -76,6 +83,14 @@ type LockResolver struct {
 	asyncResolveCancel func()
 }
 
+// ResolvingLock stands for a lock that is currently being resolved.
+type ResolvingLock struct {
+	TxnID     uint64
+	LockTxnID uint64
+	Key       []byte
+	Primary   []byte
+}
+
 // NewLockResolver creates a new LockResolver instance.
 func NewLockResolver(store storage) *LockResolver {
 	r := &LockResolver{
@@ -83,6 +98,8 @@ func NewLockResolver(store storage) *LockResolver {
 		resolveLockLiteThreshold: config.GetGlobalConfig().TiKVClient.ResolveLockLiteThreshold,
 	}
 	r.mu.resolved = make(map[uint64]TxnStatus)
+	r.mu.resolving = make(map[uint64][][]Lock)
+	r.mu.resolvingConcurrency = make(map[uint64]int)
 	r.mu.recentResolved = list.New()
 	r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
 	return r
@@ -322,6 +339,45 @@ func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS u
 	return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
 }
 
+// RecordResolvingLocks records that the transaction with start ts `callerStartTS` is resolving `locks`.
+// Call it when the transaction starts trying to resolve locks.
+// It returns a token which is required when calling ResolveLocksDone.
+func (lr *LockResolver) RecordResolvingLocks(locks []*Lock, callerStartTS uint64) int {
+	resolving := make([]Lock, 0, len(locks))
+	for _, lock := range locks {
+		resolving = append(resolving, *lock)
+	}
+	lr.mu.Lock()
+	lr.mu.resolvingConcurrency[callerStartTS]++
+	token := len(lr.mu.resolving[callerStartTS])
+	lr.mu.resolving[callerStartTS] = append(lr.mu.resolving[callerStartTS], resolving)
+	lr.mu.Unlock()
+	return token
+}
+
+// UpdateResolvingLocks updates the lock resolving information of the txn `callerStartTS`.
+func (lr *LockResolver) UpdateResolvingLocks(locks []*Lock, callerStartTS uint64, token int) {
+	resolving := make([]Lock, 0, len(locks))
+	for _, lock := range locks {
+		resolving = append(resolving, *lock)
+	}
+	lr.mu.Lock()
+	lr.mu.resolving[callerStartTS][token] = resolving
+	lr.mu.Unlock()
+}
+
+// ResolveLocksDone removes the resolving locks information related to `callerStartTS`.
+func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
+	lr.mu.Lock()
+	lr.mu.resolving[callerStartTS][token] = nil
+	lr.mu.resolvingConcurrency[callerStartTS]--
+	if lr.mu.resolvingConcurrency[callerStartTS] == 0 {
+		delete(lr.mu.resolving, callerStartTS)
+		delete(lr.mu.resolvingConcurrency, callerStartTS)
+	}
+	lr.mu.Unlock()
+}
+
 func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
 	if lr.testingKnobs.meetLock != nil {
 		lr.testingKnobs.meetLock(locks)
@@ -425,6 +481,26 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
 	return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
 }
 
+// Resolving returns the information of the locks that are currently being resolved.
+func (lr *LockResolver) Resolving() []ResolvingLock {
+	result := []ResolvingLock{}
+	lr.mu.RLock()
+	defer lr.mu.RUnlock()
+	for txnID, items := range lr.mu.resolving {
+		for _, item := range items {
+			for _, lock := range item {
+				result = append(result, ResolvingLock{
+					TxnID:     txnID,
+					LockTxnID: lock.TxnID,
+					Key:       lock.Key,
+					Primary:   lock.Primary,
+				})
+			}
+		}
+	}
+	return result
+}
+
 type txnExpireTime struct {
 	initialized bool
 	txnExpire   int64
diff --git a/txnkv/txnlock/test_probe.go b/txnkv/txnlock/test_probe.go
index 127c32684c..3b713b2ecf 100644
--- a/txnkv/txnlock/test_probe.go
+++ b/txnkv/txnlock/test_probe.go
@@ -100,3 +100,10 @@ func (l LockResolverProbe) IsNonAsyncCommitLock(err error) bool {
 	_, ok := errors.Cause(err).(*nonAsyncCommitLock)
 	return ok
 }
+
+// SetResolving sets the resolving lock status for the LockResolver.
+func (l LockResolverProbe) SetResolving(currentStartTS uint64, locks []Lock) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.mu.resolving[currentStartTS] = append(l.mu.resolving[currentStartTS], locks)
+}
diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go
index a755dc7983..ea699b559f 100644
--- a/txnkv/txnsnapshot/client_helper.go
+++ b/txnkv/txnsnapshot/client_helper.go
@@ -97,6 +97,21 @@ func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64,
 	return msBeforeTxnExpired, nil
 }
 
+// UpdateResolvingLocks wraps the UpdateResolvingLocks function
+func (ch *ClientHelper) UpdateResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64, token int) {
+	ch.lockResolver.UpdateResolvingLocks(locks, callerStartTS, token)
+}
+
+// RecordResolvingLocks wraps the RecordResolvingLocks function
+func (ch *ClientHelper) RecordResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64) int {
+	return ch.lockResolver.RecordResolvingLocks(locks, callerStartTS)
+}
+
+// ResolveLocksDone wraps the ResolveLocksDone function
+func (ch *ClientHelper) ResolveLocksDone(callerStartTS uint64, token int) {
+	ch.lockResolver.ResolveLocksDone(callerStartTS, token)
+}
+
 // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
 func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) {
 	sender := locate.NewRegionRequestSender(ch.regionCache, ch.client)
diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go
index 9035b1d178..84a2dbd0e7 100644
--- a/txnkv/txnsnapshot/scan.go
+++ b/txnkv/txnsnapshot/scan.go
@@ -200,6 +200,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 	sender := locate.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetTiKVClient())
 	var reqEndKey, reqStartKey []byte
 	var loc *locate.KeyLocation
+	var resolvingRecordToken *int
 	var err error
 	for {
 		if !s.reverse {
@@ -293,7 +294,15 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
 				if err != nil {
 					return err
 				}
-				msBeforeExpired, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
+				locks := []*txnlock.Lock{lock}
+				if resolvingRecordToken == nil {
+					token := s.snapshot.store.GetLockResolver().RecordResolvingLocks(locks, s.snapshot.version)
+					resolvingRecordToken = &token
+					defer s.snapshot.store.GetLockResolver().ResolveLocksDone(s.snapshot.version, *resolvingRecordToken)
+				} else {
+					s.snapshot.store.GetLockResolver().UpdateResolvingLocks(locks, s.snapshot.version, *resolvingRecordToken)
+				}
+				msBeforeExpired, err := s.snapshot.store.GetLockResolver().ResolveLocks(bo, s.snapshot.version, locks)
 				if err != nil {
 					return err
 				}
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index ea7023cc6f..5b784390a3 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -356,6 +356,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 	s.mu.RUnlock()
 
 	pending := batch.keys
+	var resolvingRecordToken *int
 	for {
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -450,6 +451,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			s.mergeExecDetail(batchGetResp.ExecDetailsV2)
 		}
 		if len(lockedKeys) > 0 {
+			if resolvingRecordToken == nil {
+				token := cli.RecordResolvingLocks(locks, s.version)
+				resolvingRecordToken = &token
+				defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
+			} else {
+				cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
+			}
 			msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
 			if err != nil {
 				return err
@@ -565,6 +573,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 	}
 
 	var firstLock *txnlock.Lock
+	var resolvingRecordToken *int
 	for {
 		util.EvalFailpoint("beforeSendPointGet")
 		loc, err := s.store.GetRegionCache().LocateKey(bo, k)
@@ -617,8 +626,15 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 			cli.resolvedLocks.Put(lock.TxnID)
 			continue
 		}
-
-		msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*txnlock.Lock{lock})
+		locks := []*txnlock.Lock{lock}
+		if resolvingRecordToken == nil {
+			token := cli.RecordResolvingLocks(locks, s.version)
+			resolvingRecordToken = &token
+			defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
+		} else {
+			cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
+		}
+		msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
 		if err != nil {
 			return nil, err
 		}
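The new `Resolving()` accessor is intended to be polled from outside the resolver, for example to surface in-flight lock resolution in diagnostics. The sketch below shows one way a caller might consume it. It is illustrative only: the package, the helper names (`dumpResolvingLocks`, `PollResolvingLocks`) and the way the `*txnlock.LockResolver` is obtained are assumptions, not part of this change.

```go
// Package observer is a hypothetical consumer of the Resolving() API added above.
package observer

import (
	"encoding/hex"
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// dumpResolvingLocks prints every lock the resolver is currently working on,
// keyed by the transaction (TxnID) that encountered it.
func dumpResolvingLocks(lr *txnlock.LockResolver) {
	for _, rl := range lr.Resolving() {
		fmt.Printf("txn %d is resolving a lock held by txn %d: key=%s primary=%s\n",
			rl.TxnID, rl.LockTxnID, hex.EncodeToString(rl.Key), hex.EncodeToString(rl.Primary))
	}
}

// PollResolvingLocks periodically dumps the in-flight lock resolutions until
// the stop channel is closed. It is meant to run in a background goroutine.
func PollResolvingLocks(lr *txnlock.LockResolver, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			dumpResolvingLocks(lr)
		case <-stop:
			return
		}
	}
}
```

Because each call site pairs `RecordResolvingLocks`/`UpdateResolvingLocks` with a deferred `ResolveLocksDone`, the slice returned by `Resolving()` only contains entries whose resolution is still in progress.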