From 6bb869649f70b2dac7dc5389dbcbe06a3d1f2bcb Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 8 Jul 2019 18:15:30 +0800 Subject: [PATCH] refactor the logic of load data batch insert, make batchCheckGet happen once per transaction --- executor/executor_test.go | 2 ++ executor/load_data.go | 23 +++++++++++++++++------ executor/write_test.go | 4 ++++ server/conn.go | 9 ++++++++- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 02b80cba46a6d..5783192fb6ee0 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -443,6 +443,8 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2) c.Assert(err1, IsNil) c.Assert(reachLimit, IsFalse) + err1 = ld.CheckAndInsertOneBatch() + c.Assert(err1, IsNil) if tt.restData == nil { c.Assert(data, HasLen, 0, Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data))) diff --git a/executor/load_data.go b/executor/load_data.go index df49b777cfc34..98ef7f3e62d59 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -106,11 +106,13 @@ type LoadDataInfo struct { LinesInfo *ast.LinesClause IgnoreLines uint64 Ctx sessionctx.Context + rows [][]types.Datum } // SetMaxRowsInBatch sets the max number of rows to insert in a batch. func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { e.maxRowsInBatch = limit + e.rows = make([][]types.Datum, 0, limit) } // getValidData returns prevData and curData that starts from starting symbol. 
@@ -223,7 +225,6 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) isEOF = true prevData, curData = curData, prevData } - rows := make([][]types.Datum, 0, e.maxRowsInBatch) for len(curData) > 0 { line, curData, hasStarting = e.getLine(prevData, curData) prevData = nil @@ -252,7 +253,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) if err != nil { return nil, false, err } - rows = append(rows, e.colsToRow(ctx, cols)) + e.rows = append(e.rows, e.colsToRow(ctx, cols)) e.rowCount++ if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 { reachLimit = true @@ -261,12 +262,22 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) break } } - e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(rows))) - err := e.batchCheckAndInsert(rows, e.addRecordLD) + return curData, reachLimit, nil +} + +// CheckAndInsertOneBatch checks and inserts the rows buffered for the current batch, then clears the buffer. +func (e *LoadDataInfo) CheckAndInsertOneBatch() error { + var err error + if len(e.rows) == 0 { + return err + } + e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(e.rows))) + err = e.batchCheckAndInsert(e.rows, e.addRecordLD) if err != nil { - return nil, reachLimit, err + return err } - return curData, reachLimit, nil + e.rows = e.rows[:0] + return err } // SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that diff --git a/executor/write_test.go b/executor/write_test.go index a98f7729451b0..dfa1d936bbe86 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1807,6 +1807,8 @@ func (s *testSuite4) TestLoadData(c *C) { _, reachLimit, err := ld.InsertData(context.Background(), nil, nil) c.Assert(err, IsNil) c.Assert(reachLimit, IsFalse) + err = ld.CheckAndInsertOneBatch() + c.Assert(err, IsNil) r := tk.MustQuery(selectSQL) r.Check(nil) @@ -2056,6 +2058,8 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c 
*C) { _, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n")) c.Assert(err, IsNil) + err = ld.CheckAndInsertOneBatch() + c.Assert(err, IsNil) ld.SetMessage() err = ctx.StmtCommit() c.Assert(err, IsNil) diff --git a/server/conn.go b/server/conn.go index c80116e01584c..391cd779f4da7 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1049,6 +1049,10 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat if !reachLimit { break } + err := loadDataInfo.CheckAndInsertOneBatch() + if err != nil { + return nil, err + } if err = loadDataInfo.Ctx.StmtCommit(); err != nil { return nil, err } @@ -1113,7 +1117,10 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor if err != nil { loadDataInfo.Ctx.StmtRollback() } else { - err = loadDataInfo.Ctx.StmtCommit() + err = loadDataInfo.CheckAndInsertOneBatch() + if err == nil { + err = loadDataInfo.Ctx.StmtCommit() + } } var txn kv.Transaction