[SPARK-32399][SQL] Full outer shuffled hash join #29342
Conversation
cc @cloud-fan and @sameeragarwal if you guys have time to take a look, thanks.
Test build #127008 has finished for PR 29342 at commit
Test build #127065 has finished for PR 29342 at commit
If I understand correctly, this PR is trying to save the state of whether or not a row was matched inside the build side itself, effectively making the build side mutable.
I am curious whether the approach of storing the "matched rows" out of band was considered. The join algorithm could be extended to keep, say, an auxiliary struct of matched keys instead of populating this on the build side. Since the build-side hash tables are open-addressed arrays, this auxiliary struct could be a bitset that stores the matched indices.
In addition, how do you account for this extra memory usage on the driver? Is it possible that the planner thinks the query will "fit" and runs it, but it later OOMs because of this extra "column"?
PS: And also a big "Hello again!" :-P
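The out-of-band tracking idea suggested in this comment can be sketched with plain Java collections. This is a hypothetical simplification, not Spark code: the build side stays immutable, and a `java.util.BitSet` indexed by the build row's position records which rows were matched during the probe.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch (not Spark code): keep the build side immutable and
// track matched build rows in an auxiliary BitSet indexed by row position.
public class MatchedBitSetSketch {
    static List<String> unmatchedBuildRows(String[] buildRows, String[] streamRows) {
        BitSet matched = new BitSet(buildRows.length); // out-of-band matched flags
        // Probe phase: mark every build row whose key joins a stream row.
        for (String probe : streamRows) {
            for (int i = 0; i < buildRows.length; i++) {
                if (buildRows[i].equals(probe)) {
                    matched.set(i); // record the match without mutating buildRows
                }
            }
        }
        // Build rows never marked are the full-outer leftovers.
        List<String> unmatched = new ArrayList<>();
        for (int i = matched.nextClearBit(0); i < buildRows.length;
             i = matched.nextClearBit(i + 1)) {
            unmatched.add(buildRows[i]);
        }
        return unmatched;
    }

    public static void main(String[] args) {
        System.out.println(unmatchedBuildRows(
            new String[]{"a", "b", "c", "d"}, new String[]{"b", "d", "b"}));
    }
}
```

The class and method names are invented for illustration; a real implementation would index into the open-addressed slot array rather than scan linearly.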
@agrawaldevesh - thank you for the warm welcome, and excited to discuss and collaborate again here!
Yes, I agree that would be a good optimization for space. TL;DR: given that full outer shuffled hash join is a new feature, I think we can keep it simple to begin with and optimize further if needed; a detailed comment here.
That's a good question. Currently the planner does not account for this extra boolean value overhead per row. However, for BHJ and SHJ the planner also does not take into account the extra key overhead of the hash map; it is just based on the size of rows. Improving the planner side of the code will need more thought in the future.
@cloud-fan, @maropu, @agrawaldevesh - addressed all comments, and the PR is ready for review again. Thanks.
Test build #127211 has finished for PR 29342 at commit
Hi @c21, thanks for adding more context around why it's difficult to add a separate data structure to track the matched/not-matched status of a row. I hadn't thought of the need for being spill-aware, unsafe, etc., and those are great reasons to embed it in the hash table itself.
Would you mind adding a short comment with this rationale in HashedRelation?
Thanks.
@cloud-fan, @maropu, @agrawaldevesh and @viirya - addressed all new comments, and the PR is ready for review. I get the sense that there are no major objections to the overall approach, and we have fixed multiple details and wording issues in previous iterations. Please take a look when you have time, thanks.
Code changes and non-test code LGTM, but I had a few last things to say w.r.t. testing.
Test build #127255 has finished for PR 29342 at commit
I have the same feeling as well. The current approach has a few drawbacks to me:
I like the idea from @agrawaldevesh; we can build a bitset at the executor side. It's good to have a bitset based on
Note: the mark-row feature is only active for full outer shuffled hash join. It has nothing to do with broadcast hash join and driver-side hash relation building. But if a Java on-heap standard bitset is acceptable, I can change the PR to that approach.
+1 for the new idea. I guess the code change can be clearer with it.
Yea, let's use a standard bitset. It's a new code path anyway and we can improve it later.
Not finished yet; will look at this later.
The new approach has obviously gotten a lot more polish, so it is more merge-ready. But I would love to know just how bad the CPU regression is compared to the previous approach.
@c21, can you please report on the CPU performance of the new and the old approaches? I think it is worth looking at just how much of a CPU difference there is.
To measure just the executor CPU, I use the code from https://github.com/LucaCanali/sparkMeasure .
Here is a gist that explains my approach: https://gist.github.com/agrawaldevesh/db6bb10f9c4f1a26b9ee5058867501ac (the top of the file contains instructions on how to integrate it with the Benchmark framework that the join benchmark is using).
Thanks for this great improvement! You move fast :-P
@agrawaldevesh - thanks for the pointer on measuring query metrics, and for the script. I will take a look. I also plan to backport the PR into our internal fork. We don't have this feature internally; I added it here inspired by discussion with @cloud-fan and Bart Samwell (who told me Delta engine will have this feature as well). I will run our production queries in a more serious way and report CPU metrics back here. But to align expectations: it may take a couple of days, and it's not a blocker for merging this PR. Thanks.
Test build #127467 has finished for PR 29342 at commit
This improvement looks super nice! Thanks, @c21! Btw, I'm a bit afraid users won't notice this improvement, so how about describing something about it in SQLConf?
> Btw, I'm a bit afraid users won't notice this improvement, so how about describing something about it in SQLConf?

@maropu - sure. I added a comment for the config spark.sql.join.preferSortMergeJoin to explicitly mention that SHJ supports all join types that SMJ supports:

When true, prefer sort merge join over shuffled hash join.
Note that shuffled hash join supports all join types (e.g. full outer)
that sort merge join supports.
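For illustration, this is how a user might steer the planner away from sort merge join so that shuffled hash join can be picked (a hypothetical invocation; the binary and other flags depend on your deployment, and the usual SHJ preconditions still apply):

```
# Disable the sort-merge-join preference; the planner may then choose
# shuffled hash join when its other conditions are met.
spark-sql --conf spark.sql.join.preferSortMergeJoin=false
```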
/**
 * Returns the maximum number of allowed keys index.
 */
def maxNumKeysIndex: Int
@cloud-fan - per your comment in the other place, I take it the current naming is okay as well; let me know if that's not the case, thanks.
Test build #127482 has finished for PR 29342 at commit
retest this please
@agrawaldevesh - sorry for a separate irrelevant ping. It seems that
DecommissionWorkerSuite
(added in #29014) was kind of flaky; it failed more than 3 times in testing for this PR (both Jenkins and GitHub Actions). I can confirm the test failure is irrelevant, as I don't touch any logic that can affect that unit test, and a rerun of the test succeeded. An example of the latest failed unit test is in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/127482/testReport/, with the stack trace in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/127482/testReport/org.apache.spark.deploy/DecommissionWorkerSuite/decommission_workers_ensure_that_fetch_failures_lead_to_rerun/, thanks.
I fully agree it is not relevant here. I have a fix in review for it: #29422. I expect to merge it this week.
Test build #127483 has finished for PR 29342 at commit
Test build #127492 has finished for PR 29342 at commit
Thanks, @c21, for the great improvement! Merged to master.
Thank you @maropu, @cloud-fan, @agrawaldevesh, and @viirya for all the insightful discussion and review!
### What changes were proposed in this pull request?
This is a followup from #29342, to do two things:
* Per #29342 (comment), change from Java `HashSet` to Spark's in-house `OpenHashSet` to track matched rows for non-unique join keys. I checked the `OpenHashSet` implementation, which is built from a key index (`OpenHashSet._bitset` as `BitSet`) and a key array (`OpenHashSet._data` as `Array`). Java `HashSet` is built on `HashMap`, which stores values in `Node` linked lists and in theory should take more memory than `OpenHashSet`. Reran the same benchmark query used in #29342, and verified the query has similar performance between `HashSet` and `OpenHashSet`.
* Track metrics of the extra data structure `BitSet`/`OpenHashSet` for full outer SHJ. This depends on the first change, because there seems to be no easy way to get the memory size of a Java `HashSet`.

### Why are the changes needed?
To surface the memory usage of full outer SHJ more accurately. This can help users/developers debug and improve full outer SHJ.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit test in `SQLMetricsSuite.scala`.

Closes #29566 from c21/add-metrics.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
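The "key index + value index" pair that identifies a matched build-side row is what makes a `HashSet[Long]`/`OpenHashSet[Long]` usable for non-unique keys: the pair can be packed into a single `Long`. Below is a minimal sketch of one possible packing; the class name and the exact encoding are assumptions for illustration, not necessarily what Spark uses.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: pack a 32-bit key index (slot in the key addressing
// array) and a 32-bit value index (iterator position among values sharing
// that key) into one long, so a Set<Long> can record matched rows.
public class MatchedRowKey {
    static long pack(int keyIndex, int valueIndex) {
        return (((long) keyIndex) << 32) | (valueIndex & 0xFFFFFFFFL);
    }

    static int keyIndex(long packed)   { return (int) (packed >>> 32); }
    static int valueIndex(long packed) { return (int) packed; }

    public static void main(String[] args) {
        Set<Long> matched = new HashSet<>();
        matched.add(pack(7, 2)); // mark the 3rd value under key slot 7 as matched
        System.out.println(matched.contains(pack(7, 2)));
        System.out.println(keyIndex(pack(7, 2)) + "," + valueIndex(pack(7, 2)));
    }
}
```

The same packing works unchanged with `OpenHashSet[Long]`, since it only requires that each (key index, value index) pair map to a distinct `Long`.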
@c21 thanks for this great improvement - quick question on this:
How do we know if join keys in a build (broadcast) side don't exist in a stream side? If we can implement it cleanly, it looks okay to add it.
@Tagar - the major blocker is figuring out the non-matched rows from the broadcast side, as @maropu said. We did some brainstorming internally, and one idea is to have a single task output all non-matched rows after collecting information from all other tasks.
### What changes were proposed in this pull request?
Add support for full outer join inside shuffled hash join. Currently, if the query is a full outer join, we only use sort merge join as the physical operator. However, sort merge join can be CPU- and IO-intensive when the input table is large. Shuffled hash join, on the other hand, saves the sort CPU and IO compared to sort merge join, especially when the table is large.
This PR implements the full outer join as follows:
* Process rows from the stream side by looking up the hash relation, and mark the matched rows from the build side:
  * For unique join keys, a `BitSet` is used to record matched rows from the build side (`key index` to represent each row).
  * For non-unique join keys, a `HashSet[Long]` is used to record matched rows from the build side (`key index` + `value index` to represent each row).
  * `key index` is defined as the index into the key addressing array `longArray` in `BytesToBytesMap`. `value index` is defined as the iterator index of values for the same key.
* Process rows from the build side by iterating the hash relation, and filter out rows from the build side that have already been looked up (done in `ShuffledHashJoinExec.fullOuterJoin`).

For context, this PR was originally implemented differently (up to commit e332276): the matched flag was stored in the hash relation itself (in `ShuffledHashJoinExec.buildHashedRelation` and `UnsafeHashedRelation.apply`), matched build-side rows were marked during the stream-side probe (in `ShuffledHashJoinExec.fullOuterJoin`), and the not-yet-matched build-side rows were then emitted (also in `ShuffledHashJoinExec.fullOuterJoin`).

See discussion of pros and cons between these two approaches here, here and here.
TODO: codegen for full outer shuffled hash join can be implemented in another followup PR.
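The two phases described above can be sketched with plain Java collections. This is a hypothetical simplification of the technique (build a multimap, probe while marking matches, then emit unmatched build rows padded with null); the class and method names are invented, and Spark's actual implementation works over `BytesToBytesMap` with key/value indices rather than object identity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified full outer hash join. Rows are String[]{key, value};
// output rows are String[]{key, buildValue, streamValue}.
public class FullOuterHashJoinSketch {
    static List<String[]> join(List<String[]> build, List<String[]> stream) {
        // key -> list of build rows (non-unique keys supported)
        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] row : build) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        // Auxiliary structure marking which build rows were matched.
        Set<String[]> matched = Collections.newSetFromMap(new IdentityHashMap<>());
        List<String[]> out = new ArrayList<>();

        // Phase 1: stream side probes the hash table.
        for (String[] s : stream) {
            List<String[]> hits = table.get(s[0]);
            if (hits == null) {
                out.add(new String[]{s[0], null, s[1]}); // stream-only row
            } else {
                for (String[] b : hits) {
                    matched.add(b);
                    out.add(new String[]{s[0], b[1], s[1]});
                }
            }
        }
        // Phase 2: emit build rows that were never matched, padded with null.
        for (String[] b : build) {
            if (!matched.contains(b)) out.add(new String[]{b[0], b[1], null});
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> out = join(
            Arrays.asList(new String[]{"k1", "b1"}, new String[]{"k2", "b2"}),
            Arrays.asList(new String[]{"k1", "s1"}, new String[]{"k3", "s3"}));
        System.out.println(Arrays.deepToString(out.toArray()));
    }
}
```

Note how the build side is iterated twice (once to build the table, once to emit leftovers) while the stream side is iterated only once, which is the asymmetry the "Why are the changes needed?" section relies on.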
### Why are the changes needed?
As implemented in this PR, full outer shuffled hash join has the overhead of iterating the build side twice (once to build the hash map, and once to output non-matching rows) and the stream side once. However, full outer sort merge join needs to iterate both sides twice, and sorting a large table can be more CPU- and IO-intensive. So full outer shuffled hash join can be more efficient than sort merge join when the stream side is much larger than the build side.
For the example query below, full outer SHJ saved 30% wall clock time compared to full outer SMJ.
### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added unit tests in `JoinSuite.scala`, `AbstractBytesToBytesMapSuite.java` and `HashedRelationSuite.scala`.