Skip to content

Commit

Permalink
*: make explain support explain anaylze (#7827)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Oct 12, 2018
1 parent 5efcacb commit d21f294
Show file tree
Hide file tree
Showing 19 changed files with 275 additions and 32 deletions.
5 changes: 3 additions & 2 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
type ExplainStmt struct {
stmtNode

Stmt StmtNode
Format string
Stmt StmtNode
Format string
Analyze bool
}

// Accept implements Node Accept interface.
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
}
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
execDetail := sessVars.StmtCtx.GetExecDetails()
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
Expand Down
10 changes: 9 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"sync"
"time"

"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -501,6 +502,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
Expand Down Expand Up @@ -756,8 +761,11 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
Expand Down
31 changes: 29 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -448,13 +449,12 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &PrepareExec{
return &PrepareExec{
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
}
return e
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
Expand Down Expand Up @@ -659,6 +659,33 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
if v.Analyze {
stmt := &ExecStmt{
InfoSchema: GetInfoSchema(b.ctx),
Plan: v.ExecPlan,
StmtNode: v.ExecStmt,
Ctx: b.ctx,
}
b.ctx.GetSessionVars().StmtCtx.RuntimeStats = execdetails.NewRuntimeStats()
ctx := context.Background()
rs, err := stmt.Exec(ctx)
if err != nil {
return nil
}
if rs != nil {
chk := rs.NewChunk()
for {
err := rs.Next(ctx, chk)
if err != nil {
return nil
}
if chk.NumRows() == 0 {
break
}
}
}
}
v.PrepareRows()
e := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
}
Expand Down
10 changes: 10 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -243,6 +244,10 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -453,6 +458,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
e.baseExecutor.ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(e.id + "_tableReader")
for i := 0; i < lookupConcurrencyLimit; i++ {
worker := &tableWorker{
workCh: workCh,
Expand Down Expand Up @@ -512,6 +518,10 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
resultTask, err := e.getResultTask()
Expand Down
33 changes: 33 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/ast"
Expand All @@ -38,13 +39,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var (
_ Executor = &baseExecutor{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
Expand Down Expand Up @@ -75,6 +78,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStat *execdetails.RuntimeStat
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -130,6 +134,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
runtimeStat: ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(id),
}
if schema != nil {
cols := schema.Columns
Expand Down Expand Up @@ -172,6 +177,10 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
return nil
Expand Down Expand Up @@ -614,6 +623,10 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
return nil
Expand Down Expand Up @@ -733,6 +746,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
return nil
Expand Down Expand Up @@ -784,6 +801,10 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

if !e.batched {
Expand Down Expand Up @@ -859,6 +880,10 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
Expand Down Expand Up @@ -959,6 +984,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
return nil
Expand Down Expand Up @@ -1101,6 +1130,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
e.initialize(ctx)
Expand Down
5 changes: 5 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sort"
"sync"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -189,6 +190,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
for {
Expand Down
9 changes: 9 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -508,6 +509,10 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.finishFetchInnerAndBuildHashTable)
Expand Down Expand Up @@ -721,6 +726,10 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
Expand Down
6 changes: 6 additions & 0 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package executor

import (
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -261,6 +263,10 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {

// Next implements the Executor Next interface.
func (e *MergeJoinExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.prepared {
if err := e.prepare(ctx, chk); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package executor

import (
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -139,6 +141,10 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isUnparallelExec() {
return errors.Trace(e.unParallelExecute(ctx, chk))
Expand Down
9 changes: 9 additions & 0 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"container/heap"
"sort"
"time"

"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -73,6 +74,10 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
err := e.fetchRowChunks(ctx)
Expand Down Expand Up @@ -296,6 +301,10 @@ func (e *TopNExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
e.totalLimit = int(e.limit.Offset + e.limit.Count)
Expand Down
Loading

0 comments on commit d21f294

Please sign in to comment.