-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join #7904
Conversation
Current plan is to have separate iterators for left, right and full outer join, with some possible code-reuse / sharing of the iterators defined in the HashOuterJoin trait (I'll move them elsewhere). The key idea here is that once you've constructed the buffer for half of the left outer join then it doesn't really matter whether that buffer came from a hash map or was built up by scanning over the other sorted input. This should substantially reduce code complexity and will make it easier to spot the functionality which is only used for full outer join. I'll work on implementing this design tomorrow. |
leftResults.zipPartitions(rightResults) { (leftIter, rightIter) => | ||
joinType match { | ||
case LeftOuter => | ||
// TODO(josh): for SMJ we would buffer keys here: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, this class just copies ShuffledHashJoin; I'm going to edit it now to take advantage of the fact that the inputs are sorted.
I think that this could also benefit from randomized agreement tests, using SparkPlanTest. I'll look into adding a new OuterJoinSuite to do this (this could also be done as a followup during the QA period). |
@@ -61,6 +61,7 @@ case class SortMergeJoin( | |||
keys.map(SortOrder(_, Ascending)) | |||
|
|||
protected override def doExecute(): RDD[InternalRow] = { | |||
// TODO(josh): why is this copying necessary? | |||
val leftResults = left.execute().map(_.copy()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that SortMergeJoin has this defensive copying on both inputs. I think that this is overly-conservative: we should only need to copy UnsafeRows rows that might be buffered and we should be able to perform that copying at the last possible moment when inserting the rows into the buffers. This means that the stream side of a left or right outer join should not need to be copied.
bda0101
to
dd8a94e
Compare
} | ||
|
||
protected[this] def isUnsafeMode: Boolean = { | ||
// TODO(josh): there is an existing bug here: this should also check whether unsafe mode | ||
// is enabled. also, the default for self.codegenEnabled looks inconsistent to me. | ||
(self.codegenEnabled && joinType != FullOuter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty sure there's a bug here (see above comment): if unsafe is disabled then we should never generate unsafe projections.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be addressed by @davies' patch to consolidate the Unsafe and Codegen configurations.
…Ordering in more places.
Previously, the planner would always choose sort-merge-join for outer joins, even in cases where broadcast outer join could be used.
Test build #40252 has finished for PR 7904 at commit
|
Alright, looks like this is passing tests even with the defensive copying disabled, so I'm going to go ahead and clean up this patch to get it into a merge-ready state. |
Test build #40266 has finished for PR 7904 at commit
|
|
||
override def outputOrdering: Seq[SortOrder] = joinType match { | ||
// For left and right outer joins, the output is ordered by the streamed input's join keys. | ||
case LeftOuter => requiredOrders(leftKeys) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick question about this, actually: if the join keys contain nulls then a left or right outer join may output rows with null join keys. Does this have any impact on the outputOrdering
(e.g. is it safe to say that it's still ordered by the left keys if those columns are nullable in the output)? Presumably this is safe, since those nulls were also ordered in the input, but I just want to confirm. @yhuai?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we can say output rows are order by left keys (as long as our sort operator groups them together). Rows with null right keys will not be grouped.
48e49b9
to
f83b412
Compare
I've pulled #7985 into this patch in order to improve the unit test coverage of our physical join operators, including this new SMJ outer join. |
|
||
private def advanceRightUntilBoundConditionSatisfied(): Boolean = { | ||
var foundMatch: Boolean = false | ||
if (!foundMatch && rightIdx < smjScanner.getBufferedMatches.length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we need to use while
loop at here?
It will be helpful to add comments to explain the flow of these two join operators. The flow is not obvious from the code. |
Test build #40345 has finished for PR 7904 at commit
|
Test build #40361 has finished for PR 7904 at commit
|
Jenkins, retest this please. |
I think the the workflow of these join operators is correct. We can merge it once it passes the tests. I will do a post-hoc then. |
Test build #40383 has finished for PR 7904 at commit
|
…t outer join This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity. Along the way, I also performed a couple pieces of minor cleanup and optimization: - Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins. - Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators. - Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings. This patch incorporates several ideas from adrian-wang's patch, #5717. Closes #5717. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits. (cherry picked from commit 91e9389) Signed-off-by: Reynold Xin <rxin@databricks.com>
…t outer join This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity. Along the way, I also performed a couple pieces of minor cleanup and optimization: - Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins. - Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators. - Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings. This patch incorporates several ideas from adrian-wang's patch, apache#5717. Closes apache#5717. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes apache#7904 from JoshRosen/outer-join-smj and squashes 1 commits.
This patch adds a new
SortMergeOuterJoin
operator that performs left and right outer joins using sort merge join. It also refactorsSortMergeJoin
in order to improve performance and code clarity.Along the way, I also performed a couple pieces of minor cleanup and optimization:
HashJoin
physical planner rule toEquiJoinSelection
, since it's also used for non-hash joins.HashJoin
to better explain the precedence for choosing join operators.JoinSuite
to useSqlTestUtils.withConf
for changing SQLConf settings.This patch incorporates several ideas from @adrian-wang's patch, #5717.
Closes #5717.