diff --git a/executor/hash_table.go b/executor/hash_table.go
index 24359bcfba36f..1df5828abb146 100644
--- a/executor/hash_table.go
+++ b/executor/hash_table.go
@@ -82,7 +82,7 @@ type hashStatistic struct {
 }
 
 func (s *hashStatistic) String() string {
-	return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
+	return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
 }
 
 // hashRowContainer handles the rows and the hash map of a table.
diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go
index 49eb1bb2a9b20..b7cdbe739c245 100644
--- a/executor/index_lookup_hash_join.go
+++ b/executor/index_lookup_hash_join.go
@@ -20,6 +20,8 @@ import (
 	"hash/fnv"
 	"runtime/trace"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -28,7 +30,6 @@ import (
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/codec"
-	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/ranger"
 )
@@ -66,6 +67,8 @@ type IndexNestedLoopHashJoin struct {
 	curTask *indexHashJoinTask
 	// taskCh is only used when `keepOuterOrder` is true.
 	taskCh chan *indexHashJoinTask
+
+	stats *indexLookUpJoinRuntimeStats
 }
 
 type indexHashJoinOuterWorker struct {
@@ -144,12 +147,21 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
 	e.memTracker = memory.NewTracker(e.id, -1)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
 	e.innerPtrBytes = make([][]byte, 0, 8)
+	if e.runtimeStats != nil {
+		e.stats = &indexLookUpJoinRuntimeStats{
+			BasicRuntimeStats: e.runtimeStats,
+		}
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+	}
 	e.startWorkers(ctx)
 	return nil
 }
 
 func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
 	concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency()
+	if e.stats != nil {
+		e.stats.concurrency = concurrency
+	}
 	workerCtx, cancelFunc := context.WithCancel(ctx)
 	e.cancelFunc = cancelFunc
 	innerCh := make(chan *indexHashJoinTask, concurrency)
@@ -297,12 +309,6 @@ func (e *IndexNestedLoopHashJoin) Close() error {
 		}
 		e.taskCh = nil
 	}
-	if e.runtimeStats != nil {
-		concurrency := cap(e.joinChkResourceCh)
-		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
-		runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
-		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
-	}
 	for i := range e.joinChkResourceCh {
 		close(e.joinChkResourceCh[i])
 	}
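Note the pattern in the two hunks above: stats registration moves from `Close` to `Open`. This works because `RegisterStats` hands the collector a pointer to the stats object, and the worker goroutines keep updating that same object (atomically) while the query runs; the collector reads the final values when it renders the plan. A minimal sketch of that register-then-mutate idea — `statsColl` and `joinStats` are invented stand-ins, not TiDB types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// statsColl is a hypothetical stand-in for StmtCtx.RuntimeStatsColl.
type statsColl struct {
	mu    sync.Mutex
	stats map[int]fmt.Stringer
}

func (c *statsColl) RegisterStats(id int, s fmt.Stringer) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stats[id] = s
}

// joinStats mimics a runtime-stats value whose fields keep changing after registration.
type joinStats struct{ probe int64 } // nanoseconds, written with atomics

func (s *joinStats) String() string {
	return fmt.Sprintf("probe:%v", time.Duration(atomic.LoadInt64(&s.probe)))
}

func main() {
	coll := &statsColl{stats: map[int]fmt.Stringer{}}
	s := &joinStats{}
	coll.RegisterStats(1, s) // register once, up front, as Open now does

	// Workers mutate the registered value later; the collector still sees it
	// through the shared pointer when it finally formats the plan.
	atomic.AddInt64(&s.probe, int64(123*time.Millisecond))
	fmt.Println(coll.stats[1]) // probe:123ms
}
```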
@@ -404,6 +410,10 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
 	for _, ran := range e.indexRanges {
 		copiedRanges = append(copiedRanges, ran.Clone())
 	}
+	var innerStats *innerWorkerRuntimeStats
+	if e.stats != nil {
+		innerStats = &e.stats.innerWorker
+	}
 	iw := &indexHashJoinInnerWorker{
 		innerWorker: innerWorker{
 			innerCtx:      e.innerCtx,
@@ -412,6 +422,7 @@
 			executorChk:   chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
 			indexRanges:   copiedRanges,
 			keyOff2IdxOff: e.keyOff2IdxOff,
+			stats:         innerStats,
 		},
 		taskCh: taskCh,
 		joiner: e.joiners[workerID],
@@ -510,6 +521,12 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
 }
 
 func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
+		}()
+	}
 	buf, numChks := make([]byte, 1), task.outerResult.NumChunks()
 	task.lookupMap = newUnsafeHashTable(task.outerResult.Len())
 	for chkIdx := 0; chkIdx < numChks; chkIdx++ {
@@ -547,7 +564,6 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task
 	if err != nil {
 		return err
 	}
-	lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
 	return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents)
 }
 
@@ -559,6 +575,16 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}
 }
 
 func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error {
+	var joinStartTime time.Time
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			endTime := time.Now()
+			atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start)))
+			atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime)))
+		}()
+	}
+
 	iw.wg = &sync.WaitGroup{}
 	iw.wg.Add(1)
 	// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
@@ -568,6 +594,8 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
 		return err
 	}
 	iw.wg.Wait()
+
+	joinStartTime = time.Now()
 	if !task.keepOuterOrder {
 		return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh)
 	}
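The `start := time.Now()` / `defer atomic.AddInt64(...)` shape above is the one instrumentation idiom this patch repeats in every worker. It works because `time.Duration` is itself an int64 nanosecond count, so `int64(time.Since(start))` and `time.Duration(total)` round-trip losslessly, and `atomic.AddInt64` keeps concurrent workers race-free. A self-contained sketch (all names invented) that also shows what an accumulated "total" means:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var total int64 // accumulated wall time across all workers, in nanoseconds
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			start := time.Now()
			defer func() {
				atomic.AddInt64(&total, int64(time.Since(start)))
			}()
			time.Sleep(10 * time.Millisecond) // stand-in for real work
		}()
	}
	wg.Wait()
	// Prints roughly 4x10ms: the counter sums per-worker elapsed time, so an
	// "inner:{total:...}" figure can legitimately exceed the executor's wall
	// time when several workers run in parallel.
	fmt.Println(time.Duration(atomic.LoadInt64(&total)))
}
```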
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index cbdc41292c486..f1692f2bd722e 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -14,12 +14,15 @@
 package executor
 
 import (
+	"bytes"
 	"context"
 	"runtime"
 	"runtime/trace"
 	"sort"
+	"strconv"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/errors"
@@ -77,6 +80,8 @@ type IndexLookUpJoin struct {
 	lastColHelper *plannercore.ColWithCmpFuncManager
 
 	memTracker *memory.Tracker // track memory usage.
+
+	stats *indexLookUpJoinRuntimeStats
 }
 
 type outerCtx struct {
@@ -138,6 +143,7 @@ type innerWorker struct {
 	indexRanges           []*ranger.Range
 	nextColCompareFilters *plannercore.ColWithCmpFuncManager
 	keyOff2IdxOff         []int
+	stats                 *innerWorkerRuntimeStats
 }
 
 // Open implements the Executor interface.
@@ -171,12 +177,21 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
 	e.memTracker = memory.NewTracker(e.id, -1)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
 	e.innerPtrBytes = make([][]byte, 0, 8)
+	if e.runtimeStats != nil {
+		e.stats = &indexLookUpJoinRuntimeStats{
+			BasicRuntimeStats: e.runtimeStats,
+		}
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+	}
 	e.startWorkers(ctx)
 	return nil
 }
 
 func (e *IndexLookUpJoin) startWorkers(ctx context.Context) {
 	concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency()
+	if e.stats != nil {
+		e.stats.concurrency = concurrency
+	}
 	resultCh := make(chan *lookUpJoinTask, concurrency)
 	e.resultCh = resultCh
 	workerCtx, cancelFunc := context.WithCancel(ctx)
@@ -212,6 +227,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 		copiedRanges = append(copiedRanges, ran.Clone())
 	}
 
+	var innerStats *innerWorkerRuntimeStats
+	if e.stats != nil {
+		innerStats = &e.stats.innerWorker
+	}
 	iw := &innerWorker{
 		innerCtx:      e.innerCtx,
 		outerCtx:      e.outerCtx,
@@ -220,6 +239,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 		executorChk:   chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
 		indexRanges:   copiedRanges,
 		keyOff2IdxOff: e.keyOff2IdxOff,
+		stats:         innerStats,
 	}
 	if e.lastColHelper != nil {
 		// nextCwf.TmpConstant needs to be reset for every individual
@@ -250,6 +270,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
 		if task == nil {
 			return nil
 		}
+		startTime := time.Now()
 		if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
 			e.lookUpMatchedInners(task, task.cursor)
 			e.innerIter = chunk.NewIterator4Slice(task.matchedInners)
@@ -277,6 +298,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error {
 			task.hasMatch = false
 			task.hasNull = false
 		}
+		if e.stats != nil {
+			atomic.AddInt64(&e.stats.probe, int64(time.Since(startTime)))
+		}
 		if req.IsFull() {
 			return nil
 		}
@@ -475,11 +499,16 @@ type indexJoinLookUpContent struct {
 }
 
 func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error {
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start)))
+		}()
+	}
 	lookUpContents, err := iw.constructLookupContent(task)
 	if err != nil {
 		return err
 	}
-	lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
 	err = iw.fetchInnerResults(ctx, task, lookUpContents)
 	if err != nil {
 		return err
@@ -492,6 +521,13 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err
 }
 
 func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) {
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			atomic.AddInt64(&iw.stats.task, 1)
+			atomic.AddInt64(&iw.stats.construct, int64(time.Since(start)))
+		}()
+	}
 	lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.Len())
 	keyBuf := make([]byte, 0, 64)
 	for chkIdx := 0; chkIdx < task.outerResult.NumChunks(); chkIdx++ {
@@ -531,6 +567,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
 	for i := range task.encodedLookUpKeys {
 		task.memTracker.Consume(task.encodedLookUpKeys[i].MemoryUsage())
 	}
+	lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
 	return lookUpContents, nil
 }
@@ -612,6 +649,12 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int {
 }
 
 func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error {
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start)))
+		}()
+	}
 	innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
 	if err != nil {
 		return err
@@ -641,6 +684,12 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
 }
 
 func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error {
+	if iw.stats != nil {
+		start := time.Now()
+		defer func() {
+			atomic.AddInt64(&iw.stats.build, int64(time.Since(start)))
+		}()
+	}
 	keyBuf := make([]byte, 0, 64)
 	valBuf := make([]byte, 8)
 	for i := 0; i < task.innerResult.NumChunks(); i++ {
@@ -685,11 +734,56 @@ func (e *IndexLookUpJoin) Close() error {
 	e.workerWg.Wait()
 	e.memTracker = nil
 	e.task = nil
-	if e.runtimeStats != nil {
-		concurrency := cap(e.resultCh)
-		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
-		runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
-		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
-	}
 	return e.baseExecutor.Close()
 }
+
+type indexLookUpJoinRuntimeStats struct {
+	*execdetails.BasicRuntimeStats
+	concurrency int
+	probe       int64
+	innerWorker innerWorkerRuntimeStats
+}
+
+type innerWorkerRuntimeStats struct {
+	totalTime int64
+	task      int64
+	construct int64
+	fetch     int64
+	build     int64
+	join      int64
+}
+
+func (e *indexLookUpJoinRuntimeStats) String() string {
+	buf := bytes.NewBuffer(make([]byte, 0, 16))
+	if e.BasicRuntimeStats != nil {
+		buf.WriteString(e.BasicRuntimeStats.String())
+	}
+	if e.innerWorker.totalTime > 0 {
+		buf.WriteString(", inner:{total:")
+		buf.WriteString(time.Duration(e.innerWorker.totalTime).String())
+		buf.WriteString(", concurrency:")
+		if e.concurrency > 0 {
+			buf.WriteString(strconv.Itoa(e.concurrency))
+		} else {
+			buf.WriteString("OFF")
+		}
+		buf.WriteString(", task:")
+		buf.WriteString(strconv.FormatInt(e.innerWorker.task, 10))
+		buf.WriteString(", construct:")
+		buf.WriteString(time.Duration(e.innerWorker.construct).String())
+		buf.WriteString(", fetch:")
+		buf.WriteString(time.Duration(e.innerWorker.fetch).String())
+		buf.WriteString(", build:")
+		buf.WriteString(time.Duration(e.innerWorker.build).String())
+		if e.innerWorker.join > 0 {
+			buf.WriteString(", join:")
+			buf.WriteString(time.Duration(e.innerWorker.join).String())
+		}
+		buf.WriteString("}")
+	}
+	if e.probe > 0 {
+		buf.WriteString(", probe:")
+		buf.WriteString(time.Duration(e.probe).String())
+	}
+	return buf.String()
+}
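To make the new `String` output concrete, here is a hedged sketch that assembles the same summary shape from invented sample values; the `time:3ms, loops:2` prefix stands in for whatever `BasicRuntimeStats.String()` actually prints:

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"time"
)

func main() {
	var (
		concurrency       = 5
		task        int64 = 1
		total             = 4 * time.Millisecond
		construct         = 1 * time.Millisecond
		fetch             = 2 * time.Millisecond
		build             = 100 * time.Microsecond
		probe             = 500 * time.Microsecond
	)
	buf := bytes.NewBuffer(make([]byte, 0, 64))
	buf.WriteString("time:3ms, loops:2") // placeholder for BasicRuntimeStats.String()
	buf.WriteString(", inner:{total:" + total.String())
	buf.WriteString(", concurrency:" + strconv.Itoa(concurrency))
	buf.WriteString(", task:" + strconv.FormatInt(task, 10))
	buf.WriteString(", construct:" + construct.String())
	buf.WriteString(", fetch:" + fetch.String())
	buf.WriteString(", build:" + build.String())
	buf.WriteString("}, probe:" + probe.String())
	fmt.Println(buf.String())
	// time:3ms, loops:2, inner:{total:4ms, concurrency:5, task:1, construct:1ms, fetch:2ms, build:100µs}, probe:500µs
}
```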
diff --git a/executor/join.go b/executor/join.go
index e552c349e8f91..31cbed3e28049 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -14,11 +14,14 @@
 package executor
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"runtime/trace"
+	"strconv"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -87,6 +90,8 @@ type HashJoinExec struct {
 	// joinWorkerWaitGroup is for sync multiple join workers.
 	joinWorkerWaitGroup sync.WaitGroup
 	finished            atomic.Value
+
+	stats *hashJoinRuntimeStats
 }
 
 // probeChkResource stores the result of the join probe side fetch worker,
@@ -140,14 +145,8 @@ func (e *HashJoinExec) Close() error {
 	}
 	e.outerMatchedStatus = e.outerMatchedStatus[:0]
 
-	if e.runtimeStats != nil {
-		concurrency := cap(e.joiners)
-		runtimeStats := newJoinRuntimeStats(e.runtimeStats)
-		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
-		runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
-		if e.rowContainer != nil {
-			runtimeStats.setHashStat(e.rowContainer.stat)
-		}
+	if e.stats != nil && e.rowContainer != nil {
+		e.stats.hashStat = e.rowContainer.stat
 	}
 	err := e.baseExecutor.Close()
 	return err
@@ -176,6 +175,13 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
 	if e.buildTypes == nil {
 		e.buildTypes = retTypes(e.buildSideExec)
 	}
+	if e.runtimeStats != nil {
+		e.stats = &hashJoinRuntimeStats{
+			BasicRuntimeStats: e.runtimeStats,
+			concurrent:        cap(e.joiners),
+		}
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+	}
 	return nil
 }
 
@@ -405,6 +411,17 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
 }
 
 func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
+	probeTime := int64(0)
+	if e.stats != nil {
+		start := time.Now()
+		defer func() {
+			t := time.Since(start)
+			atomic.AddInt64(&e.stats.probe, probeTime)
+			atomic.AddInt64(&e.stats.fetchAndProbe, int64(t))
+			e.stats.setMaxFetchAndProbeTime(int64(t))
+		}()
+	}
+
 	var (
 		probeSideResult *chunk.Chunk
 		selected        = make([]bool, 0, chunk.InitialCapacity)
@@ -434,11 +451,13 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
 		if !ok {
 			break
 		}
+		start := time.Now()
 		if e.useOuterToBuild {
 			ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, joinResult)
 		} else {
 			ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, joinResult, selected)
 		}
+		probeTime += int64(time.Since(start))
 		if !ok {
 			break
 		}
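The two hunks above split worker time into two counters: `fetchAndProbe` wraps the whole loop, while `probe` accumulates only the `join2Chunk*` calls, so the time spent waiting for probe-side chunks can later be derived as `fetchAndProbe - probe` without ever being measured directly. A standalone sketch of that decomposition (the channel, sleeps, and names are invented stand-ins):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 8)
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(5 * time.Millisecond) // producer latency -> "fetch" time
			ch <- i
		}
		close(ch)
	}()

	var probe time.Duration
	loopStart := time.Now()
	for range ch { // blocking here counts toward fetch, not probe
		start := time.Now()
		time.Sleep(2 * time.Millisecond) // stand-in for join2Chunk
		probe += time.Since(start)
	}
	fetchAndProbe := time.Since(loopStart)
	fmt.Printf("total:%v probe:%v fetch:%v\n", fetchAndProbe, probe, fetchAndProbe-probe)
}
```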
@@ -648,6 +667,12 @@ func (e *HashJoinExec) handleFetchAndBuildHashTablePanic(r interface{}) {
 }
 
 func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) {
+	if e.stats != nil {
+		start := time.Now()
+		defer func() {
+			e.stats.fetchAndBuildHashTable = time.Since(start)
+		}()
+	}
 	// buildSideResultCh transfers build side chunk from build side fetch to build hash table.
 	buildSideResultCh := make(chan *chunk.Chunk, 1)
 	doneCh := make(chan struct{})
@@ -997,16 +1022,72 @@ func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) {
 }
 
 func (e *joinRuntimeStats) String() string {
-	result := e.RuntimeStatsWithConcurrencyInfo.String()
+	buf := bytes.NewBuffer(make([]byte, 0, 16))
+	buf.WriteString(e.RuntimeStatsWithConcurrencyInfo.String())
 	if e.applyCache {
 		if e.cache.useCache {
-			result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100)
+			buf.WriteString(fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100))
 		} else {
-			result += fmt.Sprintf(", cache:OFF")
+			buf.WriteString(fmt.Sprintf(", cache:OFF"))
 		}
 	}
 	if e.hasHashStat {
-		result += ", " + e.hashStat.String()
+		buf.WriteString(", " + e.hashStat.String())
+	}
+	return buf.String()
+}
+
+type hashJoinRuntimeStats struct {
+	*execdetails.BasicRuntimeStats
+
+	fetchAndBuildHashTable time.Duration
+	hashStat               hashStatistic
+	fetchAndProbe          int64
+	probe                  int64
+	concurrent             int
+	maxFetchAndProbe       int64
+}
+
+func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) {
+	for {
+		value := atomic.LoadInt64(&e.maxFetchAndProbe)
+		if t <= value {
+			return
+		}
+		if atomic.CompareAndSwapInt64(&e.maxFetchAndProbe, value, t) {
+			return
+		}
+	}
+}
+
+func (e *hashJoinRuntimeStats) String() string {
+	buf := bytes.NewBuffer(make([]byte, 0, 128))
+	buf.WriteString(e.BasicRuntimeStats.String())
+	if e.fetchAndBuildHashTable > 0 {
+		buf.WriteString(", build_hash_table:{total:")
+		buf.WriteString(e.fetchAndBuildHashTable.String())
+		buf.WriteString(", fetch:")
+		buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String())
+		buf.WriteString(", build:")
+		buf.WriteString(e.hashStat.buildTableElapse.String())
+		buf.WriteString("}")
+	}
+	if e.probe > 0 {
+		buf.WriteString(", probe:{concurrency:")
+		buf.WriteString(strconv.Itoa(e.concurrent))
+		buf.WriteString(", total:")
+		buf.WriteString(time.Duration(e.fetchAndProbe).String())
+		buf.WriteString(", max:")
+		buf.WriteString(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe)).String())
+		buf.WriteString(", probe:")
+		buf.WriteString(time.Duration(e.probe).String())
+		buf.WriteString(", fetch:")
+		buf.WriteString(time.Duration(e.fetchAndProbe - e.probe).String())
+		if e.hashStat.probeCollision > 0 {
+			buf.WriteString(", probe_collision:")
+			buf.WriteString(strconv.Itoa(e.hashStat.probeCollision))
+		}
+		buf.WriteString("}")
 	}
-	return result
+	return buf.String()
 }
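`setMaxFetchAndProbeTime` above is the classic lock-free "atomic max": load the current value, bail out if the candidate is not larger, attempt a CAS, and retry only if another worker swapped in a new value between the load and the CAS. A self-contained demonstration of the same loop:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// setMax mirrors the CAS loop in the patch, generalized to any int64 cell.
func setMax(addr *int64, t int64) {
	for {
		value := atomic.LoadInt64(addr)
		if t <= value {
			return // current max already at least t
		}
		if atomic.CompareAndSwapInt64(addr, value, t) {
			return // we installed the new max
		}
		// CAS failed: a concurrent writer changed *addr; reload and re-check.
	}
}

func main() {
	var max int64
	var wg sync.WaitGroup
	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			setMax(&max, v)
		}(i)
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&max)) // always 100, with no mutex
}
```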
diff --git a/executor/join_test.go b/executor/join_test.go
index 9846a13fcd11e..4a3f97a143a3c 100644
--- a/executor/join_test.go
+++ b/executor/join_test.go
@@ -2291,3 +2291,27 @@ func (s *testSuiteJoin3) TestIssue19500(c *C) {
 	tk.MustQuery("select (select (select sum(c_int) from t3 where t3.c_str > t2.c_str) from t2 where t2.c_int > t1.c_int order by c_int limit 1) q from t1 order by q;").
 		Check(testkit.Rows("<nil>", "<nil>", "3", "3", "3"))
 }
+
+func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("drop table if exists t1,t2;")
+	tk.MustExec("create table t1 (a int, b int, unique index (a));")
+	tk.MustExec("create table t2 (a int, b int, unique index (a))")
+	tk.MustExec("insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5)")
+	tk.MustExec("insert into t2 values (1,1),(2,2),(3,3),(4,4),(5,5)")
+	// Test for index lookup join.
+	rows := tk.MustQuery("explain analyze select /*+ INL_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
+	c.Assert(len(rows), Equals, 8)
+	c.Assert(rows[0][0], Matches, "IndexJoin_.*")
+	c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*}, probe:.*")
+	// Test for index lookup hash join.
+	rows = tk.MustQuery("explain analyze select /*+ INL_HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
+	c.Assert(len(rows), Equals, 8)
+	c.Assert(rows[0][0], Matches, "IndexHashJoin.*")
+	c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}")
+	// Test for hash join.
+	rows = tk.MustQuery("explain analyze select /*+ HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
+	c.Assert(len(rows), Equals, 7)
+	c.Assert(rows[0][0], Matches, "HashJoin.*")
+	c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}")
+}
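Two details worth noting about these assertions, on the assumption that the suite uses gocheck as the `c *C` signature suggests: the `Matches` checker anchors its pattern against the whole value (effectively `^pattern$`), so each expression must describe the entire runtime-stats cell in order; and Go's regexp engine treats the unescaped `{` and `}` here as literals because they do not form a valid repetition. A self-contained illustration against a hand-written sample line (not output captured from a real run):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	sample := "time:1ms, loops:2, build_hash_table:{total:500µs, fetch:400µs, build:100µs}, " +
		"probe:{concurrency:5, total:3ms, max:800µs, probe:2ms, fetch:1ms}"
	pattern := "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, " +
		"probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}"
	// Anchoring mimics what gocheck's Matches checker does internally.
	ok, err := regexp.MatchString("^"+pattern+"$", sample)
	fmt.Println(ok, err) // true <nil>
}
```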