Skip to content

Commit

Permalink
refactor the logic of load data batch insert, make batchCheckGet happ…
Browse files Browse the repository at this point in the history
…en once per transaction
  • Loading branch information
cfzjywxk committed Jul 11, 2019
1 parent 9385c6e commit 6bb8696
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
2 changes: 2 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch()
c.Assert(err1, IsNil)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
23 changes: 17 additions & 6 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ type LoadDataInfo struct {
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
e.rows = make([][]types.Datum, 0, limit)
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -223,7 +225,6 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
isEOF = true
prevData, curData = curData, prevData
}
rows := make([][]types.Datum, 0, e.maxRowsInBatch)
for len(curData) > 0 {
line, curData, hasStarting = e.getLine(prevData, curData)
prevData = nil
Expand Down Expand Up @@ -252,7 +253,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
if err != nil {
return nil, false, err
}
rows = append(rows, e.colsToRow(ctx, cols))
e.rows = append(e.rows, e.colsToRow(ctx, cols))
e.rowCount++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
Expand All @@ -261,12 +262,22 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
break
}
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(rows)))
err := e.batchCheckAndInsert(rows, e.addRecordLD)
return curData, reachLimit, nil
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
var err error
if len(e.rows) == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(e.rows)))
err = e.batchCheckAndInsert(e.rows, e.addRecordLD)
if err != nil {
return nil, reachLimit, err
return err
}
return curData, reachLimit, nil
e.rows = e.rows[:0]
return err
}

// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that
Expand Down
4 changes: 4 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,6 +1807,8 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch()
c.Assert(err, IsNil)
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2056,6 +2058,8 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch()
c.Assert(err, IsNil)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
9 changes: 8 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,10 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
if !reachLimit {
break
}
err := loadDataInfo.CheckAndInsertOneBatch()
if err != nil {
return nil, err
}
if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1113,7 +1117,10 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
if err != nil {
loadDataInfo.Ctx.StmtRollback()
} else {
err = loadDataInfo.Ctx.StmtCommit()
err = loadDataInfo.CheckAndInsertOneBatch()
if err == nil {
err = loadDataInfo.Ctx.StmtCommit()
}
}

var txn kv.Transaction
Expand Down

0 comments on commit 6bb8696

Please sign in to comment.