diff --git a/executor/builder.go b/executor/builder.go index fee88b522a325..e11e0e8fc2dda 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1622,7 +1622,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) filter: outerFilter, }, innerCtx: innerCtx{ - readerBuilder: &dataReaderBuilder{innerPlan, b}, + readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b}, rowTypes: innerTypes, }, workerWg: new(sync.WaitGroup), @@ -1851,6 +1851,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo type dataReaderBuilder struct { plannercore.Plan *executorBuilder + + selectResultHook // for testing } func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum, @@ -1892,7 +1894,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex return nil, errors.Trace(err) } e.resultHandler = &tableResultHandler{} - result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) + result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { return nil, errors.Trace(err) } @@ -1921,11 +1923,11 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context if err != nil { return nil, errors.Trace(err) } - kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) + e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) if err != nil { return nil, errors.Trace(err) } - err = e.open(ctx, kvRanges) + err = e.open(ctx) return e, errors.Trace(err) } diff --git a/executor/distsql.go b/executor/distsql.go index f11a1f89e20bb..3fd656e38c13d 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -233,6 +233,8 @@ type IndexReaderExecutor struct { idxCols []*expression.Column colLens []int plans []plannercore.PhysicalPlan + + selectResultHook // for testing } // Close clears all resources hold by current object. @@ -294,7 +296,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.feedback.Invalidate() return errors.Trace(err) } - e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) + e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { e.feedback.Invalidate() return errors.Trace(err) @@ -328,6 +330,9 @@ type IndexLookUpExecutor struct { tblWorkerWg sync.WaitGroup finished chan struct{} + kvRanges []kv.KeyRange + workerStarted bool + resultCh chan *lookupTableTask resultCurr *lookupTableTask feedback *statistics.QueryFeedback @@ -356,19 +361,19 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return errors.Trace(err) } } - kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) + e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) if err != nil { e.feedback.Invalidate() return errors.Trace(err) } - err = e.open(ctx, kvRanges) + err = e.open(ctx) if err != nil { e.feedback.Invalidate() } return errors.Trace(err) } -func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { +func (e *IndexLookUpExecutor) open(ctx context.Context) error { // We have to initialize "memTracker" and other execution resources in here // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that @@ -393,20 +398,22 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) return errors.Trace(err) } } + return nil +} +func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error { // indexWorker will write to workCh and tableWorker will read from workCh, // so fetching index and getting table data can run concurrently. workCh := make(chan *lookupTableTask, 1) - err = e.startIndexWorker(ctx, kvRanges, workCh) - if err != nil { + if err := e.startIndexWorker(ctx, e.kvRanges, workCh, initBatchSize); err != nil { return errors.Trace(err) } e.startTableWorker(ctx, workCh) + e.workerStarted = true return nil } -// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. -func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error { +func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error { var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -425,11 +432,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k } result.Fetch(ctx) worker := &indexWorker{ + idxLookup: e, workCh: workCh, finished: e.finished, resultCh: e.resultCh, keepOrder: e.keepOrder, - batchSize: e.maxChunkSize, + batchSize: initBatchSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, } @@ -503,7 +511,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { - if e.finished == nil { + if !e.workerStarted || e.finished == nil { return nil } @@ -515,6 +523,7 @@ func (e *IndexLookUpExecutor) Close() error { e.idxWorkerWg.Wait() e.tblWorkerWg.Wait() e.finished = nil + e.workerStarted = false e.memTracker.Detach() e.memTracker = nil return nil @@ -526,6 +535,11 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } + if !e.workerStarted { + if err := e.startWorkers(ctx, chk.RequiredRows()); err != nil { + return errors.Trace(err) + } + } chk.Reset() for { resultTask, err := e.getResultTask() @@ -538,7 +552,7 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error for resultTask.cursor < len(resultTask.rows) { chk.AppendRow(resultTask.rows[resultTask.cursor]) resultTask.cursor++ - if chk.NumRows() >= e.maxChunkSize { + if chk.IsFull() { return nil } } @@ -567,6 +581,7 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) { // indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines. type indexWorker struct { + idxLookup *IndexLookUpExecutor workCh chan<- *lookupTableTask finished <-chan struct{} resultCh chan<- *lookupTableTask @@ -599,7 +614,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } }() - chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize) + chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) for { handles, err := w.extractTaskHandles(ctx, chk, result) if err != nil { @@ -628,6 +643,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) { handles = make([]int64, 0, w.batchSize) for len(handles) < w.batchSize { + chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { return handles, err diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 7b51c6932a82e..b16ee95404c09 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -202,6 +202,7 @@ func defaultCtx() sessionctx.Context { ctx.GetSessionVars().MaxChunkSize = 1024 ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery) + ctx.GetSessionVars().SnapshotTS = uint64(1) return ctx } diff --git a/executor/table_reader.go b/executor/table_reader.go index 9e3a94c9092dd..45aeef858860d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -19,9 +19,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" tipb "github.com/pingcap/tipb/go-tipb" @@ -31,6 +34,20 @@ import ( // make sure `TableReaderExecutor` implements `Executor`. var _ Executor = &TableReaderExecutor{} +// selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing. +type selectResultHook struct { + selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) +} + +func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { + if sr.selectResultFunc == nil { + return distsql.Select(ctx, sctx, kvReq, fieldTypes, fb) + } + return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb) +} + // TableReaderExecutor sends DAG request and reads table data from kv layer. type TableReaderExecutor struct { baseExecutor @@ -55,6 +72,8 @@ type TableReaderExecutor struct { // corColInAccess tells whether there's correlated column in access conditions. corColInAccess bool plans []plannercore.PhysicalPlan + + selectResultHook // for testing } // Open initialzes necessary variables for using this executor. @@ -132,7 +151,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, errors.Trace(err) } - result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) + result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go new file mode 100644 index 0000000000000..e1fe396056d77 --- /dev/null +++ b/executor/table_readers_required_rows_test.go @@ -0,0 +1,241 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "math/rand" + + "github.com/cznic/mathutil" + . "github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tipb/go-tipb" +) + +type requiredRowsSelectResult struct { + retTypes []*types.FieldType + totalRows int + count int + expectedRowsRet []int + numNextCalled int +} + +func (r *requiredRowsSelectResult) Fetch(context.Context) {} +func (r *requiredRowsSelectResult) NextRaw(context.Context) ([]byte, error) { return nil, nil } +func (r *requiredRowsSelectResult) Close() error { return nil } + +func (r *requiredRowsSelectResult) Next(ctx context.Context, chk *chunk.Chunk) error { + defer func() { + if r.numNextCalled >= len(r.expectedRowsRet) { + return + } + rowsRet := chk.NumRows() + expected := r.expectedRowsRet[r.numNextCalled] + if rowsRet != expected { + panic(fmt.Sprintf("unexpected number of rows returned, obtain: %v, expected: %v", rowsRet, expected)) + } + r.numNextCalled++ + }() + chk.Reset() + if r.count > r.totalRows { + return nil + } + required := mathutil.Min(chk.RequiredRows(), r.totalRows-r.count) + for i := 0; i < required; i++ { + chk.AppendRow(r.genOneRow()) + } + r.count += required + return nil +} + +func (r *requiredRowsSelectResult) genOneRow() chunk.Row { + row := chunk.MutRowFromTypes(r.retTypes) + for i := range r.retTypes { + row.SetValue(i, r.genValue(r.retTypes[i])) + } + return row.ToRow() +} + +func (r *requiredRowsSelectResult) genValue(valType *types.FieldType) interface{} { + switch valType.Tp { + case mysql.TypeLong, mysql.TypeLonglong: + return int64(rand.Int()) + case mysql.TypeDouble: + return rand.Float64() + default: + panic("not implement") + } +} + +func mockDistsqlSelectCtxSet(totalRows int, expectedRowsRet []int) context.Context { + ctx := context.Background() + ctx = context.WithValue(ctx, "totalRows", totalRows) + ctx = context.WithValue(ctx, "expectedRowsRet", expectedRowsRet) + return ctx +} + +func mockDistsqlSelectCtxGet(ctx context.Context) (totalRows int, expectedRowsRet []int) { + totalRows = ctx.Value("totalRows").(int) + expectedRowsRet = ctx.Value("expectedRowsRet").([]int) + return +} + +func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { + totalRows, expectedRowsRet := mockDistsqlSelectCtxGet(ctx) + return &requiredRowsSelectResult{ + retTypes: fieldTypes, + totalRows: totalRows, + expectedRowsRet: expectedRowsRet, + }, nil +} + +func mockSelectResultWithoutCheck(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { + return &requiredRowsSelectResult{retTypes: fieldTypes}, nil +} + +func buildTableReader(sctx sessionctx.Context) Executor { + e := &TableReaderExecutor{ + baseExecutor: buildMockBaseExec(sctx), + table: &tables.Table{}, + dagPB: buildMockDAGRequest(sctx), + selectResultHook: selectResultHook{mockSelectResult}, + } + return e +} + +func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { + builder := newExecutorBuilder(sctx, nil) + req, _, err := builder.constructDAGReq(nil) + if err != nil { + panic(err) + } + return req +} + +func buildMockBaseExec(sctx sessionctx.Context) baseExecutor { + retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)} + cols := make([]*expression.Column, len(retTypes)) + for i := range retTypes { + cols[i] = &expression.Column{Index: i, RetType: retTypes[i]} + } + schema := expression.NewSchema(cols...) + baseExec := newBaseExecutor(sctx, schema, "") + return baseExec +} + +func (s *testExecSuite) TestTableReaderRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 10, + requiredRows: []int{1, 5, 3, 10}, + expectedRows: []int{1, 5, 3, 1}, + expectedRowsDS: []int{1, 5, 3, 1}, + }, + { + totalRows: maxChunkSize + 1, + requiredRows: []int{1, 5, 3, 10, maxChunkSize}, + expectedRows: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + expectedRowsDS: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + }, + { + totalRows: 3*maxChunkSize + 1, + requiredRows: []int{3, 10, maxChunkSize}, + expectedRows: []int{3, 10, maxChunkSize}, + expectedRowsDS: []int{3, 10, maxChunkSize}, + }, + } + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS) + exec := buildTableReader(sctx) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chk), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + c.Assert(exec.Close(), IsNil) + } +} + +func buildIndexReader(sctx sessionctx.Context) Executor { + e := &IndexReaderExecutor{ + baseExecutor: buildMockBaseExec(sctx), + dagPB: buildMockDAGRequest(sctx), + index: &model.IndexInfo{}, + selectResultHook: selectResultHook{mockSelectResult}, + } + return e +} + +func (s *testExecSuite) TestIndexReaderRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 10, + requiredRows: []int{1, 5, 3, 10}, + expectedRows: []int{1, 5, 3, 1}, + expectedRowsDS: []int{1, 5, 3, 1}, + }, + { + totalRows: maxChunkSize + 1, + requiredRows: []int{1, 5, 3, 10, maxChunkSize}, + expectedRows: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + expectedRowsDS: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + }, + { + totalRows: 3*maxChunkSize + 1, + requiredRows: []int{3, 10, maxChunkSize}, + expectedRows: []int{3, 10, maxChunkSize}, + expectedRowsDS: []int{3, 10, maxChunkSize}, + }, + } + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS) + exec := buildIndexReader(sctx) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chk), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + c.Assert(exec.Close(), IsNil) + } +}