
[SPARK-32399][SQL] Full outer shuffled hash join #29342

Closed · wants to merge 11 commits

Conversation


@c21 c21 commented Aug 3, 2020

What changes were proposed in this pull request?

Add support for full outer join inside shuffled hash join. Currently, if a query is a full outer join, we only use sort merge join as the physical operator. However, sort merge join can be CPU- and IO-intensive when the input tables are large. Shuffled hash join, on the other hand, saves the sort CPU and IO compared to sort merge join, especially when the tables are large.

This PR implements the full outer join as follows:

  • Process rows from the stream side by looking up the hash relation, and mark the matched rows from the build side:

    • for joins with unique keys, a BitSet is used to record matched rows from the build side (the key index represents each row)
    • for joins with non-unique keys, a HashSet[Long] is used to record matched rows from the build side (key index + value index represent each row).
      The key index is defined as the index into the key-addressing array longArray in BytesToBytesMap.
      The value index is defined as the iterator index of the values for the same key.
  • Process rows from the build side by iterating the hash relation, and filter out build-side rows that were already matched (done in ShuffledHashJoinExec.fullOuterJoin)
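The two passes above can be sketched in plain Java. This is an illustration only, not the PR's actual code: an integer stands in for the join key, a HashMap stands in for Spark's unsafe hash relation, and rows are rendered as "stream,build" strings.

```java
import java.util.*;

// Sketch of the full outer SHJ algorithm described above: probe the stream
// side, mark matched build rows in a BitSet, then make a second pass over
// the build side to emit rows that were never matched.
class FullOuterProbeSketch {
    static List<String> fullOuterJoin(List<Integer> stream, List<Integer> build) {
        List<String> out = new ArrayList<>();
        // Build side: key -> list of row indices (stand-in for the hash relation).
        Map<Integer, List<Integer>> relation = new HashMap<>();
        for (int i = 0; i < build.size(); i++) {
            relation.computeIfAbsent(build.get(i), k -> new ArrayList<>()).add(i);
        }
        BitSet matched = new BitSet(build.size());
        for (Integer s : stream) {
            List<Integer> hits = relation.get(s);
            if (hits == null) {
                out.add(s + ",null");          // stream row with no build match
            } else {
                for (int i : hits) {
                    matched.set(i);            // remember the matched build row
                    out.add(s + "," + build.get(i));
                }
            }
        }
        // Second pass over the build side: emit rows never marked as matched.
        for (int i = 0; i < build.size(); i++) {
            if (!matched.get(i)) out.add("null," + build.get(i));
        }
        return out;
    }
}
```

The BitSet here corresponds to the unique-key case; the non-unique case additionally needs a value index per key, as described above.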

For context, this PR was originally implemented as follows (up to commit e332276):

  1. Construct the hash relation from the build side, with an extra boolean value at the end of each row to track lookup information (done in ShuffledHashJoinExec.buildHashedRelation and UnsafeHashedRelation.apply).
  2. Process rows from the stream side by looking up the hash relation, and mark the matched rows from the build side as looked up (done in ShuffledHashJoinExec.fullOuterJoin).
  3. Process rows from the build side by iterating the hash relation, and filter out build-side rows that were already looked up (done in ShuffledHashJoinExec.fullOuterJoin).

See discussion of pros and cons between these two approaches here, here and here.

TODO: codegen for full outer shuffled hash join can be implemented in another followup PR.

Why are the changes needed?

As implemented in this PR, full outer shuffled hash join has the overhead of iterating the build side twice (once to build the hash map, and once to output non-matching rows) and the stream side once. However, full outer sort merge join needs to iterate both sides twice, and sorting a large table can be CPU- and IO-intensive. So full outer shuffled hash join can be more efficient than sort merge join when the stream side is much larger than the build side.
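The iteration-count argument can be made concrete with a toy cost model. The formulas below are illustrative assumptions (pass counts plus an n·log2(n) sort term), not Spark's actual cost model or planner logic:

```java
// Back-of-the-envelope pass-count comparison: full outer SHJ scans the
// build side twice and the stream side once; full outer SMJ must sort both
// sides (roughly n*log2(n) comparisons each) and then scan both sides.
class JoinCostSketch {
    static double shjCost(long stream, long build) {
        return stream + 2.0 * build;
    }
    static double smjCost(long stream, long build) {
        return stream * log2(stream) + build * log2(build) + stream + build;
    }
    static double log2(long n) { return Math.log(n) / Math.log(2); }
}
```

With a stream side of 2^22 rows and a build side of 2^18 rows, the SHJ estimate comes out far below the SMJ estimate, matching the intuition that skipping the sort pays off when the build side is comparatively small.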

For the example query below, full outer SHJ saved 30% wall clock time compared to full outer SMJ.

def shuffleHashJoin(): Unit = {
  val N: Long = 4 << 22
  withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000") {
    codegenBenchmark("shuffle hash join", N) {
      val df1 = spark.range(N).selectExpr("cast(id as string) as k1")
      val df2 = spark.range(N / 10).selectExpr("cast(id * 10 as string) as k2")
      val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
      df.noop()
    }
  }
}
Running benchmark: shuffle hash join
  Running case: shuffle hash join off
  Stopped after 2 iterations, 16602 ms
  Running case: shuffle hash join on
  Stopped after 5 iterations, 31911 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join off                              7900           8301         567          2.1         470.9       1.0X
shuffle hash join on                               6250           6382          95          2.7         372.5       1.3X

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit tests in JoinSuite.scala, AbstractBytesToBytesMapSuite.java and HashedRelationSuite.scala.


c21 commented Aug 3, 2020

cc @cloud-fan and @sameeragarwal if you guys have time to take a look, thanks.


SparkQA commented Aug 4, 2020

Test build #127008 has finished for PR 29342 at commit 635f6fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class HashJoinedRow extends JoinedRow


SparkQA commented Aug 5, 2020

Test build #127065 has finished for PR 29342 at commit 01f1f04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh (Contributor) left a comment:

If I understand correctly, this PR is trying to save the state of whether or not a row was matched inside the build side itself, effectively making the build side mutable.

I am curious whether the approach of storing the 'matched rows' out of band was considered. The join algorithm could be extended to keep, say, an auxiliary struct of matched keys instead of populating this on the build side. Since the build-side hash tables are open-addressed arrays, this auxiliary struct might be a bitset that stores the matched indices.

In addition, how do you account for this extra memory usage on the driver? Is it possible that the planner thinks the query will "fit" and runs it, but it later OOMs because of this extra "column"?

PS: And also a big "Hello again!" :-P


c21 commented Aug 7, 2020

@agrawaldevesh - thank you for the warm welcome; excited to discuss and collaborate again here!

I am curious if the approach of storing the 'matched rows' out of band was considered ? The join algorithm could be extended to keep say an auxiliary struct of matched keys instead of populating this on the build side ? Since the build side hash tables are open addressed arrays, this auxiliary struct might be a bitset that stores the matched indices.

Yes, I agree that would be a good optimization for space. TL;DR: since full outer shuffled hash join is a new feature, I think we can keep it simple to begin with and optimize further if needed; see a detailed comment here.

In addition how do you account for this extra memory usage on the driver ? Is it possible that planner thinks that the query will "fit" and runs the query but it later on OOMs because of this extra "column" ?

That's a good question. Currently the planner does not account for this extra boolean-value overhead per row. However, for BHJ and SHJ the planner does not account for the extra key overhead in the hash map either; its estimate is just based on the size of the rows. Improving that part of the planner will need more thought in the future.


c21 commented Aug 7, 2020

@cloud-fan, @maropu, @agrawaldevesh - addressed all comments, and the PR is ready for review again. Thanks.


SparkQA commented Aug 7, 2020

Test build #127211 has finished for PR 29342 at commit d4e0084.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@agrawaldevesh (Contributor) left a comment:

Hi @c21, thanks for adding more context around why it's difficult to add a separate data structure to track the matched/not-matched status of a row. I hadn't thought of the need for being spill-aware, unsafe, etc., and those are great reasons to embed it in the hash table itself.

Would you mind adding a short comment with this rationale in HashedRelation?

Thanks


c21 commented Aug 10, 2020

@cloud-fan, @maropu, @agrawaldevesh and @viirya - addressed all new comments, and the PR is ready for review. My sense is that there are no major objections to the overall approach, and we fixed multiple details and wording issues in previous iterations. Please take a look when you have time, thanks.

@agrawaldevesh (Contributor) left a comment:

Code changes and non-test code LGTM, but I had a few last things to say with respect to testing.


SparkQA commented Aug 10, 2020

Test build #127255 has finished for PR 29342 at commit 8b18f79.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:

I am curious if the approach of storing the 'matched rows' out of band was considered ? The join algorithm could be extended to keep say an auxiliary struct of matched keys instead of populating this on the build side ? Since the build side hash tables are open addressed arrays, this auxiliary struct might be a bitset that stores the matched indices.

I have the same feeling. The current approach has a few drawbacks in my view:

  1. It complicates the driver-side hashed-relation building logic.
  2. It increases the size of the data to be broadcast.
  3. It may waste space (every row gets an extra boolean column, but not all of them will be updated).

I like the idea from @agrawaldevesh: we can build a bitset on the executor side. It would be nice to have a bitset based on BytesToBytesMap, but I think a Java standard bitset is fine. This is a new code path, so we don't need to worry about perf regression.
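For an out-of-band structure to work in the non-unique-key case, it has to identify a build-side row by the pair (key index, value index). One simple encoding, sketched here with an assumed 32/32-bit split (the real layout in the PR may differ), packs the pair into a single long so it can live in a set of primitive longs:

```java
// Illustrative encoding of a build-side row identity as one long:
// high 32 bits = key index (slot in the open-addressed key array),
// low 32 bits = value index (position in the value list for that key).
final class RowIndex {
    static long pack(int keyIndex, int valueIndex) {
        return ((long) keyIndex << 32) | (valueIndex & 0xFFFFFFFFL);
    }
    static int keyIndex(long packed) { return (int) (packed >>> 32); }
    static int valueIndex(long packed) { return (int) packed; }
}
```

With this encoding, the unique-key case degenerates to the key index alone, which is why a plain BitSet suffices there.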


c21 commented Aug 10, 2020

@cloud-fan -

It complicates the driver side hashed-relation building logic.
It increases the size of the data to be broadcasted.

Note: the mark-row feature is only active for full outer shuffled hash join. It has nothing to do with broadcast hash join or driver-side hash relation building.

But if a Java on-heap standard bitset is acceptable, I can change the PR to that approach. WDYT?


viirya commented Aug 10, 2020

+1 for the new idea. I guess the code change can be clearer with it.

@cloud-fan (Contributor) commented:

Yeah, let's use the standard bitset. It's a new code path anyway, and we can improve it later.

@viirya (Member) left a comment:

Not finished yet, will look at this more later.

@agrawaldevesh (Contributor) left a comment:

The new approach has obviously gotten a lot more polish, so it is more merge-ready. But I would love to know just how bad the CPU regression is compared to the previous approach.

@c21, can you please report on the CPU performance of the new and the old approaches? I think it is worth looking at just how much of a CPU difference there is.

To measure just the executor CPU, I use the code from https://github.com/LucaCanali/sparkMeasure .

Here is a gist that explains my approach: https://gist.github.com/agrawaldevesh/db6bb10f9c4f1a26b9ee5058867501ac (the top of the file contains instructions on how to integrate it with the Benchmark framework that the join benchmark is using).

Thanks for this great improvement! Move fast :-P


c21 commented Aug 14, 2020

@agrawaldevesh - thanks for the pointer to the metrics-measuring script; I will take a look. I also plan to backport this PR into our internal fork. We don't have this feature internally; I added it here inspired by discussions with @cloud-fan and Bart Samwell (who told me Delta engine will have this feature as well). I will run our production queries in a more serious way and report CPU metrics back here. But to align expectations: it may take a couple of days, and it's not a blocker for merging this PR, thanks.


SparkQA commented Aug 15, 2020

Test build #127467 has finished for PR 29342 at commit cf04e2f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Aug 15, 2020

This improvement looks super nice! Thanks, @c21! Btw, I'm a bit afraid users won't notice this improvement, so how about describing something about it in SQLConf?

.doc("When true, prefer sort merge join over shuffle hash join.")

@c21 (Contributor Author) left a comment:

Btw, I'm a bit afraid users won't notice this improvement, so how about describing something about it in SQLConf?

@maropu - sure. I added a comment for the config spark.sql.join.preferSortMergeJoin to explicitly mention that SHJ supports all join types that SMJ supports:

When true, prefer sort merge join over shuffled hash join. 
Note that shuffled hash join supports all join types (e.g. full outer) 
that sort merge join supports.
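As a usage sketch: to actually exercise the shuffled-hash-join path for such a query, a session would also need to keep the planner from preferring sort merge join or falling back to broadcast. The conf names below are the standard Spark settings referenced in this thread; the values are illustrative, not recommendations:

```shell
# Prefer shuffled hash join over sort merge join, and disable broadcast
# joins so the SHJ code path is the one the planner actually picks.
spark-sql \
  --conf spark.sql.join.preferSortMergeJoin=false \
  --conf spark.sql.autoBroadcastJoinThreshold=-1
```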

/**
* Returns the maximum number of allowed keys index.
*/
def maxNumKeysIndex: Int
@c21 (Contributor Author) commented:

@cloud-fan - per your comment in the other place, I take it the current naming is okay as well; let me know if that's not the case, thanks.


SparkQA commented Aug 15, 2020

Test build #127482 has finished for PR 29342 at commit 381cdbc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


c21 commented Aug 15, 2020

retest this please


c21 commented Aug 15, 2020

@agrawaldevesh - sorry for a separate, unrelated ping. It seems that DecommissionWorkerSuite (added in #29014) is kind of flaky: it failed more than 3 times in testing for this PR (both Jenkins and GitHub Actions). I can confirm the test failure is unrelated, as I don't touch any logic that can affect that unit test, and a rerun of the test succeeded.

An example of the latest failed unit test is at https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/127482/testReport/, with the stack trace at https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/127482/testReport/org.apache.spark.deploy/DecommissionWorkerSuite/decommission_workers_ensure_that_fetch_failures_lead_to_rerun/, thanks.

@agrawaldevesh (Contributor) left a comment:

@agrawaldevesh - sorry for a separate irrelevant ping. It seems that DecommissionWorkerSuite (added in #29014) was kind of flaky where it failed more than 3 times in testing for this PR (both jenkins and github actions). I can confirm the test failure is irrelevant as I don't touch any logic can affect the unit test and rerun of test succeeded.

I fully agree it is not relevant here. I have a fix in review for it: #29422. I expect to merge it this week.


SparkQA commented Aug 16, 2020

Test build #127483 has finished for PR 29342 at commit 381cdbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 16, 2020

Test build #127492 has finished for PR 29342 at commit 526709b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 8f0fef1 Aug 16, 2020

maropu commented Aug 16, 2020

Thanks, @c21, for the great improvement! Merged to master.


c21 commented Aug 16, 2020

Thank you @maropu, @cloud-fan, @agrawaldevesh, and @viirya for all the insightful discussion and review!

@c21 c21 deleted the full-outer-shj branch August 16, 2020 23:12
maropu pushed a commit that referenced this pull request Aug 29, 2020
### What changes were proposed in this pull request?

This is a followup to #29342, doing two things:
* Per #29342 (comment), change from Java `HashSet` to Spark's in-house `OpenHashSet` to track matched rows for non-unique join keys. I checked the `OpenHashSet` implementation, which is built from a key index (`OpenHashSet._bitset` as `BitSet`) and a key array (`OpenHashSet._data` as `Array`). Java `HashSet` is built on `HashMap`, which stores values in `Node` linked lists and in theory should take more memory than `OpenHashSet`. Reran the same benchmark query used in #29342 and verified that performance is similar between `HashSet` and `OpenHashSet`.
* Track metrics of the extra data structure `BitSet`/`OpenHashSet` for full outer SHJ. This depends on the item above, because there seems to be no easy way to get the memory size of a Java `HashSet`.
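The memory argument above can be illustrated with a toy open-addressed long set (linear probing, load factor capped at 50%). This is a simplified stand-in for Spark's `OpenHashSet`, not the real implementation: values sit in flat primitive arrays, so there is no `Long` boxing and no per-entry `HashMap.Node` object, which is where `java.util.HashSet<Long>` spends extra memory.

```java
// Toy open-addressed set of primitive longs with linear probing.
// Simplifications vs a production set: no removal, no reserved-zero
// handling, capacity is always a power of two doubled on growth.
class ToyLongSet {
    private long[] data = new long[16];
    private boolean[] used = new boolean[16];
    private int size = 0;

    void add(long v) {
        if (2 * (size + 1) > data.length) grow();   // keep load factor <= 50%
        int i = slot(v, data.length);
        while (used[i] && data[i] != v) i = (i + 1) % data.length;
        if (!used[i]) { used[i] = true; data[i] = v; size++; }
    }
    boolean contains(long v) {
        int i = slot(v, data.length);
        while (used[i]) {
            if (data[i] == v) return true;
            i = (i + 1) % data.length;
        }
        return false;                               // hit an empty slot: absent
    }
    int size() { return size; }

    private int slot(long v, int cap) {
        return (int) (((v ^ (v >>> 32)) & 0x7fffffffL) % cap);
    }
    private void grow() {
        long[] oldData = data; boolean[] oldUsed = used;
        data = new long[oldData.length * 2];
        used = new boolean[oldUsed.length * 2];
        size = 0;
        for (int i = 0; i < oldData.length; i++) if (oldUsed[i]) add(oldData[i]);
    }
}
```

Per element this costs roughly 9 bytes (one long plus one flag, at 50% load about 18 bytes), versus the boxed `Long` plus `Node` object headers and references that a `HashSet<Long>` entry carries.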

### Why are the changes needed?

To surface the memory usage of full outer SHJ more accurately.
This can help users/developers debug and improve full outer SHJ.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `SQLMetricsSuite.scala`.

Closes #29566 from c21/add-metrics.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>

Tagar commented Oct 20, 2020

@c21 thanks for this great improvement - quick question on this
#29342 (comment)

Btw, is it worth testing with shuffle partitions = 1 ?

I don't think so, as if there's only one shuffle partition,
we should use broadcast hash join instead.

BHJ doesn't support full outer join currently. Would it be possible to improve BHJ similarly to support full outer join, as is done here for SHJ? Thanks!


maropu commented Oct 21, 2020

How would we know that join keys on the build (broadcast) side don't exist on the stream side? If we can implement it cleanly, it looks okay to add.


c21 commented Oct 21, 2020

BHJ doesn't support full outer join currently. Would it be possible to improve BHJ similarly to support full outer join as it's done here for SHJ?

@Tagar - the major blocker is figuring out the non-matched rows from the broadcast side, as @maropu said. We did some brainstorming internally, and one idea could be to have a single task output all non-matched rows after collecting match information from all other tasks.
