From 5f95ee5c29c08d3e12b9b47ba8a98d4eec003f12 Mon Sep 17 00:00:00 2001 From: pingcap-github-bot Date: Mon, 12 Aug 2019 16:52:50 +0800 Subject: [PATCH] executor: fix auto retry when transaction has select for update statement (#11718) --- executor/executor.go | 12 ++++++++---- executor/point_get.go | 6 +----- executor/point_get_test.go | 15 +++++++++++++++ session/session.go | 2 +- session/tidb.go | 3 ++- 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index c8c60886d9235..675f25a9c93ab 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -735,7 +735,6 @@ func (e *SelectLockExec) Open(ctx context.Context) error { } txnCtx := e.ctx.GetSessionVars().TxnCtx - txnCtx.ForUpdate = true for id := range e.Schema().TblID2Handle { // This operation is only for schema validator check. txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) @@ -770,13 +769,18 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } return nil } + return doLockKeys(ctx, e.ctx, e.keys...) +} + +func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error { + se.GetSessionVars().TxnCtx.ForUpdate = true // Lock keys only once when finished fetching all results. - txn, err := e.ctx.Txn(true) + txn, err := se.Txn(true) if err != nil { return err } - forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() - return txn.LockKeys(ctx, forUpdateTS, e.keys...) + forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() + return txn.LockKeys(ctx, forUpdateTS, keys...) } // LimitExec represents limit executor diff --git a/executor/point_get.go b/executor/point_get.go index 6e5f78d97e226..d480f1ac1f913 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -145,11 +145,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error { if e.lock { - txn, err := e.ctx.Txn(true) - if err != nil { - return err - } - return txn.LockKeys(ctx, e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS(), kv.Key(key)) + return doLockKeys(ctx, e.ctx, key) } return nil } diff --git a/executor/point_get_test.go b/executor/point_get_test.go index e4df3c7a2c1bc..400b51fe85668 100644 --- a/executor/point_get_test.go +++ b/executor/point_get_test.go @@ -402,3 +402,18 @@ func (s *testPointGetSuite) TestIssue10677(c *C) { tk.MustQuery("desc select * from t where pk = '1.0'").Check(testkit.Rows("Point_Get_1 1.00 root table:t, handle:1")) tk.MustQuery("select * from t where pk = '1.0'").Check(testkit.Rows("1")) } + +func (s *testPointGetSuite) TestForUpdateRetry(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.Exec("drop table if exists t") + tk.MustExec("create table t(pk int primary key, c int)") + tk.MustExec("insert into t values (1, 1), (2, 2)") + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") + tk.MustExec("begin") + tk.MustQuery("select * from t where pk = 1 for update") + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustExec("update t set c = c + 1 where pk = 1") + tk.MustExec("update t set c = c + 1 where pk = 2") + _, err := tk.Exec("commit") + c.Assert(session.ErrForUpdateCantRetry.Equal(err), IsTrue) +} diff --git a/session/session.go b/session/session.go index fd8e34d92ebd0..a1f2024afa0d8 100644 --- a/session/session.go +++ b/session/session.go @@ -579,7 +579,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { connID := s.sessionVars.ConnectionID s.sessionVars.RetryInfo.Retrying = true if s.sessionVars.TxnCtx.ForUpdate { - err = errForUpdateCantRetry.GenWithStackByArgs(connID) + err = ErrForUpdateCantRetry.GenWithStackByArgs(connID) return err } diff --git a/session/tidb.go b/session/tidb.go index bc46721b7886c..1bcfed18ee25f 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -327,8 +327,9 @@ func IsQuery(sql string) bool { return false } +// Session errors. var ( - errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry, + ErrForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry, mysql.MySQLErrName[mysql.ErrForUpdateCantRetry]) )