[SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution #32875
Conversation
@sunchao, do both changes in this PR need to be done together? If not, then you may want to separate them into different PRs. If they are tied together, then noting why this needs to be one PR would be helpful.

Thanks @rdblue for taking a look. Yes, IMO these two are correlated, and I've just updated the "Why are the changes needed?" section above to clarify the motivation. Let me know if you think otherwise or have any comments. The current WIP PR is somewhat messy and I plan to come back to it soon after finishing the item at hand.
Force-pushed from 6c441b7 to 7190faa.
Force-pushed from 622467b to fb32460.
…s contain all the join keys

### What changes were proposed in this pull request?

This is a followup of #32875. Basically #32875 did two improvements:

1. allow bucket join even if the bucket hash function is different from Spark's shuffle hash function
2. allow bucket join even if the hash partition keys are a subset of the join keys

The first improvement is the major target for implementing the SPIP "storage partition join". The second improvement is kind of a consequence of the framework refactor, which was not planned. This PR disables the second improvement by default, as it may introduce a perf regression if there is data skew without shuffle. We need more design work to enable this improvement, like checking the ndv.

### Why are the changes needed?

Avoid perf regression.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Closes #35138 from cloud-fan/join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
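To see why skipping the shuffle can amplify skew, here is a small illustrative sketch (plain Python on hypothetical data; the md5-based `bucket` is a stand-in for any bucketing hash, not Spark's): when rows arrive pre-bucketed and no shuffle is added, a hot key pins its entire group to a single partition.

```python
import hashlib
from collections import Counter

def bucket(key, n=8):
    # Stand-in bucketing hash (not Spark's); deterministic across runs.
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % n

# Hypothetical skewed input: key 0 appears 900 times, 100 other keys once each.
rows = [0] * 900 + list(range(1, 101))

# Without a shuffle, task sizes are exactly the source bucket sizes.
sizes = Counter(bucket(k) for k in rows)
print("largest partition holds", max(sizes.values()), "of", len(rows), "rows")
```

All 900 rows of the hot key land in one task, which is the kind of regression this followup avoids by disabling the subset-keys improvement by default.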
…-partitioning requirement

### What changes were proposed in this pull request?

This is a followup of #32875. This PR updates `ValidateRequirements` to match the new change in the partitioning-distribution framework, and checks the co-partitioning requirement for join nodes.

### Why are the changes needed?

Fix bugs in `ValidateRequirements`.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

a new test suite

Closes #35225 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sorry for the post-review. I didn't know this PR might affect streaming queries and only noticed later. I discussed this change with @cloud-fan, and we are concerned about any possibility of skipping shuffle against grouping keys in stateful operators, including stream-stream join. In Structured Streaming, state is partitioned by grouping keys based on Spark's internal hash function, and the number of partitions is static. That said, if Spark does not respect the distribution of state in a stateful operator, it leads to a correctness problem. So please consider that the same key must be co-located across three aspects (left, right, state) in stream-stream join. The same applies to the non-join case, e.g. aggregation against a bucketed table; other stateful operators have two aspects, (key, state). In short, state partitioning must be considered.
@sunchao can we add back
@HeartSaVioR no worries, I should have pinged you too :)
Could you give me a concrete example of this? Currently the rule only skips shuffle in a join if both sides report the same distribution. Also, with the first follow-up by @cloud-fan, I think we've already restored the previous behavior. I'm no Spark streaming expert, so I'm still trying to learn more about the problem here. :)
Actually it's a hypothetical one; I tried to reproduce it, but you may see in #35341 that I failed to reproduce it with built-in sources.
If there is any chance for both sides to change their distributions altogether, the rule will skip shuffle in the join since both sides still have the same distribution, but a stream-stream join must read from state as well, which may be partitioned in a different way, and that partitioning is not flexible. That is, state partitioning cannot be changed during the query lifetime, at least for now. And we don't have a way to maintain the information of state partitioning (we only ensure the number of state partitions, by sticking to the config value for the number of "shuffle" partitions), so the safest option is to constrain state partitioning to Spark's hash partitioning.

The thing is the hash function - even if the source is partitioned/bucketed by the same criteria (columns), the hash function and the number of partitions must be the same as Spark's state for stateful operators. That said, at least as of now, stateful operators cannot leverage the benefits of source-side distribution. There's room to improve this, e.g. let state follow the distribution of the inputs, but also store the distribution info in metadata so that the stateful operator forces the state distribution (triggering shuffle) if they differ in future runs.
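As a concrete illustration of that assumption (a sketch in plain Python, not Spark code; `stable_hash` stands in for Spark's internal murmur3): if the number of state partitions ever differed between runs, keys would stop mapping to the partitions that hold their state.

```python
import hashlib

def stable_hash(key):
    # Stand-in for Spark's internal hash; deterministic across runs.
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

def partition_id(key, num_partitions):
    return stable_hash(key) % num_partitions

# Run 1: state was written with 200 partitions; run 2: suppose it were 100.
moved = sum(1 for k in range(1000)
            if partition_id(k, 200) != partition_id(k, 100))
print(f"{moved}/1000 keys would no longer find their state partition")
```

This is why the number of "shuffle" partitions is pinned for the lifetime of a streaming query.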
Could you please point to the commit/PR for this?
Thanks @HeartSaVioR. The PR I mentioned is #35138. To my understanding, Spark also skipped shuffle for stream-stream join before this PR, when the distribution (bucket) keys exactly matched the join keys, so it seems the potential issue you mentioned above could occur before this PR too?
Oh well... I may need to try hacking the file stream source to support bucketing (regardless of whether it works correctly or not) and check the physical plan. cc. @cloud-fan Could you please help confirm whether the problem may exist even before this PR? Would using HashClusteredDistribution "force" using Spark's internal hash function for the distribution?
I think this PR introduced two things:
So if we speak of the current master branch, I feel it should not break anything for streaming use cases.
This already assumes the DSv2 bucketing algorithm can be different from Spark's, and that Spark avoids shuffle in this case. It is of course a great improvement in general, but in the streaming context, state partitioning is not considered here. Given that state partitioning is not flexible and we have yet to add metadata on state carrying its partitioning information, our major assumption is that state is partitioned using Spark's internal hash function, with the number of shuffle partitions as the number of partitions. If there is any case where this assumption can be broken, we cannot yet allow that case for streaming queries. That said, there's room for improvement.
I meant that shuffle must be performed for stream-stream join if the source doesn't follow Spark's internal hash function, to retain the major assumption. Same for other stateful operators. That said, we may have already broken these cases since we didn't change these operators in this PR. I assume it was due to the interchangeability between ClusteredDistribution and HashClusteredDistribution - in older versions of Spark I found no difference between the two.
I think this is kind of a potential bug. Let's say that we have 2 tables that can report hash partitioning optionally (e.g. controlled by a flag). Assume a streaming query is first run with the flag off, which means the tables do not report hash partitioning; then Spark will add shuffles before the stream-stream join, and the join state (streaming checkpoint) is partitioned by Spark's murmur3 hash function. Then we restart the streaming query with the flag on, and the 2 tables report hash partitioning (not the same as Spark's murmur3). Spark will not add shuffles before the stream-stream join this time, which leads to wrong results, because the left/right join child is not co-partitioned with the join state from the previous run.
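The restart scenario above can be sketched in a few lines (plain Python with hypothetical hashes; neither function is Spark's actual murmur3): state written in run 1 under one hash function is looked up in run 2 under another, so keys arrive at partitions that do not hold their state.

```python
import hashlib

NUM_PARTITIONS = 8  # static for the lifetime of the streaming query

def spark_hash(key):   # stand-in for Spark's murmur3
    return int(hashlib.md5(f"spark:{key}".encode()).hexdigest(), 16)

def source_hash(key):  # stand-in for a DSv2 source's own bucketing hash
    return int(hashlib.md5(f"source:{key}".encode()).hexdigest(), 16)

# Run 1 (flag off): Spark shuffles; state for each key lives in this partition.
state_partition = {k: spark_hash(k) % NUM_PARTITIONS for k in range(100)}

# Run 2 (flag on): shuffle is skipped; rows arrive partitioned by the source.
misrouted = [k for k in state_partition
             if source_hash(k) % NUM_PARTITIONS != state_partition[k]]
print(f"{len(misrouted)}/100 keys arrive in a partition without their state")
```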
Yes, since we can't change the state partitioning, state partitioning must be considered the first priority for stateful operators - even if we make some improvements here (having information on the physical partitioning of state) and feel safer storing state the same way as the source's partitioning, the operator must request the state's partitioning so that nothing breaks when the source changes.
Thanks! Yes, this makes sense. I spoke with @viirya offline today and he also pointed out this potential issue. Let me work on a fix. I'm wondering whether there is any way to treat the streaming case specially in EnsureRequirements instead of reintroducing HashClusteredDistribution, though.
I suspect we will make the same mistake unintentionally if we don't explicitly call it out. Please bring back the way for "operators" to explicitly require Spark's internal hash partitioning when specifying a requirement on distribution. This would be future-proof for when state partitioning becomes flexible - at that point the operators would require Spark to fit the partitioning of state (even if it's not Spark's internal hash partitioning).
When we specify HashClusteredDistribution on stateful operator, there are major assumptions that
(I think we may be better off leaving a code comment for the above, to prevent changes to HashClusteredDistribution.) Let's say the child operator is range partitioned and we add a stateful operator with ClusteredDistribution as the required distribution. The range partitioning can satisfy ClusteredDistribution, but the physical partitioning of the child is totally different from the state's, and that leads to a correctness issue (even silently). It seems DataSourcePartitioning doesn't allow the partitioning from a data source to satisfy HashClusteredDistribution - it only checks against ClusteredDistribution. This must not be changed unless the partitioning from the data source guarantees the same physical partitioning as Spark's internal hash partitioning, which we have no way to guarantee through the Partitioning interface. EDIT: If we want a kind of HashClusteredDistribution which allows partitioning using any hash function, we will have to explicitly separate the two.
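A simplified model of the two satisfaction rules being discussed (plain Python, not Spark's actual classes; the real `Partitioning.satisfies` logic has more cases): `ClusteredDistribution` is satisfied by any partitioning that clusters on a subset of the required keys, including a range partitioning, while `HashClusteredDistribution` insists on a `HashPartitioning` with exactly the required keys.

```python
from dataclasses import dataclass

@dataclass
class HashPartitioning:
    keys: tuple

@dataclass
class RangePartitioning:
    keys: tuple

def satisfies_clustered(partitioning, required_keys):
    # Simplified: any hash/range partitioning on a subset of the required
    # keys co-locates rows sharing those keys -- but the physical layout
    # may still differ between operators.
    return set(partitioning.keys) <= set(required_keys)

def satisfies_hash_clustered(partitioning, required_keys):
    # Strict: exact key list, and Spark's own hash function (modeled here
    # as "must be a HashPartitioning").
    return (isinstance(partitioning, HashPartitioning)
            and partitioning.keys == tuple(required_keys))

p = RangePartitioning(keys=("key",))
print(satisfies_clustered(p, ["key"]))       # range satisfies clustered
print(satisfies_hash_clustered(p, ["key"]))  # but not hash-clustered
```

This is exactly the hazard described above: a range-partitioned child passes the ClusteredDistribution check even though its physical layout cannot match the state's hash layout.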
That said, I see other missing spots in stateful operators using ClusteredDistribution (besides stream-stream join), so unfortunately it seems to be a long-standing issue. Once we revive HashClusteredDistribution, I'll fix the stateful operators to use it.
One question @cloud-fan: is this already a correctness issue previously? Say, if one side of the join reports

Thanks @HeartSaVioR for your comments, duly noted. Let me bring back
That's correct.
If we feel the name HashClusteredDistribution is too generic to convey that it's tightly coupled with Spark's hash partitioning, it might not be a crazy idea to add some prefix to the class name to make it clear. The same may apply to HashPartitioning.
/**
 * Represents data where tuples have been clustered according to the hash of the given
 * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
I realized we even documented the characteristic I mentioned.
That said, I'd be slightly in favor of being very explicit about the specialization of HashClusteredDistribution & HashPartitioning to Spark internals, via a prefix in the naming. The names seem too general, and no one would know about this characteristic unless they read through the classdoc carefully. And it would be very confusing if someone found a need for a "general" HashClusteredDistribution & HashPartitioning and somehow stumbled on these classes.
I confirmed that StreamingAggregation has the same problem as the stream-stream join problem described in SPARK-24588. Test code:
Output:
Note that there was only a single shuffle performed. While this seems OK and produces correct output, we can modify the query in various ways to break it in a further run - e.g. 1) remove ... The problem persists for all stateful operators (otherwise this PR would have had to touch more places). Since HashClusteredDistribution was introduced in SPARK-21865 (2.3.0), Spark 2.3.0+ would have this problem.
I just took the first step on this work - please review #35419
…atMapCoGroupsInPandas (as in CoGroup)

### What changes were proposed in this pull request?

Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child distribution as `HashClusteredDistribution`, rather than `ClusteredDistribution`. That is the same distribution as reported by `CoGroup` (used by Scala).

### Why are the changes needed?

This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, while `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.

```Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to workaround bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()  //.withColumnRenamed("id", "id2")

// note the column order is different to the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```

Output was

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```

Output now is

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```

Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution`, and is not sensitive to using `ClusteredDistribution`: #32875

### Does this PR introduce _any_ user-facing change?

This fixes correctness.

### How was this patch tested?

A unit test in `EnsureRequirementsSuite`.

Closes #39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?

This PR proposes the following:

1. Introduce `ShuffleSpec`, which is used in `EnsureRequirements` when a node has more than one child, and serves two purposes: 1) compare all children and check if they are compatible w.r.t. partitioning & distribution, and 2) create a new partitioning to re-shuffle the other side in case they are not compatible.
2. Remove `HashClusteredDistribution` and replace its usages with `ClusteredDistribution`.

Under the new mechanism, when `EnsureRequirements` checks whether shuffles are required for a plan node with more than one child, it does the following:

### Why are the changes needed?
Spark currently only allows bucket join when the set of cluster keys from the output partitioning exactly matches the set of join keys from the required distribution. For instance, in the following:

bucket join will only be triggered if both `A` and `B` are bucketed on columns `c1` and `c2`, in which case Spark will avoid shuffling both sides of the join.

The above requirement, however, is too strict, as shuffle can also be avoided if both `A` and `B` are bucketed on either column `c1` or `c2`. That is, if all rows that have the same value in column `c1` are clustered into the same partition, then all rows that have the same values in columns `c1` and `c2` are also clustered into the same partition.

In order to allow this, we'll need to change the logic of deciding whether two sides of a join operator are "co-partitioned". Currently, this is done by checking each side's output partitioning against its required distribution separately, using the `Partitioning.satisfies` method. Since `HashClusteredDistribution` requires a `HashPartitioning` to have an exact match on the cluster keys, this can be done in isolation, without looking at the other side's output partitioning and required distribution.

However, this approach is no longer valid if we are going to relax the above constraint, as we need to compare the output partitioning and required distribution on both sides. For instance, in the above example, if `A` is bucketed on `c1` while `B` is bucketed on `c2`, we may need to check whether `A.c1` and `B.c2` are used in the join keys (e.g., at positions 0 and 1 respectively).

In order to achieve the above, this proposes the following:
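The subset-keys claim above ("same value in `c1` implies same partition" entails "same values in `(c1, c2)` implies same partition") can be sanity-checked with a tiny sketch (plain Python on hypothetical data; the md5-based `bucket` is just a stand-in hash):

```python
import hashlib

def bucket(value, n=4):
    # Stand-in bucketing hash on a single column value.
    return int(hashlib.md5(str(value).encode()).hexdigest(), 16) % n

rows = [{"c1": i % 5, "c2": i % 3} for i in range(60)]

# Partition by c1 only, then check each (c1, c2) group stays together.
placement = {}
ok = True
for row in rows:
    key = (row["c1"], row["c2"])
    pid = bucket(row["c1"])
    if placement.setdefault(key, pid) != pid:
        ok = False
print("every (c1, c2) group lands in a single partition:", ok)
```

Since the partition id depends only on `c1`, every group sharing `(c1, c2)` trivially shares a partition - which is why bucketing on a subset of the join keys is sufficient for co-location.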
A similar API is also required if we are going to support DSv2 `DataSourcePartitioning` as the output partitioning in the bucket join scenario, or support custom hash functions such as `HiveHash` for bucketing. With the former, even if both `A` and `B` are partitioned on columns `c1` and `c2` in the above example, they could be partitioned via different transform expressions, e.g., `A` on `(bucket(32, c1), day(c2))` while `B` on `(bucket(32, c1), hour(c2))`. This means we'll need to compare the partitioning from both sides of the join, which makes the current approach with `Partitioning.satisfies` insufficient. The same API `isCompatibleWith` can potentially be reused for this purpose.

### Does this PR introduce _any_ user-facing change?
Yes, now bucket join will be enabled for more cases as mentioned above.
### How was this patch tested?

Test suites `ShuffleSpecSuite` and `EnsureRequirementsSuite`.