Skip to content

Commit

Permalink
executor: fix auto retry when transaction has select for update state…
Browse files Browse the repository at this point in the history
…ment (#11718)
  • Loading branch information
sre-bot authored and ngaut committed Aug 12, 2019
1 parent bfead96 commit 5f95ee5
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 11 deletions.
12 changes: 8 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,6 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
}

txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.ForUpdate = true
for id := range e.Schema().TblID2Handle {
// This operation is only for schema validator check.
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
Expand Down Expand Up @@ -770,13 +769,18 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := e.ctx.Txn(true)
txn, err := se.Txn(true)
if err != nil {
return err
}
forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, forUpdateTS, e.keys...)
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, forUpdateTS, keys...)
}

// LimitExec represents limit executor
Expand Down
6 changes: 1 addition & 5 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
return txn.LockKeys(ctx, e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS(), kv.Key(key))
return doLockKeys(ctx, e.ctx, key)
}
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,18 @@ func (s *testPointGetSuite) TestIssue10677(c *C) {
tk.MustQuery("desc select * from t where pk = '1.0'").Check(testkit.Rows("Point_Get_1 1.00 root table:t, handle:1"))
tk.MustQuery("select * from t where pk = '1.0'").Check(testkit.Rows("1"))
}

func (s *testPointGetSuite) TestForUpdateRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.Exec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, c int)")
tk.MustExec("insert into t values (1, 1), (2, 2)")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustQuery("select * from t where pk = 1 for update")
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustExec("update t set c = c + 1 where pk = 1")
tk.MustExec("update t set c = c + 1 where pk = 2")
_, err := tk.Exec("commit")
c.Assert(session.ErrForUpdateCantRetry.Equal(err), IsTrue)
}
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
connID := s.sessionVars.ConnectionID
s.sessionVars.RetryInfo.Retrying = true
if s.sessionVars.TxnCtx.ForUpdate {
err = errForUpdateCantRetry.GenWithStackByArgs(connID)
err = ErrForUpdateCantRetry.GenWithStackByArgs(connID)
return err
}

Expand Down
3 changes: 2 additions & 1 deletion session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,9 @@ func IsQuery(sql string) bool {
return false
}

// Session errors.
var (
errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry,
ErrForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry,
mysql.MySQLErrName[mysql.ErrForUpdateCantRetry])
)

Expand Down

0 comments on commit 5f95ee5

Please sign in to comment.