From 596c0b21870ded1b4636decffc0ab213982b71d9 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 16 Jun 2021 12:01:54 +0800 Subject: [PATCH 1/5] executor: mutex snapshot with txn Signed-off-by: xhe --- executor/set.go | 8 ++++-- executor/simple.go | 6 ++++- executor/stale_txn_test.go | 46 +++++++++++++++++++++++++-------- sessionctx/variable/varsutil.go | 3 +++ 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/executor/set.go b/executor/set.go index 825f79e985fc9..9f9e70157146d 100644 --- a/executor/set.go +++ b/executor/set.go @@ -146,8 +146,12 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e sessionVars.TxnReadTS.SetTxnReadTS(oldSnapshotTS) } } - if name == variable.TxnIsolationOneShot && sessionVars.InTxn() { - return errors.Trace(ErrCantChangeTxCharacteristics) + if sessionVars.InTxn() { + if name == variable.TxnIsolationOneShot || + name == variable.TiDBTxnReadTS || + name == variable.TiDBSnapshot { + return errors.Trace(ErrCantChangeTxCharacteristics) + } } err = variable.SetSessionSystemVar(sessionVars, name, valStr) if err != nil { diff --git a/executor/simple.go b/executor/simple.go index e14d64f04a319..4d8bd82a0e60d 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -587,7 +587,11 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { // With START TRANSACTION, autocommit remains disabled until you end // the transaction with COMMIT or ROLLBACK. The autocommit mode then // reverts to its previous state. - e.ctx.GetSessionVars().SetInTxn(true) + vars := e.ctx.GetSessionVars() + if err := vars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil { + return errors.Trace(err) + } + vars.SetInTxn(true) return nil } // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 4575e60ef29b5..eecd701c19a3d 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/testkit" "github.com/tikv/client-go/v2/oracle" ) @@ -291,18 +292,8 @@ func (s *testStaleTxnSuite) TestStalenessAndHistoryRead(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) tk.MustExec(updateSafePoint) - // set @@tidb_snapshot before staleness txn - tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) - // 1599321600000 == 2020-09-06 00:00:00 - c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) - tk.MustExec("commit") - // set @@tidb_snapshot during staleness txn - tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) - tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) - c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) - tk.MustExec("commit") - // test mutex + // test set txn as of will flush/mutex tidb_snapshot tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) @@ -310,12 +301,45 @@ func (s *testStaleTxnSuite) TestStalenessAndHistoryRead(c *C) { c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000)) + // test tidb_snapshot will flush/mutex set txn as of tk.MustExec("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000)) tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) + + c.Assert(tk.Se.GetSessionVars().SetSystemVar(variable.TiDBTxnReadTS, ""), IsNil) + + // test start txn will flush/mutex tidb_snapshot + tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) + tk.MustExec("START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, uint64(419993167069184000)) + tk.MustExec("commit") + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) + + // test snapshot mutex with txn + tk.MustExec("START TRANSACTION") + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) + err := tk.ExecToErr(`set @@tidb_snapshot="2020-10-08 16:45:26";`) + c.Assert(err, ErrorMatches, ".*Transaction characteristics can't be changed while a transaction is in progress") + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) + tk.MustExec("commit") + + // test set txn as of txn mutex with txn + tk.MustExec("START TRANSACTION") + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) + err = tk.ExecToErr("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") + c.Assert(err, ErrorMatches, ".*Transaction characteristics can't be changed while a transaction is in progress") + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) + tk.MustExec("commit") } func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 2763e00fdfe33..b0e13fe972b77 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -359,6 +359,7 @@ func parseTimeZone(s string) (*time.Location, error) { func setSnapshotTS(s *SessionVars, sVal string) error { if sVal == "" { s.SnapshotTS = 0 + s.SnapshotInfoschema = nil return nil } @@ -384,6 +385,7 @@ func setTxnReadTS(s *SessionVars, sVal string) error { s.TxnReadTS = NewTxnReadTS(0) return nil } + t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp) if err != nil { return err @@ -395,6 +397,7 @@ func setTxnReadTS(s *SessionVars, sVal string) error { s.TxnReadTS = NewTxnReadTS(oracle.GoTimeToTS(t1)) // tx_read_ts should be mutual exclusive with tidb_snapshot s.SnapshotTS = 0 + s.SnapshotInfoschema = nil return err } From 82836270e333302cfb6591f0a3921e8faeaee606 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 16 Jun 2021 12:14:42 +0800 Subject: [PATCH 2/5] executor: fix fmt Signed-off-by: xhe --- executor/stale_txn_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index eecd701c19a3d..6d0683478203f 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testkit" "github.com/tikv/client-go/v2/oracle" ) From f9854734d668882920ea8c8e59290718e595f4fb Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 16 Jun 2021 12:25:20 +0800 Subject: [PATCH 3/5] executor: fix tests Signed-off-by: xhe --- executor/stale_txn_test.go | 3 +-- sessionctx/variable/sysvar.go | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 6d0683478203f..6d835ad38df74 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -753,8 +753,7 @@ func (s *testStaleTxnSuite) TestAsOfTimestampCompatibility(c *C) { for _, testcase := range testcases { tk.MustExec(testcase.beginSQL) err := tk.ExecToErr(testcase.sql) - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, ".*as of timestamp can't be set in transaction.*") + c.Assert(err, ErrorMatches, ".*as of timestamp can't be set in transaction.*|.*Transaction characteristics can't be changed while a transaction is in progress") tk.MustExec("commit") } tk.MustExec(`create table test.table1 (id int primary key, a int);`) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 6bafe04d33a46..2ac1e0b403d7d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -831,9 +831,6 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", Hidden: true, SetSession: func(s *SessionVars, val string) error { return setTxnReadTS(s, val) }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { - if vars.InTxn() { - return "", errors.New("as of timestamp can't be set in transaction") - } return normalizedValue, nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error { From 032d3a839e2bbaa370f981a9193f5f415bc817cb Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 16 Jun 2021 21:11:21 +0800 Subject: [PATCH 4/5] executor: refine tests Signed-off-by: xhe --- executor/stale_txn_test.go | 48 ++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 6d835ad38df74..bc3bf00ed1b87 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testkit" "github.com/tikv/client-go/v2/oracle" @@ -293,35 +292,54 @@ func (s *testStaleTxnSuite) TestStalenessAndHistoryRead(c *C) { UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) tk.MustExec(updateSafePoint) + time1 := time.Now() + time1_ts := oracle.GoTimeToTS(time1) + schemaVer1 := tk.Se.GetInfoSchema().SchemaMetaVersion() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key);") + tk.MustExec(`drop table if exists t`) + time.Sleep(50 * time.Millisecond) + time2 := time.Now() + time2_ts := oracle.GoTimeToTS(time2) + schemaVer2 := tk.Se.GetInfoSchema().SchemaMetaVersion() + // test set txn as of will flush/mutex tidb_snapshot - tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) + tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000"))) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1_ts) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) - tk.MustExec("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) + tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) - c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000)) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time2_ts) + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) // test tidb_snapshot will flush/mutex set txn as of - tk.MustExec("SET TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") - c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(419993167069184000)) - tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) + tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000"))) + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time1_ts) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) + tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) - - c.Assert(tk.Se.GetSessionVars().SetSystemVar(variable.TiDBTxnReadTS, ""), IsNil) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time2_ts) + c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) // test start txn will flush/mutex tidb_snapshot - tk.MustExec(`set @@tidb_snapshot="2020-10-08 16:45:26";`) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(419993151340544000)) + tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000"))) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1_ts) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) - tk.MustExec("START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-10-08 16:46:26'") + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) + + tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, time2_ts) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, uint64(419993167069184000)) + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) tk.MustExec("commit") c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) + c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) // test snapshot mutex with txn tk.MustExec("START TRANSACTION") From 9e2cf6454803545014357354319c7ea5eaa828a7 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 16 Jun 2021 21:44:01 +0800 Subject: [PATCH 5/5] executor: fix check dev Signed-off-by: xhe --- executor/stale_txn_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index bc3bf00ed1b87..bc76e1492c10f 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -293,47 +293,47 @@ func (s *testStaleTxnSuite) TestStalenessAndHistoryRead(c *C) { tk.MustExec(updateSafePoint) time1 := time.Now() - time1_ts := oracle.GoTimeToTS(time1) + time1TS := oracle.GoTimeToTS(time1) schemaVer1 := tk.Se.GetInfoSchema().SchemaMetaVersion() tk.MustExec("drop table if exists t") tk.MustExec("create table t (id int primary key);") tk.MustExec(`drop table if exists t`) time.Sleep(50 * time.Millisecond) time2 := time.Now() - time2_ts := oracle.GoTimeToTS(time2) + time2TS := oracle.GoTimeToTS(time2) schemaVer2 := tk.Se.GetInfoSchema().SchemaMetaVersion() // test set txn as of will flush/mutex tidb_snapshot tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000"))) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1_ts) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1TS) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) - c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time2_ts) + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time2TS) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) // test tidb_snapshot will flush/mutex set txn as of tk.MustExec(fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000"))) - c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time1_ts) + c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, time1TS) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time2_ts) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time2TS) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) // test start txn will flush/mutex tidb_snapshot tk.MustExec(fmt.Sprintf(`set @@tidb_snapshot="%s"`, time1.Format("2006-1-2 15:04:05.000"))) - c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1_ts) + c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, time1TS) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, NotNil) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000"))) c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0)) - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, time2_ts) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, time2TS) c.Assert(tk.Se.GetSessionVars().SnapshotInfoschema, IsNil) c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2) tk.MustExec("commit")