Skip to content

Commit

Permalink
executor: tiny refactor the Executor interface (#10846) (#10876)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and coocood committed Jun 20, 2019
1 parent a9298ee commit 264b20c
Show file tree
Hide file tree
Showing 30 changed files with 181 additions and 203 deletions.
8 changes: 4 additions & 4 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
defer span1.Finish()
}

err := a.executor.Next(ctx, req)
err := Next(ctx, a.executor, req)
if err != nil {
a.lastErr = err
return err
Expand All @@ -126,7 +126,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {

// NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk().
func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(a.executor.newFirstChunk())
return chunk.NewRecordBatch(newFirstChunk(a.executor))
}

func (a *recordSet) Close() error {
Expand Down Expand Up @@ -307,7 +307,7 @@ func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) er
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
return chunk.NewRecordBatch(newFirstChunk(c.e))
}

func (c *chunkRowRecordSet) Close() error {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
a.logAudit()
}()

err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk()))
err = Next(ctx, e, chunk.NewRecordBatch(newFirstChunk(e)))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})

e.srcChunk = e.newFirstChunk()
e.srcChunk = newFirstChunk(e)
dagPB, err := e.buildDAGPB()
if err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (e *HashAggExec) initForUnparallelExec() {
e.partialResultMap = make(aggPartialResultMapper)
e.groupKeyBuffer = make([]byte, 0, 8)
e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
e.childResult = e.children[0].newFirstChunk()
e.childResult = newFirstChunk(e.children[0])
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
Expand Down Expand Up @@ -275,12 +275,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
partialResultsMap: make(aggPartialResultMapper),
groupByItems: e.GroupByItems,
groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)),
chk: e.children[0].newFirstChunk(),
chk: newFirstChunk(e.children[0]),
}

e.partialWorkers[i] = w
e.inputCh <- &HashAggInput{
chk: e.children[0].newFirstChunk(),
chk: newFirstChunk(e.children[0]),
giveBackCh: w.inputCh,
}
}
Expand All @@ -295,7 +295,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
outputCh: e.finalOutputCh,
finalResultHolderCh: e.finalInputCh,
rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(e.retTypes()),
mutableRow: chunk.MutRowFromTypes(retTypes(e)),
}
}
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
err = e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
err = Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: err}
return
Expand Down Expand Up @@ -681,7 +681,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
err := e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
err := Next(ctx, e.children[0], chunk.NewRecordBatch(e.childResult))
if err != nil {
return err
}
Expand Down Expand Up @@ -772,7 +772,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.childResult = e.children[0].newFirstChunk()
e.childResult = newFirstChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
Expand Down Expand Up @@ -870,7 +870,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return err
}

err = e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
err = Next(ctx, e.children[0], chunk.NewRecordBatch(e.childResult))
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) err
func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
m := &mockDataSource{baseExec, opt, nil, nil, 0}
types := m.retTypes()
types := retTypes(m)
colData := make([][]interface{}, len(types))
for i := 0; i < len(types); i++ {
colData[i] = m.genColDatums(i)
}

m.genData = make([]*chunk.Chunk, (m.p.rows+m.initCap-1)/m.initCap)
for i := range m.genData {
m.genData[i] = chunk.NewChunkWithCapacity(m.retTypes(), m.ctx.GetSessionVars().MaxChunkSize)
m.genData[i] = chunk.NewChunkWithCapacity(retTypes(m), m.ctx.GetSessionVars().MaxChunkSize)
}

for i := 0; i < m.p.rows; i++ {
idx := i / m.maxChunkSize
retTypes := m.retTypes()
retTypes := retTypes(m)
for colIdx := 0; colIdx < len(types); colIdx++ {
switch retTypes[colIdx].Tp {
case mysql.TypeLong, mysql.TypeLonglong:
Expand Down Expand Up @@ -259,7 +259,7 @@ func benchmarkAggExecWithCase(b *testing.B, casTest *aggTestCase) {
b.StopTimer() // prepare a new agg-executor
aggExec := buildAggExecutor(b, casTest, dataSource)
tmpCtx := context.Background()
chk := aggExec.newFirstChunk()
chk := newFirstChunk(aggExec)
dataSource.prepareChunks()

b.StartTimer()
Expand Down
20 changes: 10 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,8 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
v.JoinType == plannercore.RightOuterJoin,
defaultValues,
v.OtherConditions,
leftExec.retTypes(),
rightExec.retTypes(),
retTypes(leftExec),
retTypes(rightExec),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
}
Expand Down Expand Up @@ -946,7 +946,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
}

defaultValues := v.DefaultValues
lhsTypes, rhsTypes := leftExec.retTypes(), rightExec.retTypes()
lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec)
if v.InnerChildIdx == 0 {
if len(v.LeftConditions) > 0 {
b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be empty")
Expand Down Expand Up @@ -1020,7 +1020,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1)
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
for _, aggDesc := range v.AggFuncs {
if aggDesc.HasDistinct {
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1)
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
for i, aggDesc := range v.AggFuncs {
aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
Expand Down Expand Up @@ -1220,7 +1220,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) *NestedLoopAp
defaultValues = make([]types.Datum, v.Children()[v.InnerChildIdx].Schema().Len())
}
tupleJoiner := newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0,
defaultValues, otherConditions, leftChild.retTypes(), rightChild.retTypes())
defaultValues, otherConditions, retTypes(leftChild), retTypes(rightChild))
outerExec, innerExec := leftChild, rightChild
outerFilter, innerFilter := v.LeftConditions, v.RightConditions
if v.InnerChildIdx == 0 {
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
if b.err != nil {
return nil
}
outerTypes := outerExec.retTypes()
outerTypes := retTypes(outerExec)
innerPlan := v.Children()[1-v.OuterIndex]
innerTypes := make([]*types.FieldType, innerPlan.Schema().Len())
for i, col := range innerPlan.Schema().Columns {
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
innerKeyCols[i] = v.InnerJoinKeys[i].Index
}
e.innerCtx.keyCols = innerKeyCols
e.joinResult = e.newFirstChunk()
e.joinResult = newFirstChunk(e)
executorCounterIndexLookUpJoin.Inc()
return e
}
Expand Down Expand Up @@ -2015,7 +2015,7 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context
return nil, err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
us.snapshotChunkBuffer = newFirstChunk(us)
return us, nil
}

Expand Down Expand Up @@ -2050,7 +2050,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return nil, err
}
e.resultHandler = &tableResultHandler{}
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
for {
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
err := Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
if err != nil {
return err
}
Expand Down Expand Up @@ -183,11 +183,11 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
e.initialMultiTableTblMap()
colPosInfos := e.getColPosInfos(e.children[0].Schema())
tblRowMap := make(tableRowMapType)
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
for {
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
err := Next(ctx, e.children[0], chunk.NewRecordBatch(chk))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
if err != nil {
e.feedback.Invalidate()
return err
Expand Down Expand Up @@ -794,7 +794,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newFirstChunk()
chk := newFirstChunk(tableReader)
err = tableReader.Next(ctx, chunk.NewRecordBatch(chk))
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
Expand Down
2 changes: 2 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
ErrWrongObject = terror.ClassExecutor.New(mysql.ErrWrongObject, mysql.MySQLErrName[mysql.ErrWrongObject])
ErrRoleNotGranted = terror.ClassPrivilege.New(mysql.ErrRoleNotGranted, mysql.MySQLErrName[mysql.ErrRoleNotGranted])
ErrDeadlock = terror.ClassExecutor.New(mysql.ErrLockDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock])
ErrQueryInterrupted = terror.ClassExecutor.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
)

func init() {
Expand All @@ -69,6 +70,7 @@ func init() {
mysql.ErrBadDB: mysql.ErrBadDB,
mysql.ErrWrongObject: mysql.ErrWrongObject,
mysql.ErrLockDeadlock: mysql.ErrLockDeadlock,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}
Loading

0 comments on commit 264b20c

Please sign in to comment.