Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isolation: Refactor resetting RCReadCheckTS internal flag #38174

Merged
merged 7 commits into from
Oct 17, 2022
60 changes: 60 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -2632,6 +2633,65 @@ func TestRcReadCheckTSConflict(t *testing.T) {
tk.MustExec("drop table t")
}

func TestRcReadCheckTSConflictExtra(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/isolation/CallOnStmtRetry", "return"))
defer func() {
defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/sessiontxn/isolation/CallOnStmtRetry"))
}()
store := testkit.CreateMockStore(t)

ctx := context.Background()
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_rc_read_check_ts = ON")

se := tk.Session()
cc.setCtx(&TiDBContext{Session: se, stmts: make(map[int]*TiDBStatement)})

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id1 int, id2 int, id3 int, PRIMARY KEY(id1), UNIQUE KEY udx_id2 (id2))")
tk.MustExec("insert into t1 values (1, 1, 1)")
tk.MustExec("insert into t1 values (10, 10, 10)")
require.Equal(t, "ON", tk.MustQuery("show variables like 'tidb_rc_read_check_ts'").Rows()[0][1])

tk.MustExec("set transaction_isolation = 'READ-COMMITTED'")
tk2.MustExec("set transaction_isolation = 'READ-COMMITTED'")

// Execute in text protocol
se.SetValue(sessiontxn.CallOnStmtRetryCount, 0)
tk.MustExec("begin pessimistic")
tk2.MustExec("update t1 set id3 = id3 + 1 where id1 = 1")
err := cc.handleQuery(ctx, "select * from t1 where id1 = 1")
require.NoError(t, err)
tk.MustExec("commit")
count, ok := se.Value(sessiontxn.CallOnStmtRetryCount).(int)
require.Equal(t, true, ok)
require.Equal(t, 1, count)

// Execute in prepare binary protocol
se.SetValue(sessiontxn.CallOnStmtRetryCount, 0)
tk.MustExec("begin pessimistic")
tk2.MustExec("update t1 set id3 = id3 + 1 where id1 = 1")
require.NoError(t, cc.handleStmtPrepare(ctx, "select * from t1 where id1 = 1"))
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
tk.MustExec("commit")
count, ok = se.Value(sessiontxn.CallOnStmtRetryCount).(int)
require.Equal(t, true, ok)
require.Equal(t, 1, count)

tk.MustExec("drop table t1")
}

func TestRcReadCheckTS(t *testing.T) {
ts := createTidbTestSuite(t)

Expand Down
14 changes: 14 additions & 0 deletions sessiontxn/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ var TsoWaitCount stringutil.StringerStr = "tsoWaitCount"
// TsoUseConstantCount is the key for constant tso counter
var TsoUseConstantCount stringutil.StringerStr = "tsoUseConstantCount"

// CallOnStmtRetryCount is the key for recording calling OnStmtRetry at RC isolation level
var CallOnStmtRetryCount stringutil.StringerStr = "callOnStmtRetryCount"

// AssertLockErr is used to record the lock errors we encountered
// Only for test
var AssertLockErr stringutil.StringerStr = "assertLockError"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caen they be defined as const variables?

Expand Down Expand Up @@ -153,6 +156,17 @@ func TsoUseConstantCountInc(sctx sessionctx.Context) {
sctx.SetValue(TsoUseConstantCount, count)
}

// OnStmtRetryCountInc is used only for test.
// When it is called, there is calling `(p *PessimisticRCTxnContextProvider) OnStmtRetry`.
func OnStmtRetryCountInc(sctx sessionctx.Context) {
count, ok := sctx.Value(CallOnStmtRetryCount).(int)
if !ok {
count = 0
}
count++
sctx.SetValue(CallOnStmtRetryCount, count)
}

// ExecTestHook is used only for test. It consumes hookKey in session wait do what it gets from it.
func ExecTestHook(sctx sessionctx.Context, hookKey fmt.Stringer) {
c := sctx.Value(hookKey)
Expand Down
7 changes: 4 additions & 3 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error
if err := p.baseTxnContextProvider.OnStmtRetry(ctx); err != nil {
return err
}
failpoint.Inject("CallOnStmtRetry", func() {
sessiontxn.OnStmtRetryCountInc(p.sctx)
})
p.latestOracleTSValid = false
p.checkTSInWriteStmt = false
return p.prepareStmt(false)
}
Expand Down Expand Up @@ -193,15 +197,12 @@ func (p *PessimisticRCTxnContextProvider) handleAfterQueryError(queryErr error)
return sessiontxn.NoIdea()
}

p.latestOracleTSValid = false

logutil.Logger(p.ctx).Info("RC read with ts checking has failed, retry RC read",
zap.String("sql", sessVars.StmtCtx.OriginalSQL), zap.Error(queryErr))
return sessiontxn.RetryReady()
}

func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) {
p.latestOracleTSValid = false
txnCtx := p.sctx.GetSessionVars().TxnCtx
retryable := false
if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok && deadlock.IsRetryable {
Expand Down