From 1b0d6589908515c1cde91a2e6b3b4f21ae1dc2c1 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 13 Nov 2018 13:26:26 +0800 Subject: [PATCH 01/11] support batch commit for the TiDB server --- config/config.go | 2 ++ config/config.toml.example | 2 ++ session/session_test.go | 17 ++++++++++++++--- session/tidb.go | 18 +++++++++++------- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/config/config.go b/config/config.go index 61f6c961f1902..e49fdc6323c62 100644 --- a/config/config.go +++ b/config/config.go @@ -155,6 +155,7 @@ type Performance struct { StatsLease string `toml:"stats-lease" json:"stats-lease"` RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"` StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"` + BatchCommit bool `toml:"batch-commit" json:"batch-commit"` FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"` QueryFeedbackLimit uint `toml:"query-feedback-limit" json:"query-feedback-limit"` PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"` @@ -292,6 +293,7 @@ var defaultConf = Config{ StatsLease: "3s", RunAutoAnalyze: true, StmtCountLimit: 5000, + BatchCommit: false, FeedbackProbability: 0.05, QueryFeedbackLimit: 1024, PseudoEstimateRatio: 0.8, diff --git a/config/config.toml.example b/config/config.toml.example index 71f35f0eb4f1a..d40b89a0d56be 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -124,6 +124,8 @@ metrics-interval = 15 max-procs = 0 # StmtCountLimit limits the max count of statement inside a transaction. stmt-count-limit = 5000 +# If true, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction. +batch-commit = false # Set keep alive option for tcp connection. tcp-keep-alive = true diff --git a/session/session_test.go b/session/session_test.go index 76b9b12272d83..fac32270b4dd7 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -1925,13 +1925,13 @@ func (s *testSessionSuite) TestStatementErrorInTransaction(c *C) { tk.MustQuery("select * from test where a = 1 and b = 11").Check(testkit.Rows()) } -func (s *testSessionSuite) TestStatementCountLimit(c *C) { +func (s *testSessionSuite) TestStatementCountLimitAndBatchCommit(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table stmt_count_limit (id int)") - saved := config.GetGlobalConfig().Performance.StmtCountLimit + savedLimit := config.GetGlobalConfig().Performance.StmtCountLimit config.GetGlobalConfig().Performance.StmtCountLimit = 3 defer func() { - config.GetGlobalConfig().Performance.StmtCountLimit = saved + config.GetGlobalConfig().Performance.StmtCountLimit = savedLimit }() tk.MustExec("begin") tk.MustExec("insert into stmt_count_limit values (1)") @@ -1946,6 +1946,17 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) { tk.MustExec("insert into stmt_count_limit values (3)") _, err = tk.Exec("insert into stmt_count_limit values (4)") c.Assert(err, NotNil) + + savedBatchFlag := config.GetGlobalConfig().Performance.BatchCommit + config.GetGlobalConfig().Performance.BatchCommit = true + defer func() { + config.GetGlobalConfig().Performance.BatchCommit = savedBatchFlag + }() + tk.MustExec("begin") + tk.MustExec("insert into stmt_count_limit values (1)") + tk.MustExec("insert into stmt_count_limit values (2)") + tk.MustExec("insert into stmt_count_limit values (3)") + tk.MustExec("commit") } func (s *testSessionSuite) TestCastTimeToDate(c *C) { diff --git a/session/tidb.go b/session/tidb.go index 32329ec76379f..4b939f8f062c9 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -162,8 +162,8 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) if !se.sessionVars.InTxn() { if err != nil { log.Info("RollbackTxn for ddl/autocommit error.") - err1 := se.RollbackTxn(ctx) - terror.Log(errors.Trace(err1)) + err = se.RollbackTxn(ctx) + terror.Log(errors.Trace(err)) } else { err = se.CommitTxn(ctx) } @@ -171,11 +171,15 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. history := GetHistory(sctx) - if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { - err1 := se.RollbackTxn(ctx) - terror.Log(errors.Trace(err1)) - return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", - history.Count(), sctx.GetSessionVars().IsAutocommit()) + perfConfig := config.GetGlobalConfig().Performance + if history.Count() > int(perfConfig.StmtCountLimit) { + if !perfConfig.BatchCommit { + err = se.RollbackTxn(ctx) + terror.Log(errors.Trace(err)) + return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", + history.Count(), sctx.GetSessionVars().IsAutocommit()) + } + err = se.NewTxn() } } return rs, errors.Trace(err) From 46dcb7cb6a5958dc81005d51ce8a053c785ab86c Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 13 Nov 2018 14:12:29 +0800 Subject: [PATCH 02/11] tiny fix --- session/tidb.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/session/tidb.go b/session/tidb.go index 4b939f8f062c9..e0451ea55f00c 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -162,8 +162,8 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) if !se.sessionVars.InTxn() { if err != nil { log.Info("RollbackTxn for ddl/autocommit error.") - err = se.RollbackTxn(ctx) - terror.Log(errors.Trace(err)) + err1 := se.RollbackTxn(ctx) + terror.Log(errors.Trace(err1)) } else { err = se.CommitTxn(ctx) } @@ -174,8 +174,8 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) perfConfig := config.GetGlobalConfig().Performance if history.Count() > int(perfConfig.StmtCountLimit) { if !perfConfig.BatchCommit { - err = se.RollbackTxn(ctx) - terror.Log(errors.Trace(err)) + err1 := se.RollbackTxn(ctx) + terror.Log(errors.Trace(err1)) return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", history.Count(), sctx.GetSessionVars().IsAutocommit()) } From 6c020f6618553d51d0076019acf5f6c3a36a9b50 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 13 Nov 2018 21:37:25 +0800 Subject: [PATCH 03/11] address comments --- session/session_test.go | 20 +++++++++++++++++--- session/tidb.go | 5 +++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index fac32270b4dd7..2015a667996e8 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -1952,11 +1952,25 @@ func (s *testSessionSuite) TestStatementCountLimitAndBatchCommit(c *C) { defer func() { config.GetGlobalConfig().Performance.BatchCommit = savedBatchFlag }() + tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("SET SESSION autocommit = 1") tk.MustExec("begin") - tk.MustExec("insert into stmt_count_limit values (1)") - tk.MustExec("insert into stmt_count_limit values (2)") - tk.MustExec("insert into stmt_count_limit values (3)") + tk.MustExec("insert into stmt_count_limit values (5)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows()) + tk.MustExec("insert into stmt_count_limit values (6)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows()) + tk.MustExec("insert into stmt_count_limit values (7)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into stmt_count_limit values (8)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into stmt_count_limit values (9)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into stmt_count_limit values (10)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) tk.MustExec("commit") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7", "8", "9", "10")) + tk.MustExec("insert into stmt_count_limit values (11)") + tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11")) } func (s *testSessionSuite) TestCastTimeToDate(c *C) { diff --git a/session/tidb.go b/session/tidb.go index e0451ea55f00c..c514e41f2b97c 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -180,6 +180,11 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) history.Count(), sctx.GetSessionVars().IsAutocommit()) } err = se.NewTxn() + // The transaction does not committed yet, we need to keep it in transaction. + // The last history could not be "commit" statement. + // It means it is impossible to start a new transaction at the end of the transaction (the "commit" statement). + // Because after the server executed "commit" statement, the session is out of the transaction. + se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } } return rs, errors.Trace(err) From c326322263fa5a32791cbcfcd73e24384af231c8 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Thu, 15 Nov 2018 10:37:05 +0800 Subject: [PATCH 04/11] add test cases --- session/session_test.go | 58 ++++++++++++++++++++++++++++------------- session/tidb.go | 6 ++--- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/session/session_test.go b/session/session_test.go index 2015a667996e8..6a0eca21197f1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -1925,7 +1925,7 @@ func (s *testSessionSuite) TestStatementErrorInTransaction(c *C) { tk.MustQuery("select * from test where a = 1 and b = 11").Check(testkit.Rows()) } -func (s *testSessionSuite) TestStatementCountLimitAndBatchCommit(c *C) { +func (s *testSessionSuite) TestStatementCountLimit(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table stmt_count_limit (id int)") savedLimit := config.GetGlobalConfig().Performance.StmtCountLimit @@ -1946,31 +1946,53 @@ func (s *testSessionSuite) TestStatementCountLimitAndBatchCommit(c *C) { tk.MustExec("insert into stmt_count_limit values (3)") _, err = tk.Exec("insert into stmt_count_limit values (4)") c.Assert(err, NotNil) +} - savedBatchFlag := config.GetGlobalConfig().Performance.BatchCommit +func (s *testSessionSuite) TestBatchCommit(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (id int)") + saved := config.GetGlobalConfig().Performance + config.GetGlobalConfig().Performance.StmtCountLimit = 3 config.GetGlobalConfig().Performance.BatchCommit = true defer func() { - config.GetGlobalConfig().Performance.BatchCommit = savedBatchFlag + config.GetGlobalConfig().Performance = saved }() tk1 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("SET SESSION autocommit = 1") tk.MustExec("begin") - tk.MustExec("insert into stmt_count_limit values (5)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows()) - tk.MustExec("insert into stmt_count_limit values (6)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows()) - tk.MustExec("insert into stmt_count_limit values (7)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) - tk.MustExec("insert into stmt_count_limit values (8)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) - tk.MustExec("insert into stmt_count_limit values (9)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) - tk.MustExec("insert into stmt_count_limit values (10)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into t values (1)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (2)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("rollback") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + + // The above rollback will not make the session in transaction. + tk.MustExec("insert into t values (1)") + tk1.MustQuery("select * from t").Check(testkit.Rows("1")) + tk.MustExec("delete from t") + + tk.MustExec("begin") + tk.MustExec("insert into t values (5)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (6)") + tk1.MustQuery("select * from t").Check(testkit.Rows()) + tk.MustExec("insert into t values (7)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + + // The session is still in transaction. + tk.MustExec("insert into t values (8)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into t values (9)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) + tk.MustExec("insert into t values (10)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7")) tk.MustExec("commit") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7", "8", "9", "10")) - tk.MustExec("insert into stmt_count_limit values (11)") - tk1.MustQuery("select * from stmt_count_limit").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11")) + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10")) + + // The above commit will not make the session in transaction. + tk.MustExec("insert into t values (11)") + tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11")) } func (s *testSessionSuite) TestCastTimeToDate(c *C) { diff --git a/session/tidb.go b/session/tidb.go index c514e41f2b97c..46f07d82b8773 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -181,9 +181,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) } err = se.NewTxn() // The transaction does not committed yet, we need to keep it in transaction. - // The last history could not be "commit" statement. - // It means it is impossible to start a new transaction at the end of the transaction (the "commit" statement). - // Because after the server executed "commit" statement, the session is out of the transaction. + // The last history could not be "commit"/"rollback" statement. + // It means it is impossible to start a new transaction at the end of the transaction. + // Because after the server executed "commit"/"rollback" statement, the session is out of the transaction. se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } } From 740387dd4c3461edd8ac4f1e26806bba656b7051 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Thu, 15 Nov 2018 10:48:10 +0800 Subject: [PATCH 05/11] tiny fix --- config/config.toml.example | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/config.toml.example b/config/config.toml.example index d40b89a0d56be..1f8c45ac45b80 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -122,8 +122,10 @@ metrics-interval = 15 [performance] # Max CPUs to use, 0 use number of CPUs in the machine. max-procs = 0 + # StmtCountLimit limits the max count of statement inside a transaction. stmt-count-limit = 5000 + # If true, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction. batch-commit = false From 16a645defc6da0fb205cd16d05823d6af01d849f Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Fri, 16 Nov 2018 14:13:07 +0800 Subject: [PATCH 06/11] add one case --- session/session_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/session/session_test.go b/session/session_test.go index 44a69677c361f..fa918a32b950f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2044,6 +2044,16 @@ func (s *testSessionSuite) TestBatchCommit(c *C) { // The above commit will not make the session in transaction. tk.MustExec("insert into t values (11)") tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11")) + + tk.MustExec("delete from t") + tk.MustExec("SET SESSION autocommit = 0") + tk.MustExec("insert into t values (1)") + tk.MustExec("insert into t values (2)") + tk.MustExec("insert into t values (3)") + tk.MustExec("rollback") + tk1.MustExec("insert into t values (4)") + tk1.MustExec("insert into t values (5)") + tk.MustQuery("select * from t").Check(testkit.Rows("4", "5")) } func (s *testSessionSuite) TestCastTimeToDate(c *C) { From 8982dd22c2d345a2531b01e17e7311e52711176c Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Mon, 3 Dec 2018 14:19:09 +0800 Subject: [PATCH 07/11] move to a session var --- config/config.go | 2 -- config/config.toml.example | 3 --- session/tidb.go | 3 +-- sessionctx/variable/session.go | 2 ++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 5 +++++ sessionctx/variable/varsutil.go | 2 +- 7 files changed, 10 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index 8676e79d03e48..8abadd47d8ead 100644 --- a/config/config.go +++ b/config/config.go @@ -156,7 +156,6 @@ type Performance struct { StatsLease string `toml:"stats-lease" json:"stats-lease"` RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"` StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"` - BatchCommit bool `toml:"batch-commit" json:"batch-commit"` FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"` QueryFeedbackLimit uint `toml:"query-feedback-limit" json:"query-feedback-limit"` PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"` @@ -296,7 +295,6 @@ var defaultConf = Config{ StatsLease: "3s", RunAutoAnalyze: true, StmtCountLimit: 5000, - BatchCommit: false, FeedbackProbability: 0.05, QueryFeedbackLimit: 1024, PseudoEstimateRatio: 0.8, diff --git a/config/config.toml.example b/config/config.toml.example index 3a93b9bf917f8..3e93a13c3f94d 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -129,9 +129,6 @@ max-memory = 0 # StmtCountLimit limits the max count of statement inside a transaction. stmt-count-limit = 5000 -# If true, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction. -batch-commit = false - # Set keep alive option for tcp connection. tcp-keep-alive = true diff --git a/session/tidb.go b/session/tidb.go index ab93b35e95517..7e4e415f58d8b 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -178,8 +178,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) // If the user insert, insert, insert ... but never commit, TiDB would OOM. // So we limit the statement count in a transaction here. history := GetHistory(sctx) - perfConfig := config.GetGlobalConfig().Performance - if history.Count() > int(perfConfig.StmtCountLimit) { + if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { if !perfConfig.BatchCommit { err1 := se.RollbackTxn(ctx) terror.Log(errors.Trace(err1)) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 060b323aa3bb9..b517b4a5e2d05 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -591,6 +591,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.BatchInsert = TiDBOptOn(val) case TiDBBatchDelete: s.BatchDelete = TiDBOptOn(val) + case TiDBBatchCommit: + s.BatchCommit = TiDBOptOn(val) case TiDBDMLBatchSize: s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize) case TiDBCurrentTS, TiDBConfig: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index d702dd66b70f3..ba61b5d63dbda 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -637,6 +637,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)}, {ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)}, {ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)}, + {ScopeSession, TiDBBatchCommit, boolToIntStr(DefBatchCommit)}, {ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)}, {ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)}, {ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 6bbfda2d9fc4e..40b0691e33839 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -71,6 +71,10 @@ const ( // split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data. TiDBBatchDelete = "tidb_batch_delete" + // tidb_batch_commit is used to enable/disable auto-split the transaction. + // If set this option on, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction. + TiDBBatchCommit = "tidb_batch_commit" + // tidb_dml_batch_size is used to split the insert/delete data into small batches. // It only takes effort when tidb_batch_insert/tidb_batch_delete is on. // Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB. @@ -237,6 +241,7 @@ const ( DefOptInSubqToJoinAndAgg = true DefBatchInsert = false DefBatchDelete = false + DefBatchCommit = false DefCurretTS = 0 DefMaxChunkSize = 32 DefDMLBatchSize = 20000 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 79143d0155965..175036b305be2 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -310,7 +310,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, - TiDBBatchDelete, TiDBEnableCascadesPlanner: + TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner: if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" { return value, nil } From dec9ba192c35686668340a9b673e7bd9e47ae71f Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Mon, 3 Dec 2018 14:20:22 +0800 Subject: [PATCH 08/11] fix test --- session/session_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/session_test.go b/session/session_test.go index 9788a0a0935d5..965de14f5ed27 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2002,10 +2002,10 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) { func (s *testSessionSuite) TestBatchCommit(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set tidb_batch_commit = 1") tk.MustExec("create table t (id int)") saved := config.GetGlobalConfig().Performance config.GetGlobalConfig().Performance.StmtCountLimit = 3 - config.GetGlobalConfig().Performance.BatchCommit = true defer func() { config.GetGlobalConfig().Performance = saved }() From 8cbcd37946298fe211a8148818f0f3abd89a111b Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Mon, 3 Dec 2018 14:36:27 +0800 Subject: [PATCH 09/11] fix CI --- session/tidb.go | 7 ++++--- sessionctx/variable/session.go | 3 +++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/session/tidb.go b/session/tidb.go index 7e4e415f58d8b..2cdb7f843747d 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -152,8 +152,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) var rs sqlexec.RecordSet se := sctx.(*session) rs, err = s.Exec(ctx) + sessVars := se.GetSessionVars() // All the history should be added here. - se.GetSessionVars().TxnCtx.StatementCount++ + sessVars.TxnCtx.StatementCount++ if !s.IsReadOnly() { if err == nil { GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx) @@ -166,7 +167,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) } } } - if !se.sessionVars.InTxn() { + if !sessVars.InTxn() { if err != nil { log.Info("RollbackTxn for ddl/autocommit error.") err1 := se.RollbackTxn(ctx) @@ -179,7 +180,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) // So we limit the statement count in a transaction here. history := GetHistory(sctx) if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) { - if !perfConfig.BatchCommit { + if !sessVars.BatchCommit { err1 := se.RollbackTxn(ctx) terror.Log(errors.Trace(err1)) return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b517b4a5e2d05..0fc863a615205 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -285,6 +285,9 @@ type SessionVars struct { // BatchDelete indicates if we should split delete data into multiple batches. BatchDelete bool + // BatchCommit indicates if we should split the transaction into multiple batches. + BatchCommit bool + // IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using // Table.alloc. IDAllocator autoid.Allocator From 72c1139b026f645dab5ba3fe0c523aa9f2302f18 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Mon, 3 Dec 2018 15:08:45 +0800 Subject: [PATCH 10/11] add a set test --- executor/set_test.go | 9 +++++++++ session/session_test.go | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/executor/set_test.go b/executor/set_test.go index f3e6eacbf8a85..2fa9ca4102cea 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -253,6 +253,15 @@ func (s *testSuite) TestSetVar(c *C) { tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20")) _, err = tk.Exec("set global tidb_query_log_max_len = 20") c.Assert(err, NotNil) + + tk.MustExec("set tidb_batch_commit = 0") + tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("0")) + tk.MustExec("set tidb_batch_commit = 1") + tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("1")) + _, err = tk.Exec("set global tidb_batch_commit = 0") + c.Assert(err, NotNil) + _, err = tk.Exec("set global tidb_batch_commit = 2") + c.Assert(err, NotNil) } func (s *testSuite) TestSetCharset(c *C) { diff --git a/session/session_test.go b/session/session_test.go index 965de14f5ed27..cc24b321d6b7d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -1980,10 +1980,10 @@ func (s *testSessionSuite) TestStatementErrorInTransaction(c *C) { func (s *testSessionSuite) TestStatementCountLimit(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table stmt_count_limit (id int)") - savedLimit := config.GetGlobalConfig().Performance.StmtCountLimit + saved := config.GetGlobalConfig().Performance.StmtCountLimit config.GetGlobalConfig().Performance.StmtCountLimit = 3 defer func() { - config.GetGlobalConfig().Performance.StmtCountLimit = savedLimit + config.GetGlobalConfig().Performance.StmtCountLimit = saved }() tk.MustExec("begin") tk.MustExec("insert into stmt_count_limit values (1)") From 7ef42754505454fdd6933b969e6d12e7ae05c609 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Mon, 10 Dec 2018 17:33:08 +0800 Subject: [PATCH 11/11] resolve conflicts --- session/tidb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/tidb.go b/session/tidb.go index e28e35e3f6648..de9dacbd3a8dd 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -187,7 +187,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t", history.Count(), sctx.GetSessionVars().IsAutocommit()) } - err = se.NewTxn() + err = se.NewTxn(ctx) // The transaction does not committed yet, we need to keep it in transaction. // The last history could not be "commit"/"rollback" statement. // It means it is impossible to start a new transaction at the end of the transaction.