txn: record resolving locks #473

Merged 10 commits on May 17, 2022
1 change: 1 addition & 0 deletions txnkv/transaction/prewrite.go
@@ -365,6 +365,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
}
start := time.Now()
msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS)
Collaborator commented:

Why is the defer written in prewrite instead of in ResolveLocks? ResolveLocksDone seems to be missing if lock resolving is triggered by other commands.

Member Author @longfangsong commented on May 16, 2022:

> Why is the defer written in prewrite instead of in ResolveLocks?

It's because we need to keep the resolving information during backoff: from the user's POV, we are still doing the lock-resolving work while backing off.

> ResolveLocksDone seems to be missing if lock resolving is triggered by other commands.

I've added them now.
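
For illustration, here is a minimal sketch of the caller-side pattern this thread describes (the function name is hypothetical and the actual backoff call is elided; handleSingleBatch in prewrite.go is the real caller). The point is that the deferred ResolveLocksDone only runs when the whole attempt returns, so the resolving record stays visible across backoff-and-retry rounds:

```go
package transaction

import (
	"github.com/tikv/client-go/v2/internal/retry"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// resolveThenBackoffSketch is a hypothetical caller showing why the cleanup
// is deferred in the caller rather than inside ResolveLocks itself.
func resolveThenBackoffSketch(lr *txnlock.LockResolver, bo *retry.Backoffer, startTS uint64, locks []*txnlock.Lock) error {
	msBeforeExpired, err := lr.ResolveLocks(bo, startTS, locks)
	// The entry in the resolver's resolving map must outlive ResolveLocks:
	// from the user's point of view the locks are still "being resolved"
	// while we back off below, so cleanup is deferred to the end of the
	// whole attempt.
	defer lr.ResolveLocksDone(startTS)
	if err != nil {
		return err
	}
	if msBeforeExpired > 0 {
		// Back off for up to msBeforeExpired milliseconds and retry
		// (backoff call elided); Resolving() still reports these locks
		// during the wait.
	}
	return nil
}
```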

if err != nil {
return err
}
48 changes: 48 additions & 0 deletions txnkv/txnlock/lock_resolver.go
@@ -62,6 +62,8 @@ type LockResolver struct {
resolveLockLiteThreshold uint64
mu struct {
sync.RWMutex
// resolving maps the caller's start ts to the locks it is currently resolving.
resolving map[uint64][]Lock
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]TxnStatus
recentResolved *list.List
@@ -76,13 +78,22 @@ type LockResolver struct {
asyncResolveCancel func()
}

// ResolvingLock stands for a lock that is currently being resolved.
type ResolvingLock struct {
	TxnID     uint64 // start ts of the transaction that is resolving the lock
	LockTxnID uint64 // start ts of the transaction that holds the lock
	Key       []byte // the locked key
	Primary   []byte // the primary key of the lock's transaction
}

// NewLockResolver creates a new LockResolver instance.
func NewLockResolver(store storage) *LockResolver {
r := &LockResolver{
store: store,
resolveLockLiteThreshold: config.GetGlobalConfig().TiKVClient.ResolveLockLiteThreshold,
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.resolving = make(map[uint64][]Lock)
r.mu.recentResolved = list.New()
r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
return r
@@ -311,6 +322,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
lr.saveResolvingLocks(locks, callerStartTS)
ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, false, false)
return ttl, err
}
@@ -319,9 +331,20 @@ func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64,
// Read operations needn't wait for the resolution of secondary locks, and can read through the lock (if its transaction is committed
// and its commitTS is less than or equal to callerStartTS) or ignore it (if its transaction is rolled back or its minCommitTS has been pushed).
func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
lr.saveResolvingLocks(locks, callerStartTS)
return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
}
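
To make the read-path contract above concrete, here is a sketch of how a reader might consume the three results, assuming (as the implementation suggests) that canIgnore and canAccess hold the txn IDs of the locks' owning transactions. The function is illustrative, not actual client-go read code, and reuses the imports from the earlier sketch:

```go
// classifyLocksSketch is a hypothetical read-path caller of
// ResolveLocksForRead showing how canAccess/canIgnore partition the locks.
func classifyLocksSketch(lr *txnlock.LockResolver, bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) error {
	ttl, canIgnore, canAccess, err := lr.ResolveLocksForRead(bo, callerStartTS, locks, true)
	if err != nil {
		return err
	}
	accessible := make(map[uint64]bool, len(canAccess))
	for _, id := range canAccess {
		accessible[id] = true
	}
	ignorable := make(map[uint64]bool, len(canIgnore))
	for _, id := range canIgnore {
		ignorable[id] = true
	}
	for _, lock := range locks {
		switch {
		case accessible[lock.TxnID]:
			// Committed with commitTS <= callerStartTS: read through the lock.
		case ignorable[lock.TxnID]:
			// Rolled back, or minCommitTS pushed past the reader: skip it.
		default:
			// Still undecided: back off for up to ttl ms and retry.
			_ = ttl
		}
	}
	return nil
}
```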

func (lr *LockResolver) saveResolvingLocks(locks []*Lock, callerStartTS uint64) {
Collaborator commented:

What if ResolveLocks happens concurrently for multiple regions?

For example, a batch get involves multiple regions and may encounter locks in more than one of them, but the resolving array only ever includes the information of one region.

resolving := []Lock{}
for _, lock := range locks {
resolving = append(resolving, *lock)
}
lr.mu.Lock()
lr.mu.resolving[callerStartTS] = resolving
lr.mu.Unlock()
}
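
The overwrite behind the reviewer's comment is the plain map assignment above: two concurrent calls sharing a callerStartTS each build their own slice, and whichever assignment runs last wins. A minimal append-based sketch (mirroring what the SetResolving test probe below does) would instead accumulate locks from every region; note that it would also pile up duplicates across retries until ResolveLocksDone runs, so this shows the direction, not the merged fix:

```go
// saveResolvingLocksAppend is a sketch: accumulate locks from concurrent
// callers that share callerStartTS instead of replacing the slice wholesale.
func (lr *LockResolver) saveResolvingLocksAppend(locks []*Lock, callerStartTS uint64) {
	lr.mu.Lock()
	defer lr.mu.Unlock()
	for _, lock := range locks {
		lr.mu.resolving[callerStartTS] = append(lr.mu.resolving[callerStartTS], *lock)
	}
}
```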

func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
@@ -425,6 +448,31 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
}

// ResolveLocksDone removes the resolving-lock information associated with callerStartTS.
func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64) {
lr.mu.Lock()
delete(lr.mu.resolving, callerStartTS)
lr.mu.Unlock()
}

// Resolving returns the information of the locks that are currently being resolved.
func (lr *LockResolver) Resolving() []ResolvingLock {
result := []ResolvingLock{}
lr.mu.RLock()
defer lr.mu.RUnlock()
for txnID, item := range lr.mu.resolving {
for _, lock := range item {
result = append(result, ResolvingLock{
TxnID: txnID,
LockTxnID: lock.TxnID,
Key: lock.Key,
Primary: lock.Primary,
})
}
}
return result
}
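
A hypothetical consumer of Resolving(), for example diagnostics code dumping in-flight lock resolution (the function name and output format are illustrative, and fmt is assumed to be imported):

```go
// dumpResolving prints every lock the resolver is currently working on.
func dumpResolving(lr *txnlock.LockResolver) {
	for _, r := range lr.Resolving() {
		fmt.Printf("txn %d is resolving a lock of txn %d on key %q (primary %q)\n",
			r.TxnID, r.LockTxnID, r.Key, r.Primary)
	}
}
```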

type txnExpireTime struct {
initialized bool
txnExpire int64
7 changes: 7 additions & 0 deletions txnkv/txnlock/test_probe.go
@@ -100,3 +100,10 @@ func (l LockResolverProbe) IsNonAsyncCommitLock(err error) bool {
_, ok := errors.Cause(err).(*nonAsyncCommitLock)
return ok
}

// SetResolving sets the resolving-lock status on the LockResolver for tests.
func (l LockResolverProbe) SetResolving(currentStartTS uint64, locks []Lock) {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.resolving[currentStartTS] = append(l.mu.resolving[currentStartTS], locks...)
}
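
A sketch of how a test might drive this probe, assuming LockResolverProbe embeds *LockResolver (as the method receivers above suggest) and that a nil storage is acceptable because none of these calls touch it:

```go
package txnlock

import "testing"

// TestResolvingProbeSketch seeds a fake resolving lock through the probe
// and reads it back through Resolving().
func TestResolvingProbeSketch(t *testing.T) {
	resolver := NewLockResolver(nil) // nil storage: unused by these calls
	probe := LockResolverProbe{LockResolver: resolver}
	probe.SetResolving(100, []Lock{{TxnID: 90, Key: []byte("k"), Primary: []byte("p")}})
	got := resolver.Resolving()
	if len(got) != 1 || got[0].TxnID != 100 || got[0].LockTxnID != 90 {
		t.Fatalf("unexpected resolving info: %+v", got)
	}
}
```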