Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join #7904

Closed
wants to merge 62 commits into from

Conversation

JoshRosen
Copy link
Contributor

This patch adds a new SortMergeOuterJoin operator that performs left and right outer joins using sort merge join. It also refactors SortMergeJoin in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

  • Rename the HashJoin physical planner rule to EquiJoinSelection, since it's also used for non-hash joins.
  • Rewrite the comment at the top of HashJoin to better explain the precedence for choosing join operators.
  • Update JoinSuite to use SqlTestUtils.withConf for changing SQLConf settings.

This patch incorporates several ideas from @adrian-wang's patch, #5717.

Closes #5717.

Review on Reviewable

@JoshRosen
Copy link
Contributor Author

Current plan is to have separate iterators for left, right and full outer join, with some possible code-reuse / sharing of the iterators defined in the HashOuterJoin trait (I'll move them elsewhere). The key idea here is that once you've constructed the buffer for half of the left outer join then it doesn't really matter whether that buffer came from a hash map or was built up by scanning over the other sorted input. This should substantially reduce code complexity and will make it easier to spot the functionality which is only used for full outer join.

I'll work on implementing this design tomorrow.

leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
joinType match {
case LeftOuter =>
// TODO(josh): for SMJ we would buffer keys here:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, this class just copies ShuffledHashJoin; I'm going to edit it now to take advantage of the fact that the inputs are sorted.

@JoshRosen
Copy link
Contributor Author

I think that this could also benefit from randomized agreement tests, using SparkPlanTest. I'll look into adding a new OuterJoinSuite to do this (this could also be done as a followup during the QA period).

@@ -61,6 +61,7 @@ case class SortMergeJoin(
keys.map(SortOrder(_, Ascending))

protected override def doExecute(): RDD[InternalRow] = {
// TODO(josh): why is this copying necessary?
val leftResults = left.execute().map(_.copy())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that SortMergeJoin has this defensive copying on both inputs. I think that this is overly-conservative: we should only need to copy UnsafeRows rows that might be buffered and we should be able to perform that copying at the last possible moment when inserting the rows into the buffers. This means that the stream side of a left or right outer join should not need to be copied.

}

protected[this] def isUnsafeMode: Boolean = {
// TODO(josh): there is an existing bug here: this should also check whether unsafe mode
// is enabled. also, the default for self.codegenEnabled looks inconsistent to me.
(self.codegenEnabled && joinType != FullOuter
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure there's a bug here (see above comment): if unsafe is disabled then we should never generate unsafe projections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be addressed by @davies' patch to consolidate the Unsafe and Codegen configurations.

@SparkQA
Copy link

SparkQA commented Aug 9, 2015

Test build #40252 has finished for PR 7904 at commit f701652.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor Author

Alright, looks like this is passing tests even with the defensive copying disabled, so I'm going to go ahead and clean up this patch to get it into a merge-ready state.

@SparkQA
Copy link

SparkQA commented Aug 9, 2015

Test build #40266 has finished for PR 7904 at commit f83b412.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def outputOrdering: Seq[SortOrder] = joinType match {
// For left and right outer joins, the output is ordered by the streamed input's join keys.
case LeftOuter => requiredOrders(leftKeys)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question about this, actually: if the join keys contain nulls then a left or right outer join may output rows with null join keys. Does this have any impact on the outputOrdering (e.g. is it safe to say that it's still ordered by the left keys if those columns are nullable in the output)? Presumably this is safe, since those nulls were also ordered in the input, but I just want to confirm. @yhuai?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can say output rows are order by left keys (as long as our sort operator groups them together). Rows with null right keys will not be grouped.

@JoshRosen
Copy link
Contributor Author

I've pulled #7985 into this patch in order to improve the unit test coverage of our physical join operators, including this new SMJ outer join.


private def advanceRightUntilBoundConditionSatisfied(): Boolean = {
var foundMatch: Boolean = false
if (!foundMatch && rightIdx < smjScanner.getBufferedMatches.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we need to use while loop at here?

@yhuai
Copy link
Contributor

yhuai commented Aug 10, 2015

It will be helpful to add comments to explain the flow of these two join operators. The flow is not obvious from the code.

@SparkQA
Copy link

SparkQA commented Aug 11, 2015

Test build #40345 has finished for PR 7904 at commit 899dce2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SortMergeOuterJoin(

@SparkQA
Copy link

SparkQA commented Aug 11, 2015

Test build #40361 has finished for PR 7904 at commit e79909e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SortMergeOuterJoin(

@JoshRosen
Copy link
Contributor Author

Jenkins, retest this please.

@yhuai
Copy link
Contributor

yhuai commented Aug 11, 2015

I think the the workflow of these join operators is correct. We can merge it once it passes the tests. I will do a post-hoc then.

@SparkQA
Copy link

SparkQA commented Aug 11, 2015

Test build #40383 has finished for PR 7904 at commit eabacca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SortMergeOuterJoin(

asfgit pushed a commit that referenced this pull request Aug 11, 2015
…t outer join

This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join.  It also refactors `SortMergeJoin` in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.

This patch incorporates several ideas from adrian-wang's patch, #5717.

Closes #5717.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->

Author: Josh Rosen <joshrosen@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits.

(cherry picked from commit 91e9389)
Signed-off-by: Reynold Xin <rxin@databricks.com>
@asfgit asfgit closed this in 91e9389 Aug 11, 2015
CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015
…t outer join

This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join.  It also refactors `SortMergeJoin` in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.

This patch incorporates several ideas from adrian-wang's patch, apache#5717.

Closes apache#5717.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->

Author: Josh Rosen <joshrosen@databricks.com>
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes apache#7904 from JoshRosen/outer-join-smj and squashes 1 commits.
@JoshRosen JoshRosen deleted the outer-join-smj branch October 16, 2015 21:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants