Skip to content

Commit

Permalink
executor: mutex snapshot with txn (#25468)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Jun 16, 2021
1 parent 0490590 commit 3c97912
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 27 deletions.
8 changes: 6 additions & 2 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
sessionVars.TxnReadTS.SetTxnReadTS(oldSnapshotTS)
}
}
if name == variable.TxnIsolationOneShot && sessionVars.InTxn() {
return errors.Trace(ErrCantChangeTxCharacteristics)
if sessionVars.InTxn() {
if name == variable.TxnIsolationOneShot ||
name == variable.TiDBTxnReadTS ||
name == variable.TiDBSnapshot {
return errors.Trace(ErrCantChangeTxCharacteristics)
}
}
err = variable.SetSessionSystemVar(sessionVars, name, valStr)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,11 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
vars := e.ctx.GetSessionVars()
if err := vars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil {
return errors.Trace(err)
}
vars.SetInTxn(true)
return nil
}
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
Expand Down
83 changes: 62 additions & 21 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,31 +291,73 @@ func (s *testStaleTxnSuite) TestStalenessAndHistoryRead(c *C) {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)
// set @@tidb_snapshot before staleness txn
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
// 1599321600000 == 2020-09-06 00:00:00
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot during staleness txn
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")

// test mutex
tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`)
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000))
time1 := time.Now()
time1TS := oracle.GoTimeToTS(time1)
schemaVer1 := tk.Se.GetInfoSchema().SchemaMetaVersion()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")
tk.MustExec(`drop table if exists t`)
time.Sleep(50 * time.Millisecond)
time2 := time.Now()
time2TS := oracle.GoTimeToTS(time2)
schemaVer2 := tk.Se.GetInfoSchema().SchemaMetaVersion()

// test set txn as of will flush/mutex tidb_snapshot
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1TS)
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil)
tk.MustExec("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'")
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)
tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000))
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil)
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time2TS)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2)

tk.MustExec("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'")
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000))
tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`)
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000))
// test tidb_snapshot will flush/mutex set txn as of
tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time1TS)
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time2TS)
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2)

// test start txn will flush/mutex tidb_snapshot
tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1TS)
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)

tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, time2TS)
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2)
tk.MustExec("commit")
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil)
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2)

// test snapshot mutex with txn
tk.MustExec("START TRANSACTION")
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil)
err := tk.ExecToErr(`set @@tidb_snapshot="2020-10-08 16:45:26";`)
c.Assert(err, ErrorMatches, ".*Transaction characteristics can't be changed while a transaction is in progress")
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil)
tk.MustExec("commit")

// test set txn as of txn mutex with txn
tk.MustExec("START TRANSACTION")
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
err = tk.ExecToErr("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'")
c.Assert(err, ErrorMatches, ".*Transaction characteristics can't be changed while a transaction is in progress")
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
tk.MustExec("commit")
}

func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
Expand Down Expand Up @@ -729,8 +771,7 @@ func (s *testStaleTxnSuite) TestAsOfTimestampCompatibility(c *C) {
for _, testcase := range testcases {
tk.MustExec(testcase.beginSQL)
err := tk.ExecToErr(testcase.sql)
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*as of timestamp can't be set in transaction.*")
c.Assert(err, ErrorMatches, ".*as of timestamp can't be set in transaction.*|.*Transaction characteristics can't be changed while a transaction is in progress")
tk.MustExec("commit")
}
tk.MustExec(`create table test.table1 (id int primary key, a int);`)
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,9 +831,6 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", Hidden: true, SetSession: func(s *SessionVars, val string) error {
return setTxnReadTS(s, val)
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
if vars.InTxn() {
return "", errors.New("as of timestamp can't be set in transaction")
}
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func parseTimeZone(s string) (*time.Location, error) {
func setSnapshotTS(s *SessionVars, sVal string) error {
if sVal == "" {
s.SnapshotTS = 0
s.SnapshotInfoschema = nil
return nil
}

Expand All @@ -384,6 +385,7 @@ func setTxnReadTS(s *SessionVars, sVal string) error {
s.TxnReadTS = NewTxnReadTS(0)
return nil
}

t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return err
Expand All @@ -395,6 +397,7 @@ func setTxnReadTS(s *SessionVars, sVal string) error {
s.TxnReadTS = NewTxnReadTS(oracle.GoTimeToTS(t1))
// tx_read_ts should be mutual exclusive with tidb_snapshot
s.SnapshotTS = 0
s.SnapshotInfoschema = nil
return err
}

Expand Down

0 comments on commit 3c97912

Please sign in to comment.