Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: control Chunk size for TableReader&IndexReader&IndexLookup #10169

Merged
merged 5 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
filter: outerFilter,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{innerPlan, b},
readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b},
rowTypes: innerTypes,
},
workerWg: new(sync.WaitGroup),
Expand Down Expand Up @@ -1851,6 +1851,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
// dataReaderBuilder builds readers (table/index/index-lookup) for the inner
// side of an index join. It embeds the inner plannercore.Plan and the
// executorBuilder that constructs the concrete reader executors, plus a
// selectResultHook so tests can intercept the distsql select call.
type dataReaderBuilder struct {
	plannercore.Plan
	*executorBuilder

	selectResultHook // for testing: lets tests stub out distsql result fetching
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum,
Expand Down Expand Up @@ -1892,7 +1894,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return nil, errors.Trace(err)
}
e.resultHandler = &tableResultHandler{}
result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback)
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1921,11 +1923,11 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if err != nil {
return nil, errors.Trace(err)
}
kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
if err != nil {
return nil, errors.Trace(err)
}
err = e.open(ctx, kvRanges)
err = e.open(ctx)
return e, errors.Trace(err)
}

Expand Down
40 changes: 28 additions & 12 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ type IndexReaderExecutor struct {
idxCols []*expression.Column
colLens []int
plans []plannercore.PhysicalPlan

selectResultHook // for testing
}

// Close clears all resources hold by current object.
Expand Down Expand Up @@ -294,7 +296,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.feedback.Invalidate()
return errors.Trace(err)
}
e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
Expand Down Expand Up @@ -328,6 +330,9 @@ type IndexLookUpExecutor struct {
tblWorkerWg sync.WaitGroup
finished chan struct{}

kvRanges []kv.KeyRange
workerStarted bool

resultCh chan *lookupTableTask
resultCurr *lookupTableTask
feedback *statistics.QueryFeedback
Expand Down Expand Up @@ -356,19 +361,19 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
return errors.Trace(err)
}
}
kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback)
e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
}
err = e.open(ctx, kvRanges)
err = e.open(ctx)
if err != nil {
e.feedback.Invalidate()
}
return errors.Trace(err)
}

func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
func (e *IndexLookUpExecutor) open(ctx context.Context) error {
// We have to initialize "memTracker" and other execution resources in here
// instead of in function "Open", because this "IndexLookUpExecutor" may be
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
Expand All @@ -393,20 +398,22 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
return errors.Trace(err)
}
}
return nil
}

func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
// indexWorker will write to workCh and tableWorker will read from workCh,
// so fetching index and getting table data can run concurrently.
workCh := make(chan *lookupTableTask, 1)
err = e.startIndexWorker(ctx, kvRanges, workCh)
if err != nil {
if err := e.startIndexWorker(ctx, e.kvRanges, workCh, initBatchSize); err != nil {
return errors.Trace(err)
}
e.startTableWorker(ctx, workCh)
e.workerStarted = true
return nil
}

// startIndexWorker launches a background goroutine to fetch handles and send the results to workCh.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error {
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error {
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand All @@ -425,11 +432,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
result.Fetch(ctx)
worker := &indexWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
resultCh: e.resultCh,
keepOrder: e.keepOrder,
batchSize: e.maxChunkSize,
batchSize: initBatchSize,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
}
Expand Down Expand Up @@ -503,7 +511,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.finished == nil {
if !e.workerStarted || e.finished == nil {
return nil
}

Expand All @@ -515,6 +523,7 @@ func (e *IndexLookUpExecutor) Close() error {
e.idxWorkerWg.Wait()
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker.Detach()
e.memTracker = nil
return nil
Expand All @@ -526,6 +535,11 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.workerStarted {
if err := e.startWorkers(ctx, chk.RequiredRows()); err != nil {
return errors.Trace(err)
}
}
chk.Reset()
for {
resultTask, err := e.getResultTask()
Expand All @@ -538,7 +552,7 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error
for resultTask.cursor < len(resultTask.rows) {
chk.AppendRow(resultTask.rows[resultTask.cursor])
resultTask.cursor++
if chk.NumRows() >= e.maxChunkSize {
if chk.IsFull() {
return nil
}
}
Expand Down Expand Up @@ -567,6 +581,7 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {

// indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
type indexWorker struct {
idxLookup *IndexLookUpExecutor
workCh chan<- *lookupTableTask
finished <-chan struct{}
resultCh chan<- *lookupTableTask
Expand Down Expand Up @@ -599,7 +614,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}
}()
chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize)
chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
for {
handles, err := w.extractTaskHandles(ctx, chk, result)
if err != nil {
Expand Down Expand Up @@ -628,6 +643,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) {
handles = make([]int64, 0, w.batchSize)
for len(handles) < w.batchSize {
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, err
Expand Down
1 change: 1 addition & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func defaultCtx() sessionctx.Context {
ctx.GetSessionVars().MaxChunkSize = 1024
ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery)
ctx.GetSessionVars().SnapshotTS = uint64(1)
return ctx
}

Expand Down
21 changes: 20 additions & 1 deletion executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
tipb "github.com/pingcap/tipb/go-tipb"
Expand All @@ -31,6 +34,20 @@ import (
// make sure `TableReaderExecutor` implements `Executor`.
var _ Executor = &TableReaderExecutor{}

// selectResultHook is used to hack distsql.Select safely for testing.
// Embedding it gives an executor a SelectResult method that delegates to
// distsql.Select in production but can be redirected by tests.
type selectResultHook struct {
	// selectResultFunc, when non-nil, replaces distsql.Select inside
	// SelectResult; it is only ever set by tests.
	selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
		fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error)
}

// SelectResult sends the distsql request built in kvReq and returns the
// resulting distsql.SelectResult. When a test has installed
// selectResultFunc, that hook is invoked instead of distsql.Select.
func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) {
	if hook := sr.selectResultFunc; hook != nil {
		return hook(ctx, sctx, kvReq, fieldTypes, fb)
	}
	return distsql.Select(ctx, sctx, kvReq, fieldTypes, fb)
}

// TableReaderExecutor sends DAG request and reads table data from kv layer.
type TableReaderExecutor struct {
baseExecutor
Expand All @@ -55,6 +72,8 @@ type TableReaderExecutor struct {
// corColInAccess tells whether there's correlated column in access conditions.
corColInAccess bool
plans []plannercore.PhysicalPlan

selectResultHook // for testing
}

// Open initializes necessary variables for using this executor.
Expand Down Expand Up @@ -132,7 +151,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
if err != nil {
return nil, errors.Trace(err)
}
result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading