From d438e103bea6ab98c2406903980e2077cce7652c Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 16 Sep 2019 19:46:12 +0800 Subject: [PATCH] executor: load data statement, separate data preparing routine and commit routine (#11533) --- executor/builder.go | 2 +- executor/executor_test.go | 3 +- executor/load_data.go | 164 +++++++++++++++++++++++++++++++++++--- executor/write_test.go | 6 +- server/conn.go | 103 ++++++++++++++---------- server/server_test.go | 10 +++ server/tidb_test.go | 1 - 7 files changed, 232 insertions(+), 57 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 46f50d3b27ae1..53fd43c2b3582 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -739,8 +739,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { Ctx: b.ctx, }, } - var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr + loadDataExec.loadDataInfo.InitQueues() loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt) return loadDataExec diff --git a/executor/executor_test.go b/executor/executor_test.go index 85909b70def01..33f4bdb07c183 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -504,8 +504,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2) c.Assert(err1, IsNil) c.Assert(reachLimit, IsFalse) - err1 = ld.CheckAndInsertOneBatch(context.Background()) + err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) c.Assert(err1, IsNil) + ld.SetMaxRowsInBatch(20000) if tt.restData == nil { c.Assert(data, HasLen, 0, Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data))) diff --git a/executor/load_data.go b/executor/load_data.go index 4983422c35ba1..939e7eaeeb8d0 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -33,7 +33,8 @@ import ( ) var ( - null = []byte("NULL") + null = []byte("NULL") + taskQueueSize = 64 // the maximum number of pending tasks to commit in queue ) // LoadDataExec represents a load data executor. @@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error { return nil } +// CommitTask is used for fetching data from data preparing routine into committing routine. +type CommitTask struct { + cnt uint64 + rows [][]types.Datum +} + // LoadDataInfo saves the information of loading data operation. type LoadDataInfo struct { *InsertValues @@ -113,17 +120,151 @@ type LoadDataInfo struct { IgnoreLines uint64 Ctx sessionctx.Context rows [][]types.Datum + + commitTaskQueue chan CommitTask + StopCh chan struct{} + QuitCh chan struct{} +} + +// GetRows getter for rows +func (e *LoadDataInfo) GetRows() [][]types.Datum { + return e.rows +} + +// GetCurBatchCnt getter for curBatchCnt +func (e *LoadDataInfo) GetCurBatchCnt() uint64 { + return e.curBatchCnt +} + +// CloseTaskQueue preparing routine to inform commit routine no more data +func (e *LoadDataInfo) CloseTaskQueue() { + close(e.commitTaskQueue) +} + +// InitQueues initialize task queue and error report queue +func (e *LoadDataInfo) InitQueues() { + e.commitTaskQueue = make(chan CommitTask, taskQueueSize) + e.StopCh = make(chan struct{}, 2) + e.QuitCh = make(chan struct{}) +} + +// StartStopWatcher monitor StopCh to force quit +func (e *LoadDataInfo) StartStopWatcher() { + go func() { + <-e.StopCh + close(e.QuitCh) + }() +} + +// ForceQuit let commit quit directly +func (e *LoadDataInfo) ForceQuit() { + e.StopCh <- struct{}{} +} + +// MakeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt +func (e *LoadDataInfo) MakeCommitTask() CommitTask { + return CommitTask{e.curBatchCnt, e.rows} +} + +// EnqOneTask feed one batch commit task to commit work +func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error { + var err error + if e.curBatchCnt > 0 { + sendOk := false + for !sendOk { + select { + case e.commitTaskQueue <- e.MakeCommitTask(): + sendOk = true + case <-e.QuitCh: + err = errors.New("EnqOneTask forced to quit") + logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error") + return err + } + } + // reset rows buffer, will reallocate buffer but NOT reuse + e.SetMaxRowsInBatch(e.maxRowsInBatch) + } + return err +} + +// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn +func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error { + var err error + defer func() { + if err != nil { + e.Ctx.StmtRollback() + } + }() + err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt) + if err != nil { + logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err)) + return err + } + if err = e.Ctx.StmtCommit(); err != nil { + logutil.Logger(ctx).Error("commit error commit", zap.Error(err)) + return err + } + // Make sure that there are no retries when committing. + if err = e.Ctx.RefreshTxnCtx(ctx); err != nil { + logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) + return err + } + return err +} + +// CommitWork commit batch sequentially +func (e *LoadDataInfo) CommitWork(ctx context.Context) error { + var err error + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("CommitWork panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + } + if err != nil || r != nil { + e.ForceQuit() + } + if err != nil { + e.ctx.StmtRollback() + } + }() + var tasks uint64 = 0 + var end = false + for !end { + select { + case <-e.QuitCh: + err = errors.New("commit forced to quit") + logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed") + break + case commitTask, ok := <-e.commitTaskQueue: + if ok { + err = e.CommitOneTask(ctx, commitTask) + if err != nil { + break + } + tasks++ + } else { + end = true + break + } + } + if err != nil { + logutil.Logger(ctx).Error("load data commit work error", zap.Error(err)) + break + } + } + return err } // SetMaxRowsInBatch sets the max number of rows to insert in a batch. func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { e.maxRowsInBatch = limit - if uint64(cap(e.rows)) < limit { - e.rows = make([][]types.Datum, 0, limit) - for i := 0; uint64(i) < limit; i++ { - e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols()))) - } + e.rows = make([][]types.Datum, 0, limit) + for i := 0; uint64(i) < limit; i++ { + e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols()))) } + e.curBatchCnt = 0 } // getValidData returns prevData and curData that starts from starting symbol. @@ -271,7 +412,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) e.curBatchCnt++ if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 { reachLimit = true - logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize), + logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize), zap.Uint64("totalRows", e.rowCount)) break } @@ -280,17 +421,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) } // CheckAndInsertOneBatch is used to commit one transaction batch full filled data -func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error { +func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { var err error - if e.curBatchCnt == 0 { + if cnt == 0 { return err } - e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt) - err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD) + e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) + err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD) if err != nil { return err } - e.curBatchCnt = 0 return err } diff --git a/executor/write_test.go b/executor/write_test.go index 8d1e4d474fa83..0bf79e9d36545 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) { _, reachLimit, err := ld.InsertData(context.Background(), nil, nil) c.Assert(err, IsNil) c.Assert(reachLimit, IsFalse) - err = ld.CheckAndInsertOneBatch(context.Background()) + err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) c.Assert(err, IsNil) + ld.SetMaxRowsInBatch(20000) r := tk.MustQuery(selectSQL) r.Check(nil) @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) { _, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n")) c.Assert(err, IsNil) - err = ld.CheckAndInsertOneBatch(context.Background()) + err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt()) c.Assert(err, IsNil) + ld.SetMaxRowsInBatch(20000) ld.SetMessage() err = ctx.StmtCommit() c.Assert(err, IsNil) diff --git a/server/conn.go b/server/conn.go index 9efd534b8bd82..735d6c1ca0b9e 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1044,9 +1044,8 @@ func (cc *clientConn) writeReq(filePath string) error { return cc.flush() } -var defaultLoadDataBatchCnt uint64 = 20000 - -func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) { +func insertDataWithCommit(ctx context.Context, prevData, + curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) { var err error var reachLimit bool for { @@ -1057,16 +1056,10 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat if !reachLimit { break } - err := loadDataInfo.CheckAndInsertOneBatch(ctx) + // push into commit task queue + err = loadDataInfo.EnqOneTask(ctx) if err != nil { - return nil, err - } - if err = loadDataInfo.Ctx.StmtCommit(); err != nil { - return nil, err - } - // Make sure that there are no retries when committing. - if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil { - return nil, err + return prevData, err } curData = prevData prevData = nil @@ -1074,30 +1067,24 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat return prevData, nil } -// handleLoadData does the additional work after processing the 'load data' query. -// It sends client a file path, then reads the file content from client, inserts data into database. -func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error { - // If the server handles the load data request, the client has to set the ClientLocalFiles capability. - if cc.capability&mysql.ClientLocalFiles == 0 { - return errNotAllowedCommand - } - if loadDataInfo == nil { - return errors.New("load data info is empty") - } - - err := cc.writeReq(loadDataInfo.Path) - if err != nil { - return err - } - +// processStream process input stream from network +func processStream(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) { + var err error var shouldBreak bool var prevData, curData []byte - // TODO: Make the loadDataRowCnt settable. - loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt) - err = loadDataInfo.Ctx.NewTxn(ctx) - if err != nil { - return err - } + defer func() { + r := recover() + if r != nil { + logutil.Logger(ctx).Error("process routine panicked", + zap.Reflect("r", r), + zap.Stack("stack")) + } + if err != nil || r != nil { + loadDataInfo.ForceQuit() + } else { + loadDataInfo.CloseTaskQueue() + } + }() for { curData, err = cc.readPacket() if err != nil { @@ -1112,6 +1099,15 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor break } } + select { + case <-loadDataInfo.QuitCh: + err = errors.New("processStream forced to quit") + default: + } + if err != nil { + break + } + // prepare batch and enqueue task prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo) if err != nil { break @@ -1120,16 +1116,43 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor break } } - loadDataInfo.SetMessage() - if err != nil { - loadDataInfo.Ctx.StmtRollback() + logutil.Logger(ctx).Error("load data process stream error", zap.Error(err)) } else { - err = loadDataInfo.CheckAndInsertOneBatch(ctx) - if err == nil { - err = loadDataInfo.Ctx.StmtCommit() + err = loadDataInfo.EnqOneTask(ctx) + if err != nil { + logutil.Logger(ctx).Error("load data process stream error", zap.Error(err)) } } +} + +// handleLoadData does the additional work after processing the 'load data' query. +// It sends client a file path, then reads the file content from client, inserts data into database. +func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error { + // If the server handles the load data request, the client has to set the ClientLocalFiles capability. + if cc.capability&mysql.ClientLocalFiles == 0 { + return errNotAllowedCommand + } + if loadDataInfo == nil { + return errors.New("load data info is empty") + } + + err := cc.writeReq(loadDataInfo.Path) + if err != nil { + return err + } + + loadDataInfo.InitQueues() + loadDataInfo.SetMaxRowsInBatch(uint64(loadDataInfo.Ctx.GetSessionVars().DMLBatchSize)) + loadDataInfo.StartStopWatcher() + err = loadDataInfo.Ctx.NewTxn(ctx) + if err != nil { + return err + } + // processStream process input data, enqueue commit task + go processStream(ctx, cc, loadDataInfo) + err = loadDataInfo.CommitWork(ctx) + loadDataInfo.SetMessage() var txn kv.Transaction var err1 error diff --git a/server/server_test.go b/server/server_test.go index c7ef21ae1a3b9..a521865e35e50 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -376,6 +376,7 @@ func runTestLoadData(c *C, server *Server) { config.AllowAllFiles = true config.Strict = false }, "LoadData", func(dbt *DBTest) { + dbt.mustExec("set @@tidb_dml_batch_size = 3") dbt.mustExec("create table test (a varchar(255), b varchar(255) default 'default value', c int not null auto_increment, primary key(c))") rs, err1 := dbt.db.Exec("load data local infile '/tmp/load_data_test.csv' into table test") dbt.Assert(err1, IsNil) @@ -423,6 +424,7 @@ func runTestLoadData(c *C, server *Server) { // specify faileds and lines dbt.mustExec("delete from test") + dbt.mustExec("set @@tidb_dml_batch_size = 3") rs, err = dbt.db.Exec("load data local infile '/tmp/load_data_test.csv' into table test fields terminated by '\t- ' lines starting by 'xxx ' terminated by '\n'") dbt.Assert(err, IsNil) lastID, err = rs.LastInsertId() @@ -462,6 +464,7 @@ func runTestLoadData(c *C, server *Server) { _, err = fp.WriteString(fmt.Sprintf("xxx row%d_col1 - row%d_col2\n", i, i)) dbt.Assert(err, IsNil) } + dbt.mustExec("set @@tidb_dml_batch_size = 3") rs, err = dbt.db.Exec("load data local infile '/tmp/load_data_test.csv' into table test fields terminated by '\t- ' lines starting by 'xxx ' terminated by '\n'") dbt.Assert(err, IsNil) lastID, err = rs.LastInsertId() @@ -474,10 +477,12 @@ func runTestLoadData(c *C, server *Server) { dbt.Check(rows.Next(), IsTrue, Commentf("unexpected data")) // don't support lines terminated is "" + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err = dbt.db.Exec("load data local infile '/tmp/load_data_test.csv' into table test lines terminated by ''") dbt.Assert(err, NotNil) // infile doesn't exist + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err = dbt.db.Exec("load data local infile '/tmp/nonexistence.csv' into table test") dbt.Assert(err, NotNil) }) @@ -503,6 +508,7 @@ func runTestLoadData(c *C, server *Server) { config.Strict = false }, "LoadData", func(dbt *DBTest) { dbt.mustExec("create table test (str varchar(10) default null, i int default null)") + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err1 := dbt.db.Exec(`load data local infile '/tmp/load_data_test.csv' into table test FIELDS TERMINATED BY ',' enclosed by '"'`) dbt.Assert(err1, IsNil) var ( @@ -548,6 +554,7 @@ func runTestLoadData(c *C, server *Server) { config.Strict = false }, "LoadData", func(dbt *DBTest) { dbt.mustExec("create table test (a date, b date, c date not null, d date)") + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err1 := dbt.db.Exec(`load data local infile '/tmp/load_data_test.csv' into table test FIELDS TERMINATED BY ','`) dbt.Assert(err1, IsNil) var ( @@ -601,6 +608,7 @@ func runTestLoadData(c *C, server *Server) { config.Strict = false }, "LoadData", func(dbt *DBTest) { dbt.mustExec("create table test (a varchar(20), b varchar(20))") + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err1 := dbt.db.Exec(`load data local infile '/tmp/load_data_test.csv' into table test FIELDS TERMINATED BY ',' enclosed by '"'`) dbt.Assert(err1, IsNil) var ( @@ -645,6 +653,7 @@ func runTestLoadData(c *C, server *Server) { config.Strict = false }, "LoadData", func(dbt *DBTest) { dbt.mustExec("create table test (id INT NOT NULL PRIMARY KEY, b INT, c varchar(10))") + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err1 := dbt.db.Exec(`load data local infile '/tmp/load_data_test.csv' into table test FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' IGNORE 1 LINES`) dbt.Assert(err1, IsNil) var ( @@ -669,6 +678,7 @@ func runTestLoadData(c *C, server *Server) { config.AllowAllFiles = true }, "LoadData", func(dbt *DBTest) { dbt.mustExec("create table test (a varchar(255), b varchar(255) default 'default value', c int not null auto_increment, primary key(c))") + dbt.mustExec("set @@tidb_dml_batch_size = 3") _, err = dbt.db.Exec("load data local infile '/tmp/load_data_test.csv' into table test") dbt.Assert(err, NotNil) checkErrorCode(c, err, tmysql.ErrNotAllowedCommand) diff --git a/server/tidb_test.go b/server/tidb_test.go index cf7677c46473c..26c16c46a4d0c 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -72,7 +72,6 @@ func (ts *TidbTestSuite) SetUpSuite(c *C) { // Run this test here because parallel would affect the result of it. runTestStmtCount(c) - defaultLoadDataBatchCnt = 3 } func (ts *TidbTestSuite) TearDownSuite(c *C) {