
[SPARK-42168][SQL][PYTHON][FOLLOW-UP] Test FlatMapCoGroupsInPandas with Window function #39752

Closed

Conversation

EnricoMi (Contributor)

What changes were proposed in this pull request?

This ports tests from #39717 in branch-3.2 to master.

Why are the changes needed?

To make sure this use case is tested.

Does this PR introduce any user-facing change?

No

How was this patch tested?

E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.
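For readers unfamiliar with the PySpark side, below is a minimal sketch of the kind of scenario the E2E test covers: a cogroup `applyInPandas` whose right side has been partitioned by a window over `("day", "id")`, the reverse of the `groupby("id", "day")` order. This is an assumed illustration, not the test's literal code; the helper name `count_sides` and the table sizes are made up for the example.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum as sum_
from pyspark.sql.window import Window
import pandas as pd

spark = SparkSession.builder.getOrCreate()

ids, days, parts = 1000, 1000, 10

id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)  # cartesian product of ids and days

left_df = id_day_df.select("id", "day", lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select("id", "day", lit("right").alias("side")).repartition(parts).cache()

# window partition column order ("day", "id") deliberately differs from groupby("id", "day")
window = Window.partitionBy("day", "id")
right_df = right_df.withColumn("day_sum", sum_(col("day")).over(window))

def count_sides(key, left, right):
    # one output row per (id, day) key: how many rows each side contributed
    return pd.DataFrame([{
        "id": key[0], "day": key[1],
        "lefts": len(left.index), "rights": len(right.index),
    }])

df = (
    left_df.groupby("id", "day")
    .cogroup(right_df.groupby("id", "day"))
    .applyInPandas(count_sides, "id long, day long, lefts long, rights long")
)
df.explain()
# every (id, day) exists on both sides, so lefts and rights should both be 1 in every row
df.show(5)
```

With the fix, `FlatMapCoGroupsInPandas` forces a re-shuffle of the windowed side into `hashpartitioning(id, day)`, so both groups line up and every output row shows `lefts = rights = 1`, matching the "Output now is" plan and result further below.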

…atMapCoGroupsInPandas (as in CoGroup)

Make `FlatMapCoGroupsInPandas` (used by PySpark) report its required child distribution as `HashClusteredDistribution`, rather than `ClusteredDistribution`. That is the same distribution as reported by `CoGroup` (used by Scala).

This allows the `EnsureRequirements` rule to correctly recognize that `FlatMapCoGroupsInPandas` requiring `HashClusteredDistribution(id, day)` is not compatible with `HashPartitioning(day, id)`, whereas `ClusteredDistribution(id, day)` is compatible with `HashPartitioning(day, id)`.

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.

```Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to work around bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()  //.withColumnRenamed("id", "id2")

// note the column order differs from the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)
```

Output was:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
```

Output now is:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
```

Spark 3.3 [reworked](https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76) `HashClusteredDistribution` (apache#32875) and is not sensitive to using `ClusteredDistribution`.

This fixes a correctness issue.

A unit test in `EnsureRequirementsSuite`.

Closes apache#39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@EnricoMi (Contributor, Author)

@sunchao @HyukjinKwon can we port the tests from #39717 to master, and backport them to branch-3.4 and branch-3.3?

sunchao (Member) commented Jan 26, 2023

@EnricoMi could you fix the formatting error in `test_pandas_cogrouped_map.py`? After that we can merge this.

@EnricoMi (Contributor, Author)

Done, thanks!

sunchao closed this in d9ca982 on Jan 27, 2023
sunchao (Member) commented Jan 27, 2023

Merged, thanks!

dongjoon-hyun (Member) left a comment


+1, LGTM, too. Thank you all.

@dongjoon-hyun (Member)

BTW, do we need this in branch-3.4, @sunchao ?

@EnricoMi (Contributor, Author)

I'd appreciate it if this test coverage also made it into 3.4 and 3.3.

sunchao (Member) commented Jan 27, 2023

Yes, we can get this into 3.3/3.4 too, @EnricoMi. Could you create a PR?

EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 28, 2023
…th Window function

This ports tests from apache#39717 in branch-3.2 to master.

To make sure this use case is tested.

No

E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes apache#39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
@EnricoMi (Contributor, Author)

Commit d9ca982 can be merged from master into branch-3.4 without conflicts. Do you need a PR from me for this?

Merging this into branch-3.3 requires resolving conflicts, done in #39781.

HyukjinKwon pushed a commit that referenced this pull request Jan 29, 2023
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.3. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39781 from EnricoMi/branch-3.3-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@EnricoMi (Contributor, Author)

@sunchao @HyukjinKwon can you merge this into branch-3.4 as well, please?

@HyukjinKwon (Member)

Mind creating a PR? I think that's easier since we're individually picking it up now.

EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 30, 2023
…th Window function

### What changes were proposed in this pull request?
This ports tests from apache#39717 in branch-3.2 to master.

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes apache#39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
@EnricoMi (Contributor, Author)

@HyukjinKwon understood: #39803

HyukjinKwon pushed a commit that referenced this pull request Jan 30, 2023
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.4. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39803 from EnricoMi/branch-3.4-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>