[SPARK-42168][SQL][PYTHON][FOLLOW-UP] Test FlatMapCoGroupsInPandas with Window function #39752
Conversation
…atMapCoGroupsInPandas (as in CoGroup)

Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child distribution as `HashClusteredDistribution`, rather than `ClusteredDistribution`. That is the same distribution as reported by `CoGroup` (used by Scala).

This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, while `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)

// these redundant aliases are needed to work around bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache() //.withColumnRenamed("id", "id2")

// note the column order is different to the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```

Output was

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```

Output now is

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```

Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution` (apache#32875) and is not sensitive to this issue when using `ClusteredDistribution`.

This fixes a correctness issue.

Tested by a unit test in `EnsureRequirementsSuite`.

Closes apache#39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
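As a side note for readers who want to see the distribution check in isolation, here is a minimal spark-shell sketch against Spark 3.2's Catalyst internals (`HashClusteredDistribution` no longer exists in 3.3+, per apache#32875). The attribute definitions are illustrative stand-ins for the `id` and `day` columns of the repro above; this is not part of the patch.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.LongType

// Stand-ins for the id and day columns of the repro above.
val id  = AttributeReference("id", LongType)()
val day = AttributeReference("day", LongType)()

// The window's output partitioning: hash-partitioned by (day, id).
val existing = HashPartitioning(Seq(day, id), 200)

// ClusteredDistribution is order-insensitive, so the existing partitioning
// is (wrongly, for cogroup) accepted and no new Exchange is planned:
existing.satisfies(ClusteredDistribution(Seq(id, day)))      // true

// HashClusteredDistribution demands the same expressions in the same order,
// so EnsureRequirements inserts the Exchange hashpartitioning(id, day):
existing.satisfies(HashClusteredDistribution(Seq(id, day)))  // false
```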
@sunchao @HyukjinKwon can we port the tests from #39717 to master, and backport them to branch-3.4 and branch-3.3?
@EnricoMi could you fix the format error on …
Done, thanks!
Merged, thanks!
+1, LGTM, too. Thank you all.
BTW, do we need this in branch-3.4, @sunchao?
I'd appreciate it if this test coverage also made it into 3.4 and 3.3.
Yes, we can get this in 3.3/3.4 too, @EnricoMi. Could you create a PR?
…th Window function

### What changes were proposed in this pull request?
This ports tests from apache#39717 in branch-3.2 to master.

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes apache#39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.3. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39781 from EnricoMi/branch-3.3-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@sunchao @HyukjinKwon can you merge this into branch-3.4 as well, please?
Mind creating a PR? I think that's easier since we're individually picking it up now.
@HyukjinKwon understood: #39803
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.4. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39803 from EnricoMi/branch-3.4-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to master.

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.
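As background on what the `EnsureRequirementsSuite` analysis test exercises: per child, the `EnsureRequirements` rule effectively asks whether the child's existing output partitioning satisfies the operator's required child distribution, and inserts a shuffle when it does not. Below is a toy distillation of that per-child decision, not Spark's actual implementation; `satisfies` is the real Catalyst method, while `needsExchange` is a hypothetical helper name.

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning}

// Hypothetical helper (NOT Spark's code): a child needs a new Exchange iff
// its current partitioning cannot satisfy the distribution required by the
// parent operator.
def needsExchange(existing: Partitioning, required: Distribution): Boolean =
  !existing.satisfies(required)
```

With `FlatMapCoGroupsInPandas` requiring only `ClusteredDistribution(id, day)`, this check returned `false` against the window's `HashPartitioning(day, id)` output, so no shuffle was added and the cogroup ran on mis-partitioned data; requiring `HashClusteredDistribution(id, day)` makes it return `true`, which corresponds to the extra `Exchange hashpartitioning(id, day)` the tests pin down.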