From 5406cfd441c6c92a974812bcdf2b9084edde9a52 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 1 Sep 2020 19:48:33 +0800 Subject: [PATCH 1/8] add join stats Signed-off-by: crazycs520 --- executor/hash_table.go | 2 +- executor/index_lookup_hash_join.go | 1 - executor/index_lookup_join.go | 99 +++++++++++++++++++++++++++--- executor/join.go | 44 +++++++++---- 4 files changed, 124 insertions(+), 22 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 24359bcfba36f..1df5828abb146 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -82,7 +82,7 @@ type hashStatistic struct { } func (s *hashStatistic) String() string { - return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse) + return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse) } // hashRowContainer handles the rows and the hash map of a table. diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index d0ca46488c074..5d419b3cf3ebe 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -545,7 +545,6 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task if err != nil { return err } - lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents) } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index cbdc41292c486..60df4bd13dd7e 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -14,12 +14,15 @@ package executor import ( + "bytes" "context" "runtime" "runtime/trace" "sort" + "strconv" "sync" "sync/atomic" + "time" "unsafe" "github.com/pingcap/errors" @@ -77,6 +80,8 @@ type IndexLookUpJoin struct { lastColHelper *plannercore.ColWithCmpFuncManager memTracker *memory.Tracker // track memory usage. + + stats *indexLookUpJoinRuntimeStats } type outerCtx struct { @@ -138,6 +143,7 @@ type innerWorker struct { indexRanges []*ranger.Range nextColCompareFilters *plannercore.ColWithCmpFuncManager keyOff2IdxOff []int + stats *innerWorkerRuntimeStats } // Open implements the Executor interface. @@ -171,12 +177,19 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + if e.runtimeStats != nil { + e.stats = &indexLookUpJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } e.startWorkers(ctx) return nil } func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + e.stats.concurrency = concurrency resultCh := make(chan *lookUpJoinTask, concurrency) e.resultCh = resultCh workerCtx, cancelFunc := context.WithCancel(ctx) @@ -212,6 +225,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork copiedRanges = append(copiedRanges, ran.Clone()) } + var innerStats *innerWorkerRuntimeStats + if e.stats != nil { + innerStats = &e.stats.innerWorker + } iw := &innerWorker{ innerCtx: e.innerCtx, outerCtx: e.outerCtx, @@ -220,6 +237,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, + stats: innerStats, } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual @@ -250,6 +268,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { if task == nil { return nil } + startTime := time.Now() if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() { e.lookUpMatchedInners(task, task.cursor) e.innerIter = chunk.NewIterator4Slice(task.matchedInners) @@ -277,6 +296,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { task.hasMatch = false task.hasNull = false } + if e.stats != nil { + atomic.AddInt64(&e.stats.probe, int64(time.Since(startTime))) + } if req.IsFull() { return nil } @@ -475,11 +497,16 @@ type indexJoinLookUpContent struct { } func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start))) + }() + } lookUpContents, err := iw.constructLookupContent(task) if err != nil { return err } - lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) err = iw.fetchInnerResults(ctx, task, lookUpContents) if err != nil { return err @@ -492,6 +519,13 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err } func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.task, 1) + atomic.AddInt64(&iw.stats.construct, int64(time.Since(start))) + }() + } lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.Len()) keyBuf := make([]byte, 0, 64) for chkIdx := 0; chkIdx < task.outerResult.NumChunks(); chkIdx++ { @@ -531,6 +565,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi for i := range task.encodedLookUpKeys { task.memTracker.Consume(task.encodedLookUpKeys[i].MemoryUsage()) } + lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) return lookUpContents, nil } @@ -612,6 +647,12 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int { } func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start))) + }() + } innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters) if err != nil { return err @@ -641,6 +682,10 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa } func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) + }() keyBuf := make([]byte, 0, 64) valBuf := make([]byte, 8) for i := 0; i < task.innerResult.NumChunks(); i++ { @@ -685,11 +730,51 @@ func (e *IndexLookUpJoin) Close() error { e.workerWg.Wait() e.memTracker = nil e.task = nil - if e.runtimeStats != nil { - concurrency := cap(e.resultCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - } return e.baseExecutor.Close() } + +type indexLookUpJoinRuntimeStats struct { + *execdetails.BasicRuntimeStats + concurrency int + probe int64 + innerWorker innerWorkerRuntimeStats +} + +type innerWorkerRuntimeStats struct { + totalTime int64 + task int64 + construct int64 + fetch int64 + build int64 +} + +func (e *indexLookUpJoinRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 16)) + if e.BasicRuntimeStats != nil { + buf.WriteString(e.BasicRuntimeStats.String()) + } + if e.probe > 0 { + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) + } + if e.innerWorker.totalTime > 0 { + buf.WriteString(", inner:{total:") + buf.WriteString(time.Duration(e.innerWorker.totalTime).String()) + buf.WriteString(", concurrency:") + if e.concurrency > 0 { + buf.WriteString(strconv.Itoa(e.concurrency)) + } else { + buf.WriteString("OFF") + } + buf.WriteString(", task:") + buf.WriteString(strconv.FormatInt(e.innerWorker.task, 10)) + buf.WriteString(", construct:") + buf.WriteString(time.Duration(e.innerWorker.construct).String()) + buf.WriteString(", fetch:") + buf.WriteString(time.Duration(e.innerWorker.fetch).String()) + buf.WriteString(", build:") + buf.WriteString(time.Duration(e.innerWorker.build).String()) + buf.WriteString("}") + } + return buf.String() +} diff --git a/executor/join.go b/executor/join.go index e552c349e8f91..608658346e3f7 100644 --- a/executor/join.go +++ b/executor/join.go @@ -14,11 +14,13 @@ package executor import ( + "bytes" "context" "fmt" "runtime/trace" "sync" "sync/atomic" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -87,6 +89,8 @@ type HashJoinExec struct { // joinWorkerWaitGroup is for sync multiple join workers. joinWorkerWaitGroup sync.WaitGroup finished atomic.Value + + stats *joinRuntimeStats } // probeChkResource stores the result of the join probe side fetch worker, @@ -140,14 +144,8 @@ func (e *HashJoinExec) Close() error { } e.outerMatchedStatus = e.outerMatchedStatus[:0] - if e.runtimeStats != nil { - concurrency := cap(e.joiners) - runtimeStats := newJoinRuntimeStats(e.runtimeStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - if e.rowContainer != nil { - runtimeStats.setHashStat(e.rowContainer.stat) - } + if e.stats != nil && e.rowContainer != nil { + e.stats.setHashStat(e.rowContainer.stat) } err := e.baseExecutor.Close() return err @@ -176,6 +174,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if e.buildTypes == nil { e.buildTypes = retTypes(e.buildSideExec) } + if e.runtimeStats != nil { + e.stats = newJoinRuntimeStats(e.runtimeStats) + concurrency := cap(e.joiners) + e.stats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } return nil } @@ -422,6 +426,12 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { allTypes: e.probeTypes, keyColIdx: probeKeyColIdx, } + probeTime := int64(0) + if e.stats != nil { + defer func() { + atomic.AddInt64(&e.stats.probe, probeTime) + }() + } for ok := true; ok; { if e.finished.Load().(bool) { break @@ -434,11 +444,13 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { if !ok { break } + start := time.Now() if e.useOuterToBuild { ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, joinResult) } else { ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, joinResult, selected) } + probeTime += int64(time.Since(start)) if !ok { break } @@ -969,6 +981,7 @@ type joinRuntimeStats struct { cache cacheInfo hasHashStat bool hashStat hashStatistic + probe int64 } func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats { @@ -997,16 +1010,21 @@ func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) { } func (e *joinRuntimeStats) String() string { - result := e.RuntimeStatsWithConcurrencyInfo.String() + buf := bytes.NewBuffer(make([]byte, 0, 16)) + buf.WriteString(e.RuntimeStatsWithConcurrencyInfo.String()) if e.applyCache { if e.cache.useCache { - result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100) + buf.WriteString(fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100)) } else { - result += fmt.Sprintf(", cache:OFF") + buf.WriteString(fmt.Sprintf(", cache:OFF")) } } if e.hasHashStat { - result += ", " + e.hashStat.String() + buf.WriteString(", " + e.hashStat.String()) + } + if e.probe > 0 { + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) } - return result + return buf.String() } From 3bba5548f36bb9032ed5ef294e7af44a5fc2cab8 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 2 Sep 2020 15:29:17 +0800 Subject: [PATCH 2/8] add index for index look up hash join Signed-off-by: crazycs520 --- executor/index_lookup_hash_join.go | 43 +++++++++++++++++++++++++----- executor/index_lookup_join.go | 17 ++++++++---- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 5d419b3cf3ebe..ed3c45f850a3b 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -20,6 +20,8 @@ import ( "hash/fnv" "runtime/trace" "sync" + "sync/atomic" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -28,7 +30,6 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" ) @@ -66,6 +67,8 @@ type IndexNestedLoopHashJoin struct { curTask *indexHashJoinTask // taskCh is only used when `keepOuterOrder` is true. taskCh chan *indexHashJoinTask + + stats *indexLookUpJoinRuntimeStats } type indexHashJoinOuterWorker struct { @@ -144,12 +147,21 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + if e.runtimeStats != nil { + e.stats = &indexLookUpJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } e.startWorkers(ctx) return nil } func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + if e.stats != nil { + e.stats.concurrency = concurrency + } workerCtx, cancelFunc := context.WithCancel(ctx) e.cancelFunc = cancelFunc innerCh := make(chan *indexHashJoinTask, concurrency) @@ -297,12 +309,6 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.taskCh = nil } - if e.runtimeStats != nil { - concurrency := cap(e.joinChkResourceCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) } @@ -403,6 +409,10 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, for _, ran := range e.indexRanges { copiedRanges = append(copiedRanges, ran.Clone()) } + var innerStats *innerWorkerRuntimeStats + if e.stats != nil { + innerStats = &e.stats.innerWorker + } iw := &indexHashJoinInnerWorker{ innerWorker: innerWorker{ innerCtx: e.innerCtx, @@ -411,6 +421,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, + stats: innerStats, }, taskCh: taskCh, joiner: e.joiners[workerID], @@ -508,6 +519,12 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde } func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) + }() + } buf, numChks := make([]byte, 1), task.outerResult.NumChunks() task.lookupMap = newUnsafeHashTable(task.outerResult.Len()) for chkIdx := 0; chkIdx < numChks; chkIdx++ { @@ -556,6 +573,16 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} } func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + var joinStartTime time.Time + if iw.stats != nil { + start := time.Now() + defer func() { + endTime := time.Now() + atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start))) + atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime))) + }() + } + iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. @@ -565,6 +592,8 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH return err } iw.wg.Wait() + + joinStartTime = time.Now() if !task.keepOuterOrder { return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh) } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 60df4bd13dd7e..2c3632db3bbe2 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -189,7 +189,9 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() - e.stats.concurrency = concurrency + if e.stats != nil { + e.stats.concurrency = concurrency + } resultCh := make(chan *lookUpJoinTask, concurrency) e.resultCh = resultCh workerCtx, cancelFunc := context.WithCancel(ctx) @@ -682,10 +684,12 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa } func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { - start := time.Now() - defer func() { - atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) - }() + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) + }() + } keyBuf := make([]byte, 0, 64) valBuf := make([]byte, 8) for i := 0; i < task.innerResult.NumChunks(); i++ { @@ -746,6 +750,7 @@ type innerWorkerRuntimeStats struct { construct int64 fetch int64 build int64 + join int64 } func (e *indexLookUpJoinRuntimeStats) String() string { @@ -774,6 +779,8 @@ func (e *indexLookUpJoinRuntimeStats) String() string { buf.WriteString(time.Duration(e.innerWorker.fetch).String()) buf.WriteString(", build:") buf.WriteString(time.Duration(e.innerWorker.build).String()) + buf.WriteString(", join:") + buf.WriteString(time.Duration(e.innerWorker.join).String()) buf.WriteString("}") } return buf.String() From 6d1d0aab420da2e241b95d1421d88f2bddb8a8cf Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 2 Sep 2020 18:34:21 +0800 Subject: [PATCH 3/8] add test Signed-off-by: crazycs520 --- executor/index_lookup_join.go | 6 ++++-- executor/join_test.go | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 2c3632db3bbe2..444cf05743a71 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -779,8 +779,10 @@ func (e *indexLookUpJoinRuntimeStats) String() string { buf.WriteString(time.Duration(e.innerWorker.fetch).String()) buf.WriteString(", build:") buf.WriteString(time.Duration(e.innerWorker.build).String()) - buf.WriteString(", join:") - buf.WriteString(time.Duration(e.innerWorker.join).String()) + if e.innerWorker.join > 0 { + buf.WriteString(", join:") + buf.WriteString(time.Duration(e.innerWorker.join).String()) + } buf.WriteString("}") } return buf.String() diff --git a/executor/join_test.go b/executor/join_test.go index aee678d028905..918873f8ec76f 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2285,3 +2285,22 @@ func (s *testSuiteJoin3) TestIssue19500(c *C) { tk.MustQuery("select (select (select sum(c_int) from t3 where t3.c_str > t2.c_str) from t2 where t2.c_int > t1.c_int order by c_int limit 1) q from t1 order by q;"). Check(testkit.Rows("", "", "3", "3", "3")) } + +func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1,t2;") + tk.MustExec("create table t1 (a int, b int, unique index (a));") + tk.MustExec("create table t2 (a int, b int, unique index (a))") + tk.MustExec("insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5)") + tk.MustExec("insert into t2 values (1,1),(2,2),(3,3),(4,4),(5,5)") + // Test for index lookup join. + rows := tk.MustQuery("explain analyze select /*+ INL_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 8) + c.Assert(rows[0][0], Matches, "IndexJoin_.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, probe:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*}") + // Test for index lookup hash join. + rows = tk.MustQuery("explain analyze select /*+ INL_HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 8) + c.Assert(rows[0][0], Matches, "IndexHashJoin.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}") +} From 2b0d75b626fce3e9bfc39b57fc18891924688cc7 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 2 Sep 2020 19:26:29 +0800 Subject: [PATCH 4/8] refine hash join runtime stats Signed-off-by: crazycs520 --- executor/join.go | 73 ++++++++++++++++++++++++++++++++++++------- executor/join_test.go | 5 +++ 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/executor/join.go b/executor/join.go index 608658346e3f7..8d8dfc6ae3993 100644 --- a/executor/join.go +++ b/executor/join.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "runtime/trace" + "strconv" "sync" "sync/atomic" "time" @@ -90,7 +91,7 @@ type HashJoinExec struct { joinWorkerWaitGroup sync.WaitGroup finished atomic.Value - stats *joinRuntimeStats + stats *hashJoinRuntimeStats } // probeChkResource stores the result of the join probe side fetch worker, @@ -145,7 +146,7 @@ func (e *HashJoinExec) Close() error { e.outerMatchedStatus = e.outerMatchedStatus[:0] if e.stats != nil && e.rowContainer != nil { - e.stats.setHashStat(e.rowContainer.stat) + e.stats.hashStat = e.rowContainer.stat } err := e.baseExecutor.Close() return err @@ -175,9 +176,10 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.buildTypes = retTypes(e.buildSideExec) } if e.runtimeStats != nil { - e.stats = newJoinRuntimeStats(e.runtimeStats) - concurrency := cap(e.joiners) - e.stats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.stats = &hashJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + concurrent: cap(e.joiners), + } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } return nil @@ -409,6 +411,15 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() { } func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { + probeTime := int64(0) + if e.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&e.stats.probe, probeTime) + atomic.AddInt64(&e.stats.fetchAndProbe, int64(time.Since(start))) + }() + } + var ( probeSideResult *chunk.Chunk selected = make([]bool, 0, chunk.InitialCapacity) @@ -426,12 +437,6 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { allTypes: e.probeTypes, keyColIdx: probeKeyColIdx, } - probeTime := int64(0) - if e.stats != nil { - defer func() { - atomic.AddInt64(&e.stats.probe, probeTime) - }() - } for ok := true; ok; { if e.finished.Load().(bool) { break @@ -660,6 +665,12 @@ func (e *HashJoinExec) handleFetchAndBuildHashTablePanic(r interface{}) { } func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) { + if e.stats != nil { + start := time.Now() + defer func() { + e.stats.fetchAndBuildHashTable = time.Since(start) + }() + } // buildSideResultCh transfers build side chunk from build side fetch to build hash table. buildSideResultCh := make(chan *chunk.Chunk, 1) doneCh := make(chan struct{}) @@ -1028,3 +1039,43 @@ func (e *joinRuntimeStats) String() string { } return buf.String() } + +type hashJoinRuntimeStats struct { + *execdetails.BasicRuntimeStats + + fetchAndBuildHashTable time.Duration + hashStat hashStatistic + fetchAndProbe int64 + probe int64 + concurrent int +} + +func (e *hashJoinRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 128)) + buf.WriteString(e.BasicRuntimeStats.String()) + if e.fetchAndBuildHashTable > 0 { + buf.WriteString(", build_hash_table:{total:") + buf.WriteString(e.fetchAndBuildHashTable.String()) + buf.WriteString(", fetch:") + buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String()) + buf.WriteString(", build:") + buf.WriteString(e.hashStat.buildTableElapse.String()) + buf.WriteString("}") + } + if e.probe > 0 { + buf.WriteString(", probe:{concurrency:") + buf.WriteString(strconv.Itoa(e.concurrent)) + buf.WriteString(", total:") + buf.WriteString(time.Duration(e.fetchAndProbe).String()) + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) + buf.WriteString(", fetch:") + buf.WriteString(time.Duration(e.fetchAndProbe - e.probe).String()) + if e.hashStat.probeCollision > 0 { + buf.WriteString(", probe_collision:") + buf.WriteString(strconv.Itoa(e.hashStat.probeCollision)) + } + buf.WriteString("}") + } + return buf.String() +} diff --git a/executor/join_test.go b/executor/join_test.go index 918873f8ec76f..5c7170afd3a49 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2303,4 +2303,9 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { c.Assert(len(rows), Equals, 8) c.Assert(rows[0][0], Matches, "IndexHashJoin.*") c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}") + // Test for hash join. + rows = tk.MustQuery("explain analyze select /*+ HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 7) + c.Assert(rows[0][0], Matches, "HashJoin.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, probe:.*, fetch:.*}") } From 08567311adf434931c06f897464a5613244387e6 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 2 Sep 2020 19:39:01 +0800 Subject: [PATCH 5/8] refine code Signed-off-by: crazycs520 --- executor/join.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/executor/join.go b/executor/join.go index 8d8dfc6ae3993..8ba6e9c891a3d 100644 --- a/executor/join.go +++ b/executor/join.go @@ -992,7 +992,6 @@ type joinRuntimeStats struct { cache cacheInfo hasHashStat bool hashStat hashStatistic - probe int64 } func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats { @@ -1033,10 +1032,6 @@ func (e *joinRuntimeStats) String() string { if e.hasHashStat { buf.WriteString(", " + e.hashStat.String()) } - if e.probe > 0 { - buf.WriteString(", probe:") - buf.WriteString(time.Duration(e.probe).String()) - } return buf.String() } From 93c4bf3cbac04a7bbfbb992b1a5468eac696bb52 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 2 Sep 2020 20:31:21 +0800 Subject: [PATCH 6/8] refine code Signed-off-by: crazycs520 --- executor/index_lookup_join.go | 8 ++++---- executor/join_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 984f0bc626176..b9d2265c32c9f 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -760,10 +760,6 @@ func (e *indexLookUpJoinRuntimeStats) String() string { if e.BasicRuntimeStats != nil { buf.WriteString(e.BasicRuntimeStats.String()) } - if e.probe > 0 { - buf.WriteString(", probe:") - buf.WriteString(time.Duration(e.probe).String()) - } if e.innerWorker.totalTime > 0 { buf.WriteString(", inner:{total:") buf.WriteString(time.Duration(e.innerWorker.totalTime).String()) @@ -787,5 +783,9 @@ func (e *indexLookUpJoinRuntimeStats) String() string { } buf.WriteString("}") } + if e.probe > 0 { + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) + } return buf.String() } diff --git a/executor/join_test.go b/executor/join_test.go index 5c7170afd3a49..7e63dc6190485 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2297,7 +2297,7 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { rows := tk.MustQuery("explain analyze select /*+ INL_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() c.Assert(len(rows), Equals, 8) c.Assert(rows[0][0], Matches, "IndexJoin_.*") - c.Assert(rows[0][5], Matches, "time:.*, loops:.*, probe:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*}") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*}, probe:.*") // Test for index lookup hash join. rows = tk.MustQuery("explain analyze select /*+ INL_HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() c.Assert(len(rows), Equals, 8) From 2009920150c624575af4ef9a98df81e1c8b6efb6 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 4 Sep 2020 10:49:15 +0800 Subject: [PATCH 7/8] add max hash join worker time Signed-off-by: crazycs520 --- executor/join.go | 20 +++++++++++++++++++- executor/join_test.go | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/executor/join.go b/executor/join.go index 8ba6e9c891a3d..2e67443cbf43b 100644 --- a/executor/join.go +++ b/executor/join.go @@ -415,8 +415,10 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { if e.stats != nil { start := time.Now() defer func() { + t := time.Since(start) atomic.AddInt64(&e.stats.probe, probeTime) - atomic.AddInt64(&e.stats.fetchAndProbe, int64(time.Since(start))) + atomic.AddInt64(&e.stats.fetchAndProbe, int64(t)) + e.stats.setMaxFetchAndProbeTime(t) }() } @@ -1043,6 +1045,20 @@ type hashJoinRuntimeStats struct { fetchAndProbe int64 probe int64 concurrent int + mu struct { + sync.Mutex + maxFetchAndProbe time.Duration + } +} + +func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t time.Duration) { + if t > e.mu.maxFetchAndProbe { + e.mu.Lock() + if t > e.mu.maxFetchAndProbe { + e.mu.maxFetchAndProbe = t + } + e.mu.Unlock() + } } func (e *hashJoinRuntimeStats) String() string { @@ -1062,6 +1078,8 @@ func (e *hashJoinRuntimeStats) String() string { buf.WriteString(strconv.Itoa(e.concurrent)) buf.WriteString(", total:") buf.WriteString(time.Duration(e.fetchAndProbe).String()) + buf.WriteString(", max:") + buf.WriteString(e.mu.maxFetchAndProbe.String()) buf.WriteString(", probe:") buf.WriteString(time.Duration(e.probe).String()) buf.WriteString(", fetch:") diff --git a/executor/join_test.go b/executor/join_test.go index 7e63dc6190485..c971db26f515e 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2307,5 +2307,5 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { rows = tk.MustQuery("explain analyze select /*+ HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() c.Assert(len(rows), Equals, 7) c.Assert(rows[0][0], Matches, "HashJoin.*") - c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, probe:.*, fetch:.*}") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}") } From 55a9714ad91c1bcf484996e2352b2cc43e180212 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 7 Sep 2020 11:11:18 +0800 Subject: [PATCH 8/8] fix race Signed-off-by: crazycs520 --- executor/join.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/executor/join.go b/executor/join.go index 2e67443cbf43b..31cbed3e28049 100644 --- a/executor/join.go +++ b/executor/join.go @@ -418,7 +418,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { t := time.Since(start) atomic.AddInt64(&e.stats.probe, probeTime) atomic.AddInt64(&e.stats.fetchAndProbe, int64(t)) - e.stats.setMaxFetchAndProbeTime(t) + e.stats.setMaxFetchAndProbeTime(int64(t)) }() } @@ -1045,19 +1045,18 @@ type hashJoinRuntimeStats struct { fetchAndProbe int64 probe int64 concurrent int - mu struct { - sync.Mutex - maxFetchAndProbe time.Duration - } + maxFetchAndProbe int64 } -func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t time.Duration) { - if t > e.mu.maxFetchAndProbe { - e.mu.Lock() - if t > e.mu.maxFetchAndProbe { - e.mu.maxFetchAndProbe = t +func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) { + for { + value := atomic.LoadInt64(&e.maxFetchAndProbe) + if t <= value { + return + } + if atomic.CompareAndSwapInt64(&e.maxFetchAndProbe, value, t) { + return } - e.mu.Unlock() } } @@ -1079,7 +1078,7 @@ func (e *hashJoinRuntimeStats) String() string { buf.WriteString(", total:") buf.WriteString(time.Duration(e.fetchAndProbe).String()) buf.WriteString(", max:") - buf.WriteString(e.mu.maxFetchAndProbe.String()) + buf.WriteString(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe)).String()) buf.WriteString(", probe:") buf.WriteString(time.Duration(e.probe).String()) buf.WriteString(", fetch:")