executor: add more runtime information for join executor (#19721)
Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs520 authored Sep 7, 2020
1 parent 3d33bdf commit 221223b
Showing 5 changed files with 256 additions and 29 deletions.
2 changes: 1 addition & 1 deletion executor/hash_table.go
@@ -82,7 +82,7 @@ type hashStatistic struct {
}

func (s *hashStatistic) String() string {
- return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
+ return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
}

// hashRowContainer handles the rows and the hash map of a table.
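The only change in this file renames the key in the formatted hash statistics from "probe collision" to "probe_collision", so the metric reads as a single token in runtime-stats output. A minimal runnable sketch of the resulting format, assuming probeCollision is an integer counter and buildTableElapse a time.Duration as in the surrounding struct (values illustrative):

package main

import (
    "fmt"
    "time"
)

// hashStatistic mirrors the two fields used by the String method in
// executor/hash_table.go; the real struct carries more context around them.
type hashStatistic struct {
    probeCollision   int
    buildTableElapse time.Duration
}

func (s *hashStatistic) String() string {
    // Post-commit format: the key is the single token "probe_collision".
    return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
}

func main() {
    s := &hashStatistic{probeCollision: 3, buildTableElapse: 12 * time.Millisecond}
    fmt.Println(s.String()) // probe_collision:3, build:12ms
}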
44 changes: 36 additions & 8 deletions executor/index_lookup_hash_join.go
@@ -20,6 +20,8 @@ import (
"hash/fnv"
"runtime/trace"
"sync"
+ "sync/atomic"
+ "time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -28,7 +30,6 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
- "github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
)
@@ -66,6 +67,8 @@ type IndexNestedLoopHashJoin struct {
curTask *indexHashJoinTask
// taskCh is only used when `keepOuterOrder` is true.
taskCh chan *indexHashJoinTask
+
+ stats *indexLookUpJoinRuntimeStats
}

type indexHashJoinOuterWorker struct {
@@ -144,12 +147,21 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
+ if e.runtimeStats != nil {
+     e.stats = &indexLookUpJoinRuntimeStats{
+         BasicRuntimeStats: e.runtimeStats,
+     }
+     e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+ }
e.startWorkers(ctx)
return nil
}

func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency()
+ if e.stats != nil {
+     e.stats.concurrency = concurrency
+ }
workerCtx, cancelFunc := context.WithCancel(ctx)
e.cancelFunc = cancelFunc
innerCh := make(chan *indexHashJoinTask, concurrency)
@@ -297,12 +309,6 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
e.taskCh = nil
}
- if e.runtimeStats != nil {
-     concurrency := cap(e.joinChkResourceCh)
-     runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
-     runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
-     e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
- }
for i := range e.joinChkResourceCh {
close(e.joinChkResourceCh[i])
}
@@ -404,6 +410,10 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
for _, ran := range e.indexRanges {
copiedRanges = append(copiedRanges, ran.Clone())
}
+ var innerStats *innerWorkerRuntimeStats
+ if e.stats != nil {
+     innerStats = &e.stats.innerWorker
+ }
iw := &indexHashJoinInnerWorker{
innerWorker: innerWorker{
innerCtx: e.innerCtx,
@@ -412,6 +422,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
+ stats: innerStats,
},
taskCh: taskCh,
joiner: e.joiners[workerID],
@@ -510,6 +521,12 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
}

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
+     }()
+ }
buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
task.lookupMap = newUnsafeHashTable(task.outerResult.Len())
for chkIdx := 0; chkIdx < numChks; chkIdx++ {
@@ -547,7 +564,6 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
if err != nil {
return err
}
- lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents)
}

@@ -559,6 +575,16 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
}

func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
+ var joinStartTime time.Time
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         endTime := time.Now()
+         atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start)))
+         atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime)))
+     }()
+ }
+
iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
@@ -568,6 +594,8 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
return err
}
iw.wg.Wait()
+
+ joinStartTime = time.Now()
if !task.keepOuterOrder {
return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh)
}
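Nearly every addition in this file follows one pattern: when stats collection is enabled (iw.stats != nil), the worker snapshots time.Now() on entry and, in a deferred function, folds the elapsed nanoseconds into a shared counter with atomic.AddInt64, so concurrently running inner workers never contend on a mutex. A self-contained sketch of that pattern, using hypothetical names (workerStats, doTask) rather than the executor's own:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// workerStats accumulates elapsed nanoseconds across goroutines; the field
// must only be touched through atomic operations.
type workerStats struct {
    totalTime int64
}

func doTask(stats *workerStats) {
    if stats != nil {
        start := time.Now()
        defer func() {
            // time.Duration is an int64 nanosecond count, so it can be
            // accumulated atomically and converted back to a Duration later.
            atomic.AddInt64(&stats.totalTime, int64(time.Since(start)))
        }()
    }
    time.Sleep(5 * time.Millisecond) // stand-in for real work
}

func main() {
    stats := &workerStats{}
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            doTask(stats)
        }()
    }
    wg.Wait()
    fmt.Println("total:", time.Duration(atomic.LoadInt64(&stats.totalTime)))
}

Because several workers run at once, the accumulated total can exceed wall-clock time; that is why the String method added in executor/index_lookup_join.go reports it alongside the concurrency.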
108 changes: 101 additions & 7 deletions executor/index_lookup_join.go
@@ -14,12 +14,15 @@
package executor

import (
+ "bytes"
"context"
"runtime"
"runtime/trace"
"sort"
+ "strconv"
"sync"
+ "sync/atomic"
+ "time"
"unsafe"

"github.com/pingcap/errors"
@@ -77,6 +80,8 @@ type IndexLookUpJoin struct {
lastColHelper *plannercore.ColWithCmpFuncManager

memTracker *memory.Tracker // track memory usage.
+
+ stats *indexLookUpJoinRuntimeStats
}

type outerCtx struct {
@@ -138,6 +143,7 @@ type innerWorker struct {
indexRanges []*ranger.Range
nextColCompareFilters *plannercore.ColWithCmpFuncManager
keyOff2IdxOff []int
+ stats *innerWorkerRuntimeStats
}

// Open implements the Executor interface.
@@ -171,12 +177,21 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
+ if e.runtimeStats != nil {
+     e.stats = &indexLookUpJoinRuntimeStats{
+         BasicRuntimeStats: e.runtimeStats,
+     }
+     e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+ }
e.startWorkers(ctx)
return nil
}

func (e *IndexLookUpJoin) startWorkers(ctx context.Context) {
concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency()
+ if e.stats != nil {
+     e.stats.concurrency = concurrency
+ }
resultCh := make(chan *lookUpJoinTask, concurrency)
e.resultCh = resultCh
workerCtx, cancelFunc := context.WithCancel(ctx)
@@ -212,6 +227,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
copiedRanges = append(copiedRanges, ran.Clone())
}

+ var innerStats *innerWorkerRuntimeStats
+ if e.stats != nil {
+     innerStats = &e.stats.innerWorker
+ }
iw := &innerWorker{
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
@@ -220,6 +239,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
+ stats: innerStats,
}
if e.lastColHelper != nil {
// nextCwf.TmpConstant needs to be reset for every individual
@@ -250,6 +270,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
if task == nil {
return nil
}
+ startTime := time.Now()
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
e.lookUpMatchedInners(task, task.cursor)
e.innerIter = chunk.NewIterator4Slice(task.matchedInners)
@@ -277,6 +298,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
task.hasMatch = false
task.hasNull = false
}
+ if e.stats != nil {
+     atomic.AddInt64(&e.stats.probe, int64(time.Since(startTime)))
+ }
if req.IsFull() {
return nil
}
@@ -475,11 +499,16 @@ type indexJoinLookUpContent struct {
}

func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error {
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start)))
+     }()
+ }
lookUpContents, err := iw.constructLookupContent(task)
if err != nil {
return err
}
- lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
err = iw.fetchInnerResults(ctx, task, lookUpContents)
if err != nil {
return err
@@ -492,6 +521,13 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
}

func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) {
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         atomic.AddInt64(&iw.stats.task, 1)
+         atomic.AddInt64(&iw.stats.construct, int64(time.Since(start)))
+     }()
+ }
lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.Len())
keyBuf := make([]byte, 0, 64)
for chkIdx := 0; chkIdx < task.outerResult.NumChunks(); chkIdx++ {
@@ -531,6 +567,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
for i := range task.encodedLookUpKeys {
task.memTracker.Consume(task.encodedLookUpKeys[i].MemoryUsage())
}
+ lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
return lookUpContents, nil
}

@@ -612,6 +649,12 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int {
}

func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error {
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start)))
+     }()
+ }
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
if err != nil {
return err
@@ -641,6 +684,12 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
}

func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error {
+ if iw.stats != nil {
+     start := time.Now()
+     defer func() {
+         atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
+     }()
+ }
keyBuf := make([]byte, 0, 64)
valBuf := make([]byte, 8)
for i := 0; i < task.innerResult.NumChunks(); i++ {
@@ -685,11 +734,56 @@ func (e *IndexLookUpJoin) Close() error {
e.workerWg.Wait()
e.memTracker = nil
e.task = nil
- if e.runtimeStats != nil {
-     concurrency := cap(e.resultCh)
-     runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
-     runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
-     e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
- }
return e.baseExecutor.Close()
}
+
+ type indexLookUpJoinRuntimeStats struct {
+     *execdetails.BasicRuntimeStats
+     concurrency int
+     probe       int64
+     innerWorker innerWorkerRuntimeStats
+ }
+
+ type innerWorkerRuntimeStats struct {
+     totalTime int64
+     task      int64
+     construct int64
+     fetch     int64
+     build     int64
+     join      int64
+ }
+
+ func (e *indexLookUpJoinRuntimeStats) String() string {
+     buf := bytes.NewBuffer(make([]byte, 0, 16))
+     if e.BasicRuntimeStats != nil {
+         buf.WriteString(e.BasicRuntimeStats.String())
+     }
+     if e.innerWorker.totalTime > 0 {
+         buf.WriteString(", inner:{total:")
+         buf.WriteString(time.Duration(e.innerWorker.totalTime).String())
+         buf.WriteString(", concurrency:")
+         if e.concurrency > 0 {
+             buf.WriteString(strconv.Itoa(e.concurrency))
+         } else {
+             buf.WriteString("OFF")
+         }
+         buf.WriteString(", task:")
+         buf.WriteString(strconv.FormatInt(e.innerWorker.task, 10))
+         buf.WriteString(", construct:")
+         buf.WriteString(time.Duration(e.innerWorker.construct).String())
+         buf.WriteString(", fetch:")
+         buf.WriteString(time.Duration(e.innerWorker.fetch).String())
+         buf.WriteString(", build:")
+         buf.WriteString(time.Duration(e.innerWorker.build).String())
+         if e.innerWorker.join > 0 {
+             buf.WriteString(", join:")
+             buf.WriteString(time.Duration(e.innerWorker.join).String())
+         }
+         buf.WriteString("}")
+     }
+     if e.probe > 0 {
+         buf.WriteString(", probe:")
+         buf.WriteString(time.Duration(e.probe).String())
+     }
+     return buf.String()
+ }
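Given the formatter above, everything appended after the embedded BasicRuntimeStats output takes roughly this shape (all numbers below are illustrative, not from a real run):

, inner:{total:50ms, concurrency:5, task:2, construct:10ms, fetch:30ms, build:5ms, join:8ms}, probe:12ms

The join field is printed only when non-zero, and in this commit only the hash variant (IndexNestedLoopHashJoin's handleTask) records it; concurrency prints as OFF when no concurrency was recorded.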