executor: add more runtime information for join executor (#19721) #20093

Merged · 10 commits · Sep 21, 2020
executor/hash_table.go (1 addition & 1 deletion)
@@ -81,7 +81,7 @@ type hashStatistic struct {
}

func (s *hashStatistic) String() string {
return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
}

// hashRowContainer handles the rows and the hash map of a table.
executor/index_lookup_hash_join.go (36 additions & 8 deletions)
@@ -20,6 +20,8 @@ import (
"hash/fnv"
"runtime/trace"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -28,7 +30,6 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
)
@@ -66,6 +67,8 @@ type IndexNestedLoopHashJoin struct {
curTask *indexHashJoinTask
// taskCh is only used when `keepOuterOrder` is true.
taskCh chan *indexHashJoinTask

stats *indexLookUpJoinRuntimeStats
}

type indexHashJoinOuterWorker struct {
@@ -144,12 +147,21 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.startWorkers(ctx)
return nil
}

func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency
if e.stats != nil {
e.stats.concurrency = concurrency
}
workerCtx, cancelFunc := context.WithCancel(ctx)
e.cancelFunc = cancelFunc
innerCh := make(chan *indexHashJoinTask, concurrency)
@@ -297,12 +309,6 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
e.taskCh = nil
}
if e.runtimeStats != nil {
concurrency := cap(e.joinChkResourceCh)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
for i := range e.joinChkResourceCh {
close(e.joinChkResourceCh[i])
}
@@ -404,6 +410,10 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
for _, ran := range e.indexRanges {
copiedRanges = append(copiedRanges, ran.Clone())
}
var innerStats *innerWorkerRuntimeStats
if e.stats != nil {
innerStats = &e.stats.innerWorker
}
iw := &indexHashJoinInnerWorker{
innerWorker: innerWorker{
innerCtx: e.innerCtx,
@@ -412,6 +422,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
},
taskCh: taskCh,
joiner: e.joiners[workerID],
@@ -510,6 +521,12 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
if iw.stats != nil {
start := time.Now()
defer func() {
atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
}()
}
buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
task.lookupMap = newRowHashMap(task.outerResult.Len())
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
@@ -547,7 +564,6 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
if err != nil {
return err
}
lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents)
}

@@ -559,6 +575,16 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
var joinStartTime time.Time
if iw.stats != nil {
start := time.Now()
defer func() {
endTime := time.Now()
atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start)))
atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime)))
}()
}

iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
@@ -568,6 +594,8 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
return err
}
iw.wg.Wait()

joinStartTime = time.Now()
if !task.keepOuterOrder {
return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh)
}
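The instrumentation added in this file follows a common Go pattern: each duration field is a plain int64 holding nanoseconds, concurrent workers accumulate into it with atomic.AddInt64 from a deferred closure, and the reader renders it with time.Duration. A minimal standalone sketch of that pattern, with illustrative names that are not part of this PR, could look like:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// workerStats mirrors the idea behind innerWorkerRuntimeStats: durations are
// stored as int64 nanoseconds so several goroutines can update them with
// atomic.AddInt64, and the reader formats them via time.Duration.
type workerStats struct {
	totalTime int64
	task      int64
}

func doTask(stats *workerStats) {
	start := time.Now()
	defer func() {
		atomic.AddInt64(&stats.task, 1)
		atomic.AddInt64(&stats.totalTime, int64(time.Since(start)))
	}()
	time.Sleep(10 * time.Millisecond) // stand-in for real work
}

func main() {
	var stats workerStats
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			doTask(&stats)
		}()
	}
	wg.Wait()
	fmt.Printf("task:%d, total:%v\n",
		atomic.LoadInt64(&stats.task),
		time.Duration(atomic.LoadInt64(&stats.totalTime)))
}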
executor/index_lookup_join.go (101 additions & 7 deletions)
@@ -14,12 +14,15 @@
package executor

import (
"bytes"
"context"
"runtime"
"runtime/trace"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
@@ -77,6 +80,8 @@ type IndexLookUpJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage.

stats *indexLookUpJoinRuntimeStats
}

type outerCtx struct {
@@ -138,6 +143,7 @@ type innerWorker struct {
indexRanges []*ranger.Range
nextColCompareFilters *plannercore.ColWithCmpFuncManager
keyOff2IdxOff []int
stats *innerWorkerRuntimeStats
}

// Open implements the Executor interface.
@@ -171,12 +177,21 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.startWorkers(ctx)
return nil
}

func (e *IndexLookUpJoin) startWorkers(ctx context.Context) {
concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency
if e.stats != nil {
e.stats.concurrency = concurrency
}
resultCh := make(chan *lookUpJoinTask, concurrency)
e.resultCh = resultCh
workerCtx, cancelFunc := context.WithCancel(ctx)
@@ -212,6 +227,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
copiedRanges = append(copiedRanges, ran.Clone())
}

var innerStats *innerWorkerRuntimeStats
if e.stats != nil {
innerStats = &e.stats.innerWorker
}
iw := &innerWorker{
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
@@ -220,6 +239,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
}
if e.lastColHelper != nil {
// nextCwf.TmpConstant needs to be reset for every individual
@@ -250,6 +270,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if task == nil {
return nil
}
startTime := time.Now()
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
e.lookUpMatchedInners(task, task.cursor)
e.innerIter = chunk.NewIterator4Slice(task.matchedInners)
@@ -277,6 +298,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
task.hasMatch = false
task.hasNull = false
}
if e.stats != nil {
atomic.AddInt64(&e.stats.probe, int64(time.Since(startTime)))
}
if req.IsFull() {
return nil
}
@@ -475,11 +499,16 @@ type indexJoinLookUpContent struct {
}

func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error {
if iw.stats != nil {
start := time.Now()
defer func() {
atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start)))
}()
}
lookUpContents, err := iw.constructLookupContent(task)
if err != nil {
return err
}
lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
err = iw.fetchInnerResults(ctx, task, lookUpContents)
if err != nil {
return err
@@ -492,6 +521,13 @@
}

func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) {
if iw.stats != nil {
start := time.Now()
defer func() {
atomic.AddInt64(&iw.stats.task, 1)
atomic.AddInt64(&iw.stats.construct, int64(time.Since(start)))
}()
}
lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.Len())
keyBuf := make([]byte, 0, 64)
for chkIdx := 0; chkIdx < task.outerResult.NumChunks(); chkIdx++ {
@@ -531,6 +567,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
for i := range task.encodedLookUpKeys {
task.memTracker.Consume(task.encodedLookUpKeys[i].MemoryUsage())
}
lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
return lookUpContents, nil
}

@@ -612,6 +649,12 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int {
}

func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error {
if iw.stats != nil {
start := time.Now()
defer func() {
atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start)))
}()
}
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
if err != nil {
return err
@@ -641,6 +684,12 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
}

func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error {
if iw.stats != nil {
start := time.Now()
defer func() {
atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
}()
}
keyBuf := make([]byte, 0, 64)
valBuf := make([]byte, 8)
for i := 0; i < task.innerResult.NumChunks(); i++ {
@@ -685,11 +734,56 @@ func (e *IndexLookUpJoin) Close() error {
e.workerWg.Wait()
e.memTracker = nil
e.task = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.baseExecutor.Close()
}

type indexLookUpJoinRuntimeStats struct {
*execdetails.BasicRuntimeStats
concurrency int
probe int64
innerWorker innerWorkerRuntimeStats
}

type innerWorkerRuntimeStats struct {
totalTime int64
task int64
construct int64
fetch int64
build int64
join int64
}

func (e *indexLookUpJoinRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 16))
if e.BasicRuntimeStats != nil {
buf.WriteString(e.BasicRuntimeStats.String())
}
if e.innerWorker.totalTime > 0 {
buf.WriteString(", inner:{total:")
buf.WriteString(time.Duration(e.innerWorker.totalTime).String())
buf.WriteString(", concurrency:")
if e.concurrency > 0 {
buf.WriteString(strconv.Itoa(e.concurrency))
} else {
buf.WriteString("OFF")
}
buf.WriteString(", task:")
buf.WriteString(strconv.FormatInt(e.innerWorker.task, 10))
buf.WriteString(", construct:")
buf.WriteString(time.Duration(e.innerWorker.construct).String())
buf.WriteString(", fetch:")
buf.WriteString(time.Duration(e.innerWorker.fetch).String())
buf.WriteString(", build:")
buf.WriteString(time.Duration(e.innerWorker.build).String())
if e.innerWorker.join > 0 {
buf.WriteString(", join:")
buf.WriteString(time.Duration(e.innerWorker.join).String())
}
buf.WriteString("}")
}
if e.probe > 0 {
buf.WriteString(", probe:")
buf.WriteString(time.Duration(e.probe).String())
}
return buf.String()
}
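
For illustration only, with made-up timings and a concurrency of 5, the execution info built by this String method (appended after the basic time/loops/rows output from BasicRuntimeStats) would render along these lines:

, inner:{total:2.5s, concurrency:5, task:17, construct:120ms, fetch:2.1s, build:60ms, join:300ms}, probe:80ms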