Skip to content

Commit

Permalink
executor: split hashjoin into workers(part1) (pingcap#39063)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored Nov 11, 2022
1 parent f5171e3 commit 42ff5e6
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 129 deletions.
10 changes: 6 additions & 4 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,24 +915,26 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
probeKeys = append(probeKeys, cols1[keyIdx])
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
probeSideTupleFetcher: probeSideTupleFetcher{
probeSideExec: outerExec,
},
concurrency: uint(testCase.concurrency),
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
buildKeys: joinKeys,
probeKeys: probeKeys,
buildSideExec: innerExec,
probeSideExec: outerExec,
buildSideEstCount: float64(testCase.rows),
useOuterToBuild: testCase.useOuterToBuild,
}

childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
e.joiners = make([]joiner, e.concurrency)
e.probeWorker.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
e.probeWorker.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false)
}
memLimit := int64(-1)
Expand Down
14 changes: 7 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,25 +1419,25 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
// update the buildSideEstCount due to changing the build side
if v.InnerChildIdx == 1 {
e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.outerFilter = v.LeftConditions
} else {
e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.outerFilter = v.RightConditions
leftIsBuildSide = false
}
if defaultValues == nil {
defaultValues = make([]types.Datum, e.probeSideExec.Schema().Len())
defaultValues = make([]types.Datum, e.probeSideTupleFetcher.probeSideExec.Schema().Len())
}
} else {
if v.InnerChildIdx == 0 {
e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.outerFilter = v.RightConditions
} else {
e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.outerFilter = v.LeftConditions
leftIsBuildSide = false
}
Expand All @@ -1448,9 +1448,9 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
isNAJoin := len(v.LeftNAJoinKeys) > 0
e.buildSideEstCount = b.buildSideEstCount(v)
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.joiners = make([]joiner, e.concurrency)
e.probeWorker.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
e.probeWorker.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin)
}
executorCountHashJoinExec.Inc()
Expand Down
Loading

0 comments on commit 42ff5e6

Please sign in to comment.