Skip to content

Commit

Permalink
executor: load data statement, separate data preparing routine and co…
Browse files Browse the repository at this point in the history
…mmit routine (#11533)
  • Loading branch information
cfzjywxk authored and jackysp committed Sep 16, 2019
1 parent 4e545cf commit d438e10
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 57 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Ctx: b.ctx,
},
}

var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch(context.Background())
err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err1, IsNil)
ld.SetMaxRowsInBatch(20000)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
164 changes: 152 additions & 12 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
)

var (
null = []byte("NULL")
null = []byte("NULL")
taskQueueSize = 64 // the maximum number of pending tasks to commit in queue
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
return nil
}

// CommitTask is used for fetching data from data preparing routine into committing routine.
type CommitTask struct {
cnt uint64
rows [][]types.Datum
}

// LoadDataInfo saves the information of loading data operation.
type LoadDataInfo struct {
*InsertValues
Expand All @@ -113,17 +120,151 @@ type LoadDataInfo struct {
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum

commitTaskQueue chan CommitTask
StopCh chan struct{}
QuitCh chan struct{}
}

// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
}

// GetCurBatchCnt getter for curBatchCnt
func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
return e.curBatchCnt
}

// CloseTaskQueue preparing routine to inform commit routine no more data
func (e *LoadDataInfo) CloseTaskQueue() {
close(e.commitTaskQueue)
}

// InitQueues initialize task queue and error report queue
func (e *LoadDataInfo) InitQueues() {
e.commitTaskQueue = make(chan CommitTask, taskQueueSize)
e.StopCh = make(chan struct{}, 2)
e.QuitCh = make(chan struct{})
}

// StartStopWatcher monitor StopCh to force quit
func (e *LoadDataInfo) StartStopWatcher() {
go func() {
<-e.StopCh
close(e.QuitCh)
}()
}

// ForceQuit let commit quit directly
func (e *LoadDataInfo) ForceQuit() {
e.StopCh <- struct{}{}
}

// MakeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt
func (e *LoadDataInfo) MakeCommitTask() CommitTask {
return CommitTask{e.curBatchCnt, e.rows}
}

// EnqOneTask feed one batch commit task to commit work
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
var err error
if e.curBatchCnt > 0 {
sendOk := false
for !sendOk {
select {
case e.commitTaskQueue <- e.MakeCommitTask():
sendOk = true
case <-e.QuitCh:
err = errors.New("EnqOneTask forced to quit")
logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error")
return err
}
}
// reset rows buffer, will reallocate buffer but NOT reuse
e.SetMaxRowsInBatch(e.maxRowsInBatch)
}
return err
}

// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error {
var err error
defer func() {
if err != nil {
e.Ctx.StmtRollback()
}
}()
err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
if err != nil {
logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
return err
}
if err = e.Ctx.StmtCommit(); err != nil {
logutil.Logger(ctx).Error("commit error commit", zap.Error(err))
return err
}
// Make sure that there are no retries when committing.
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
return err
}
return err
}

// CommitWork commit batch sequentially
func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
var err error
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("CommitWork panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
e.ForceQuit()
}
if err != nil {
e.ctx.StmtRollback()
}
}()
var tasks uint64 = 0
var end = false
for !end {
select {
case <-e.QuitCh:
err = errors.New("commit forced to quit")
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
break
case commitTask, ok := <-e.commitTaskQueue:
if ok {
err = e.CommitOneTask(ctx, commitTask)
if err != nil {
break
}
tasks++
} else {
end = true
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data commit work error", zap.Error(err))
break
}
}
return err
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
if uint64(cap(e.rows)) < limit {
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.curBatchCnt = 0
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -271,7 +412,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
e.curBatchCnt++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
zap.Uint64("totalRows", e.rowCount))
break
}
Expand All @@ -280,17 +421,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error {
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
var err error
if e.curBatchCnt == 0 {
if cnt == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD)
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD)
if err != nil {
return err
}
e.curBatchCnt = 0
return err
}

Expand Down
6 changes: 4 additions & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
103 changes: 63 additions & 40 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,9 +1044,8 @@ func (cc *clientConn) writeReq(filePath string) error {
return cc.flush()
}

var defaultLoadDataBatchCnt uint64 = 20000

func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
Expand All @@ -1057,47 +1056,35 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
if !reachLimit {
break
}
err := loadDataInfo.CheckAndInsertOneBatch(ctx)
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return nil, err
}
if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
return nil, err
}
// Make sure that there are no retries when committing.
if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil {
return nil, err
return prevData, err
}
curData = prevData
prevData = nil
}
return prevData, nil
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

// processStream process input stream from network
func processStream(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) {
var err error
var shouldBreak bool
var prevData, curData []byte
// TODO: Make the loadDataRowCnt settable.
loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
}()
for {
curData, err = cc.readPacket()
if err != nil {
Expand All @@ -1112,6 +1099,15 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
Expand All @@ -1120,16 +1116,43 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
loadDataInfo.SetMessage()

if err != nil {
loadDataInfo.Ctx.StmtRollback()
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
} else {
err = loadDataInfo.CheckAndInsertOneBatch(ctx)
if err == nil {
err = loadDataInfo.Ctx.StmtCommit()
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
}
}
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

loadDataInfo.InitQueues()
loadDataInfo.SetMaxRowsInBatch(uint64(loadDataInfo.Ctx.GetSessionVars().DMLBatchSize))
loadDataInfo.StartStopWatcher()
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
// processStream process input data, enqueue commit task
go processStream(ctx, cc, loadDataInfo)
err = loadDataInfo.CommitWork(ctx)
loadDataInfo.SetMessage()

var txn kv.Transaction
var err1 error
Expand Down
Loading

0 comments on commit d438e10

Please sign in to comment.