Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: load data statement, separate data preparing routine and commit routine #11533

Merged
merged 9 commits into from
Sep 16, 2019
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Ctx: b.ctx,
},
}

var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch(context.Background())
err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err1, IsNil)
ld.SetMaxRowsInBatch(20000)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
160 changes: 148 additions & 12 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
)

var (
	// null is the literal a field must match to be loaded as SQL NULL.
	null = []byte("NULL")
	// taskQueueSize is the maximum number of pending tasks to commit in queue.
	taskQueueSize = 64
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
return nil
}

// CommitTask is used for fetching data from data preparing routine into committing routine.
type CommitTask struct {
	cnt  uint64          // number of valid rows at the front of rows
	rows [][]types.Datum // batch of rows handed off to the commit routine
}

// LoadDataInfo saves the information of loading data operation.
type LoadDataInfo struct {
*InsertValues
Expand All @@ -113,17 +120,147 @@ type LoadDataInfo struct {
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum

// these fields are used for pipeline data prepare and commit
commitTaskQueue chan CommitTask
QuitCommit chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just use a single quit channel ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use two to let process and commit both quit immediately when error occurs in one of them, commit will close QuitProcess and process will close QuitCommit .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do it by only using one signal channel.

QuitProcess chan struct{}
}

// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}

// GetCurBatchCnt is the getter for curBatchCnt, the number of rows
// buffered so far in the current batch.
func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
	return e.curBatchCnt
}

// CloseTaskQueue is called by the data-preparing routine to inform the
// commit routine that no more tasks will arrive; the commit routine
// observes the close via the ok flag on its channel receive.
func (e *LoadDataInfo) CloseTaskQueue() {
	close(e.commitTaskQueue)
}

// InitQueues sets up the channels used to pipeline loading: the two
// quit-signal channels that let the preparing and committing routines
// stop each other, and the bounded queue that carries commit tasks
// between them.
func (e *LoadDataInfo) InitQueues() {
	e.QuitProcess = make(chan struct{})
	e.QuitCommit = make(chan struct{})
	e.commitTaskQueue = make(chan CommitTask, taskQueueSize)
}

// ForceQuitCommit lets the commit routine quit directly; it is invoked
// by the preparing routine when that side fails.
func (e *LoadDataInfo) ForceQuitCommit() {
	close(e.QuitCommit)
}

// ForceQuitProcess lets the data-preparing routine quit directly; it is
// invoked by the commit routine when committing fails or panics.
func (e *LoadDataInfo) ForceQuitProcess() {
	close(e.QuitProcess)
}

// MakeCommitTask snapshots the current batch (LoadDataInfo.rows plus
// its valid-row count LoadDataInfo.curBatchCnt) into a CommitTask for
// the commit routine.
func (e *LoadDataInfo) MakeCommitTask() CommitTask {
	return CommitTask{
		cnt:  e.curBatchCnt,
		rows: e.rows,
	}
}

// EnqOneTask feed one batch commit task to commit work
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
var err error
if e.curBatchCnt > 0 {
sendOk := false
for !sendOk {
select {
case e.commitTaskQueue <- e.MakeCommitTask():
sendOk = true
case <-e.QuitProcess:
err = errors.New("EnqOneTask forced to quit")
logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error")
return err
}
}
// reset rows buffer, will reallocate buffer but NOT reuse
e.SetMaxRowsInBatch(e.maxRowsInBatch)
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// CommitOneTask inserts the rows carried by task, then commits the
// statement and refreshes the transaction context so the next task
// starts on a fresh txn. Any failure triggers a statement rollback via
// the deferred handler. NOTE: the insert -> commit -> refresh order is
// deliberate; do not reorder.
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error {
	var err error
	defer func() {
		// Roll back the partially-applied statement on any error below.
		if err != nil {
			e.Ctx.StmtRollback()
		}
	}()
	err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
	if err != nil {
		logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
		return err
	}
	if err = e.Ctx.StmtCommit(); err != nil {
		logutil.Logger(ctx).Error("commit error commit", zap.Error(err))
		return err
	}
	// Make sure that there are no retries when committing.
	if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
		logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
		return err
	}
	return err
}

// CommitWork commit batch sequentially
func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
var err error
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("CommitWork panicked",
zap.Stack("stack"))
}
if err != nil || r != nil {
e.ForceQuitProcess()
}
if err != nil {
e.ctx.StmtRollback()
}
}()
var tasks uint64 = 0
var end = false
for !end {
select {
case <-e.QuitCommit:
err = errors.New("commit forced to quit")
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
break
case commitTask, ok := <-e.commitTaskQueue:
if ok {
err = e.CommitOneTask(ctx, commitTask)
if err != nil {
break
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
tasks++
} else {
end = true
break
}
}
if err != nil {
break
}
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
if uint64(cap(e.rows)) < limit {
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.curBatchCnt = 0
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -269,7 +406,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
e.curBatchCnt++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
zap.Uint64("totalRows", e.rowCount))
break
}
Expand All @@ -278,17 +415,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error {
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
var err error
if e.curBatchCnt == 0 {
if cnt == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD)
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD)
if err != nil {
return err
}
e.curBatchCnt = 0
return err
}

Expand Down
6 changes: 4 additions & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
100 changes: 60 additions & 40 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,9 +1043,8 @@ func (cc *clientConn) writeReq(filePath string) error {
return cc.flush()
}

var defaultLoadDataBatchCnt uint64 = 20000

func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
Expand All @@ -1056,47 +1055,33 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
if !reachLimit {
break
}
err := loadDataInfo.CheckAndInsertOneBatch(ctx)
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return nil, err
}
if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
return nil, err
}
// Make sure that there are no retries when committing.
if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil {
return nil, err
return prevData, err
}
curData = prevData
prevData = nil
}
return prevData, nil
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

// processStream process input stream from network
func processStream(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) {
var err error
var shouldBreak bool
var prevData, curData []byte
// TODO: Make the loadDataRowCnt settable.
loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked", zap.Stack("stack"))
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}
if err != nil || r != nil {
loadDataInfo.ForceQuitCommit()
} else {
loadDataInfo.CloseTaskQueue()
}
}()
for {
curData, err = cc.readPacket()
if err != nil {
Expand All @@ -1111,6 +1096,15 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
select {
case <-loadDataInfo.QuitProcess:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
Expand All @@ -1119,16 +1113,42 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
loadDataInfo.SetMessage()

if err != nil {
loadDataInfo.Ctx.StmtRollback()
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
} else {
err = loadDataInfo.CheckAndInsertOneBatch(ctx)
if err == nil {
err = loadDataInfo.Ctx.StmtCommit()
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
}
}
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

loadDataInfo.InitQueues()
loadDataInfo.SetMaxRowsInBatch(uint64(loadDataInfo.Ctx.GetSessionVars().DMLBatchSize))
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
// processStream process input data, enqueue commit task
go processStream(ctx, cc, loadDataInfo)
err = loadDataInfo.CommitWork(ctx)
loadDataInfo.SetMessage()

var txn kv.Transaction
var err1 error
Expand Down
Loading