From ea53416ee578fb00933ae734dd4c0851e937228e Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 23 Nov 2022 17:13:37 +0800 Subject: [PATCH 1/4] executor: introduce build worker --- executor/benchmark_test.go | 23 +++++++------ executor/builder.go | 65 ++++++++++++++++++++++--------------- executor/hash_table.go | 2 +- executor/hash_table_test.go | 2 +- executor/join.go | 60 ++++++++++++---------------------- 5 files changed, 73 insertions(+), 79 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 0968c629b4b0d..f189c5ad31a27 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -906,13 +906,13 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) joinSchema.Append(cols1...) } - joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) + joinKeysColIdx := make([]int, 0, len(testCase.keyIdx)) for _, keyIdx := range testCase.keyIdx { - joinKeys = append(joinKeys, cols0[keyIdx]) + joinKeysColIdx = append(joinKeysColIdx, keyIdx) } - probeKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) + probeKeysColIdx := make([]int, 0, len(testCase.keyIdx)) for _, keyIdx := range testCase.keyIdx { - probeKeys = append(probeKeys, cols1[keyIdx]) + probeKeysColIdx = append(probeKeysColIdx, keyIdx) } e := &HashJoinExec{ baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), @@ -921,19 +921,21 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) isOuterJoin: false, useOuterToBuild: testCase.useOuterToBuild, concurrency: uint(testCase.concurrency), + probeTypes: retTypes(outerExec), + buildTypes: retTypes(innerExec), }, probeSideTupleFetcher: &probeSideTupleFetcher{ probeSideExec: outerExec, }, - probeWorkers: make([]probeWorker, testCase.concurrency), - buildKeys: joinKeys, - probeKeys: probeKeys, - buildSideExec: innerExec, - buildSideEstCount: float64(testCase.rows), + probeWorkers: make([]*probeWorker, testCase.concurrency), + buildWorker: &buildWorker{ + buildKeyColIdx: joinKeysColIdx, + buildSideExec: innerExec, + }, } childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) - defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) + defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) for i := uint(0); i < e.concurrency; i++ { e.probeWorkers[i].workerID = i @@ -941,6 +943,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues, nil, lhsTypes, rhsTypes, childrenUsedSchema, false) + e.probeWorkers[i].probeKeyColIdx = probeKeysColIdx } memLimit := int64(-1) if testCase.disk { diff --git a/executor/builder.go b/executor/builder.go index eb8825dd0ce99..aaabafcc00d6c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1400,17 +1400,6 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu return e } -func (b *executorBuilder) buildSideEstCount(v *plannercore.PhysicalHashJoin) float64 { - buildSide := v.Children()[v.InnerChildIdx] - if v.UseOuterToBuild { - buildSide = v.Children()[1-v.InnerChildIdx] - } - if buildSide.Stats().HistColl == nil || buildSide.Stats().HistColl.Pseudo { - return 0.0 - } - return buildSide.StatsCount() -} - func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor { leftExec := b.build(v.Children()[0]) if b.err != nil { @@ -1425,6 +1414,8 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo e := &HashJoinExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec), probeSideTupleFetcher: &probeSideTupleFetcher{}, + probeWorkers: make([]*probeWorker, v.Concurrency), + buildWorker: &buildWorker{}, hashJoinCtx: &hashJoinCtx{ isOuterJoin: v.JoinType.IsOuterJoin(), useOuterToBuild: v.UseOuterToBuild, @@ -1449,15 +1440,17 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo leftIsBuildSide := true e.isNullEQ = v.IsNullEQ + var probeKeys, probeNAKeys, buildKeys, buildNAKeys []*expression.Column + var buildSideExec Executor if v.UseOuterToBuild { // update the buildSideEstCount due to changing the build side if v.InnerChildIdx == 1 { - e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.LeftConditions } else { - e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.RightConditions leftIsBuildSide = false } @@ -1466,30 +1459,48 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } } else { if v.InnerChildIdx == 0 { - e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.RightConditions } else { - e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.LeftConditions leftIsBuildSide = false } if defaultValues == nil { - defaultValues = make([]types.Datum, e.buildSideExec.Schema().Len()) + defaultValues = make([]types.Datum, buildSideExec.Schema().Len()) } } + probeKeyColIdx := make([]int, len(probeKeys)) + probeNAKeColIdx := make([]int, len(probeNAKeys)) + buildKeyColIdx := make([]int, len(buildKeys)) + buildNAKeyColIdx := make([]int, len(buildNAKeys)) + for i := range buildKeys { + buildKeyColIdx[i] = buildKeys[i].Index + } + for i := range buildNAKeys { + buildNAKeyColIdx[i] = buildNAKeys[i].Index + } + for i := range probeKeys { + probeKeyColIdx[i] = probeKeys[i].Index + } + for i := range probeNAKeys { + probeNAKeColIdx[i] = probeNAKeys[i].Index + } isNAJoin := len(v.LeftNAJoinKeys) > 0 - e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) - e.probeWorkers = make([]probeWorker, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx - e.probeWorkers[i].workerID = i - e.probeWorkers[i].sessCtx = e.ctx - e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, - v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) + e.probeWorkers[i] = &probeWorker{ + hashJoinCtx: e.hashJoinCtx, + workerID: i, + sessCtx: e.ctx, + joiner: newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin), + probeKeyColIdx: probeKeyColIdx, + probeNAKeyColIdx: probeNAKeColIdx, + } } + e.buildWorker.buildSideExec = buildSideExec e.hashJoinCtx.isNullAware = isNAJoin executorCountHashJoinExec.Inc() diff --git a/executor/hash_table.go b/executor/hash_table.go index b7c875148bffa..2ba840d04fdc9 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -117,7 +117,7 @@ type hashRowContainer struct { chkBuf *chunk.Chunk } -func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { +func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { maxChunkSize := sCtx.GetSessionVars().MaxChunkSize rc := chunk.NewRowContainer(allTypes, maxChunkSize) c := &hashRowContainer{ diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index 3b4a4acee5284..0a387e0e7e5b6 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -127,7 +127,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) for i := 0; i < numRows; i++ { hCtx.hashVals = append(hCtx.hashVals, hashFunc()) } - rowContainer := newHashRowContainer(sctx, 0, hCtx, colTypes) + rowContainer := newHashRowContainer(sctx, hCtx, colTypes) copiedRC = rowContainer.ShallowCopy() tracker := rowContainer.GetMemTracker() tracker.SetLabel(memory.LabelForBuildSideResult) diff --git a/executor/join.go b/executor/join.go index 1c75252a5e876..b8f92fa61ddc0 100644 --- a/executor/join.go +++ b/executor/join.go @@ -80,8 +80,10 @@ type probeSideTupleFetcher struct { type probeWorker struct { hashJoinCtx *hashJoinCtx sessCtx sessionctx.Context + workerID uint - workerID uint + probeKeyColIdx []int + probeNAKeyColIdx []int // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently buildSideRows []chunk.Row buildSideRowPtrs []chunk.RowPtr @@ -99,20 +101,20 @@ type probeWorker struct { probeResultCh chan *chunk.Chunk } +type buildWorker struct { + buildSideExec Executor + buildKeyColIdx []int + buildNAKeyColIdx []int +} + // HashJoinExec implements the hash join algorithm. type HashJoinExec struct { baseExecutor *hashJoinCtx probeSideTupleFetcher *probeSideTupleFetcher - probeWorkers []probeWorker - - buildSideExec Executor - buildSideEstCount float64 - probeKeys []*expression.Column - probeNAKeys []*expression.Column - buildKeys []*expression.Column - buildNAKeys []*expression.Column + probeWorkers []*probeWorker + buildWorker *buildWorker worker util.WaitGroupWrapper waiter util.WaitGroupWrapper @@ -204,12 +206,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.closeCh = make(chan struct{}) e.finished.Store(false) - if e.probeTypes == nil { - e.probeTypes = retTypes(e.probeSideTupleFetcher.probeSideExec) - } - if e.buildTypes == nil { - e.buildTypes = retTypes(e.buildSideExec) - } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ concurrent: int(e.concurrency), @@ -311,8 +307,8 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu if e.finished.Load() { return } - chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) - err = Next(ctx, e.buildSideExec, chk) + chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) + err = Next(ctx, e.buildWorker.buildSideExec, chk) if err != nil { errCh <- errors.Trace(err) return @@ -373,19 +369,11 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { e.probeSideTupleFetcher.fetchProbeSideChunks(ctx, e.maxChunkSize) }, e.probeSideTupleFetcher.handleProbeSideFetcherPanic) - probeKeyColIdx := make([]int, len(e.probeKeys)) - probeNAKeColIdx := make([]int, len(e.probeNAKeys)) - for i := range e.probeKeys { - probeKeyColIdx[i] = e.probeKeys[i].Index - } - for i := range e.probeNAKeys { - probeNAKeColIdx[i] = e.probeNAKeys[i].Index - } for i := uint(0); i < e.concurrency; i++ { workerID := i e.worker.RunWithRecover(func() { defer trace.StartRegion(ctx, "HashJoinWorker").End() - e.probeWorkers[workerID].runJoinWorker(probeKeyColIdx, probeNAKeColIdx) + e.probeWorkers[workerID].runJoinWorker() }, e.probeWorkers[workerID].handleProbeWorkerPanic) } e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil) @@ -461,7 +449,7 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() { close(e.joinResultCh) } -func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) { +func (w *probeWorker) runJoinWorker() { probeTime := int64(0) if w.hashJoinCtx.stats != nil { start := time.Now() @@ -488,8 +476,8 @@ func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) { } hCtx := &hashContext{ allTypes: w.hashJoinCtx.probeTypes, - keyColIdx: probeKeyColIdx, - naKeyColIdx: probeNAKeyColIdx, + keyColIdx: w.probeKeyColIdx, + naKeyColIdx: w.probeNAKeyColIdx, } for ok := true; ok; { if w.hashJoinCtx.finished.Load() { @@ -1103,20 +1091,12 @@ func (w *probeWorker) join2ChunkForOuterHashJoin(probeSideChk *chunk.Chunk, hCtx func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { if !e.prepared { e.buildFinished = make(chan error, 1) - buildKeyColIdx := make([]int, len(e.buildKeys)) - for i := range e.buildKeys { - buildKeyColIdx[i] = e.buildKeys[i].Index - } - buildNAKeyColIdx := make([]int, len(e.buildNAKeys)) - for i := range e.buildNAKeys { - buildNAKeyColIdx[i] = e.buildNAKeys[i].Index - } hCtx := &hashContext{ allTypes: e.buildTypes, - keyColIdx: buildKeyColIdx, - naKeyColIdx: buildNAKeyColIdx, + keyColIdx: e.buildWorker.buildKeyColIdx, + naKeyColIdx: e.buildWorker.buildNAKeyColIdx, } - e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) + e.rowContainer = newHashRowContainer(e.ctx, hCtx, e.buildTypes) // we shallow copies rowContainer for each probe worker to avoid lock contention for i := uint(0); i < e.concurrency; i++ { if i == 0 { From 2c8b0e260f358be43743300ea342a59dc645ba79 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 28 Nov 2022 20:29:03 +0800 Subject: [PATCH 2/4] address comment --- executor/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/builder.go b/executor/builder.go index aaabafcc00d6c..3d015849aa5ef 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1500,7 +1500,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo probeNAKeyColIdx: probeNAKeColIdx, } } - e.buildWorker.buildSideExec = buildSideExec + e.buildWorker.buildKeyColIdx, e.buildWorker.buildNAKeyColIdx, e.buildWorker.buildSideExec = buildKeyColIdx, buildNAKeyColIdx, buildSideExec e.hashJoinCtx.isNullAware = isNAJoin executorCountHashJoinExec.Inc() From ab231231f376a88f4e2bba59db419ca14212b976 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 28 Nov 2022 20:42:37 +0800 Subject: [PATCH 3/4] lint --- executor/benchmark_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index f189c5ad31a27..e0a38a73a884e 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -907,13 +907,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) } joinKeysColIdx := make([]int, 0, len(testCase.keyIdx)) - for _, keyIdx := range testCase.keyIdx { - joinKeysColIdx = append(joinKeysColIdx, keyIdx) - } + joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...) probeKeysColIdx := make([]int, 0, len(testCase.keyIdx)) - for _, keyIdx := range testCase.keyIdx { - probeKeysColIdx = append(probeKeysColIdx, keyIdx) - } + probeKeysColIdx = append(probeKeysColIdx, testCase.keyIdx...) e := &HashJoinExec{ baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), hashJoinCtx: &hashJoinCtx{ From 420d7face6f67e13173af9a42ef160e7cbe55f4f Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 28 Nov 2022 22:23:37 +0800 Subject: [PATCH 4/4] fix ci --- executor/join.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/join.go b/executor/join.go index b8f92fa61ddc0..2db97a8d3b2a2 100644 --- a/executor/join.go +++ b/executor/join.go @@ -307,7 +307,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu if e.finished.Load() { return } - chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) + chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildWorker.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) err = Next(ctx, e.buildWorker.buildSideExec, chk) if err != nil { errCh <- errors.Trace(err)