
[SPARK-42168][3.2][SQL][PYTHON] Fix required child distribution of FlatMapCoGroupsInPandas (as in CoGroup) #39717

Closed

Conversation

EnricoMi
Contributor

@EnricoMi EnricoMi commented Jan 24, 2023

What changes were proposed in this pull request?

Make FlatMapCoGroupsInPandas (used by PySpark) report its required child distribution as HashClusteredDistribution, rather than ClusteredDistribution. That is the same distribution as reported by CoGroup (used by Scala).
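
For illustration, a minimal sketch of the shape of this change (assumed, abbreviated names; not the exact diff of this PR): the physical operator's required child distributions switch from ClusteredDistribution to HashClusteredDistribution over each side's grouping attributes, mirroring what CoGroup already does.

```Scala
// Sketch only: a stand-in for the FlatMapCoGroupsInPandas physical operator;
// leftGroup/rightGroup are the grouping attributes of the left/right child.
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}

trait CoGroupDistributionSketch {
  def leftGroup: Seq[Attribute]
  def rightGroup: Seq[Attribute]

  // Before (buggy): ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
  // After: pin the exact hash expressions and their order, as CoGroup does.
  def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftGroup) :: HashClusteredDistribution(rightGroup) :: Nil
}
```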

Why are the changes needed?

This allows the EnsureRequirements rule to correctly recognize that FlatMapCoGroupsInPandas, which requires HashClusteredDistribution(id, day), is not compatible with HashPartitioning(day, id). ClusteredDistribution(id, day), by contrast, is satisfied by HashPartitioning(day, id), which is why no exchange was inserted before the fix.
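
To make the difference concrete, here is a spark-shell style sketch against the Spark 3.2 catalyst API (an illustration of the two satisfies checks, not code from this PR):

```Scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.physical._

// Two long attributes standing in for the grouping columns.
val id  = Symbol("id").long
val day = Symbol("day").long

// The right side arrives hash-partitioned by the window's (day, id) keys.
val partitioning = HashPartitioning(Seq(day, id), 200)

// ClusteredDistribution only requires that rows with equal (id, day) share a
// partition, which hashing on (day, id) already guarantees => no exchange.
partitioning.satisfies(ClusteredDistribution(Seq(id, day)))      // true

// HashClusteredDistribution pins the exact expressions and their order, so a
// (day, id) partitioning does not satisfy it => an exchange is inserted.
partitioning.satisfies(HashClusteredDistribution(Seq(id, day)))  // false
```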

The following example returns an incorrect result in Spark 3.0, 3.1, and 3.2.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

val ids = 1000
val days = 1000
val parts = 10

val id_df = spark.range(ids)
val day_df = spark.range(days).withColumnRenamed("id", "day")
val id_day_df = id_df.join(day_df)
// these redundant aliases are needed to work around bug SPARK-42132
val left_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("left").as("side")).repartition(parts).cache()
val right_df = id_day_df.select($"id".as("id"), $"day".as("day"), lit("right").as("side")).repartition(parts).cache()  //.withColumnRenamed("id", "id2")

// note the column order is different from the groupBy("id", "day") column order below
val window = Window.partitionBy("day", "id")

case class Key(id: BigInt, day: BigInt)
case class Value(id: BigInt, day: BigInt, side: String)
case class Sum(id: BigInt, day: BigInt, side: String, day_sum: BigInt)

val left_grouped_df = left_df.groupBy("id", "day").as[Key, Value]
val right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day").as[Key, Sum]

val df = left_grouped_df.cogroup(right_grouped_df)((key: Key, left: Iterator[Value], right: Iterator[Sum]) => left)

df.explain()
df.show(5)

Output was

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows

Output now is

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
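
As a quick sanity check on the repro above (a sketch, assuming df carries the id, day, lefts and rights columns shown in the output; the exact cogroup function producing those counts is not spelled out here): with the fix, every (id, day) group sees exactly one row from each side.

```Scala
// Count groups that did not get exactly one left and one right row.
// Expected: 0 with this fix; non-zero on affected versions (3.0-3.2 unfixed).
df.where($"lefts" =!= 1 || $"rights" =!= 1).count()
```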

Spark 3.3 reworked HashClusteredDistribution (#32875) and is therefore not sensitive to FlatMapCoGroupsInPandas using ClusteredDistribution.

Does this PR introduce any user-facing change?

Yes, it fixes a correctness issue.

How was this patch tested?

A unit test in EnsureRequirementsSuite.

@HyukjinKwon
Member

cc @sunchao


@sunchao sunchao left a comment


Looks OK to me, just a few questions:

  • We probably should change the title to use [3.2] instead of [3.3]?
  • Is it easy to add an E2E test for this? Perhaps in test_pandas_udf_cogrouped_map.py?

@EnricoMi EnricoMi changed the title [SPARK-42168][3.3][SQL][PYTHON] Fix required child distribution of FlatMapCoGroupsInPandas (as in CoGroup) [SPARK-42168][3.2][SQL][PYTHON] Fix required child distribution of FlatMapCoGroupsInPandas (as in CoGroup) Jan 25, 2023
@github-actions github-actions bot added the CORE label Jan 25, 2023
@EnricoMi
Contributor Author

@sunchao good catch!

I have renamed the PR and added the Python example as a unit test.

@HyukjinKwon
Member

I am good w/ this. will defer to @sunchao



@sunchao sunchao left a comment


LGTM

@HyukjinKwon
Member

Merged to branch-3.2.

HyukjinKwon pushed a commit that referenced this pull request Jan 26, 2023
…atMapCoGroupsInPandas (as in CoGroup)

Closes #39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 26, 2023
…atMapCoGroupsInPandas (as in CoGroup)


Closes apache#39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
sunchao pushed a commit that referenced this pull request Jan 27, 2023
…th Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to master.

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
@dongjoon-hyun
Member

cc @kazuyukitanimura since this landed in branch-3.2.

EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 28, 2023
…th Window function

This ports tests from apache#39717 in branch-3.2 to master.

To make sure this use case is tested.

No

E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes apache#39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
HyukjinKwon pushed a commit that referenced this pull request Jan 29, 2023
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.3. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39781 from EnricoMi/branch-3.3-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 30, 2023
…th Window function

### What changes were proposed in this pull request?
This ports tests from apache#39717 in branch-3.2 to master.

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes apache#39752 from EnricoMi/branch-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Chao Sun <sunchao@apple.com>
HyukjinKwon pushed a commit that referenced this pull request Jan 30, 2023
…as with Window function

### What changes were proposed in this pull request?
This ports tests from #39717 in branch-3.2 to branch-3.4. See #39752 (comment).

### Why are the changes needed?
To make sure this use case is tested.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`.

Closes #39803 from EnricoMi/branch-3.4-cogroup-window-bug-test.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…atMapCoGroupsInPandas (as in CoGroup)


Closes apache#39717 from EnricoMi/branch-3.2-cogroup-window-bug.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>