
Repeated shuffle should not trigger Comet columnar shuffle #295

Closed
viirya opened this issue Apr 20, 2024 · 2 comments · Fixed by #296
Labels: bug

Comments

viirya commented Apr 20, 2024

Describe the bug

I noticed a special case while debugging test failures in #250.

org.apache.spark.sql.DataFrameWindowFunctionsSuite: SPARK-38237: require all cluster keys for child required distribution for window query:

== Physical Plan ==
*(3) Project [lead(key1, 1, NULL) OVER (PARTITION BY key1, key2 ORDER BY value ASC NULLS FIRST ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING)#3854, lead(value, 1, NULL) OVER (PARTITION BY key1, key2 ORDER BY value ASC NULLS FIRST ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING)#3855]
+- Window [lead(key1#3848, 1, null) windowspecdefinition(key1#3848, key2#3849, value#3850 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead(key1, 1, NULL) OVER (PARTITION BY key1, key2 ORDER BY value ASC NULLS FIRST ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING)#3854, lead(value#3850, 1, null) windowspecdefinition(key1#3848, key2#3849, value#3850 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead(value, 1, NULL) OVER (PARTITION BY key1, key2 ORDER BY value ASC NULLS FIRST ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING)#3855], [key1#3848, key2#3849], [value#3850 ASC NULLS FIRST]
   +- *(2) ColumnarToRow
      +- CometSort [key1#3848, key2#3849, value#3850], [key1#3848 ASC NULLS FIRST, key2#3849 ASC NULLS FIRST, value#3850 ASC NULLS FIRST]
         +- CometColumnarExchange hashpartitioning(key1#3848, key2#3849, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=8988]
            +- CometColumnarExchange hashpartitioning(key1#3848, 5), REPARTITION_BY_COL, CometColumnarShuffle, [plan_id=8987]
               +- RowToColumnar
                  +- *(1) Project [_1#3841 AS key1#3848, _2#3842 AS key2#3849, _3#3843 AS value#3850]
                     +- *(1) LocalTableScan [_1#3841, _2#3842, _3#3843]

There are two consecutive shuffle operators in the query. Currently it fails with:

[info]   java.lang.UnsupportedOperationException: CometShuffleExchangeExec.doExecute should not be executed.
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.doExecute(CometShuffleExchangeExec.scala:169)
[info]   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)                 
[info]   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)              
[info]   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)        
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.inputRDD$lzycompute(CometShuffleExchangeExec.scala:98)
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.inputRDD(CometShuffleExchangeExec.scala:91)
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency$lzycompute(CometShuffleExchangeExec.scala:150)
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency(CometShuffleExchangeExec.scala:133)
[info]   at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.doExecuteColumnar(CometShuffleExchangeExec.scala:188)

It is caused by the upper CometShuffleExchangeExec calling the bottom CometShuffleExchangeExec.doExecute, because CometShuffleExchangeExec takes row inputs.
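
For reference, the row-based execute path on CometShuffleExchangeExec is essentially a dead end. Here is a fragment sketch, reconstructed from the error message above rather than the exact source:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: CometShuffleExchangeExec produces columnar output, so the
// row-based execute path deliberately throws instead of returning rows.
protected override def doExecute(): RDD[InternalRow] =
  throw new UnsupportedOperationException(
    "CometShuffleExchangeExec.doExecute should not be executed.")

So when the upper exchange's inputRDD calls child.execute() on the lower columnar exchange, this exception is exactly what surfaces.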

To fix it, we could add a ColumnarToRow on top of the bottom CometShuffleExchangeExec, but I don't think that is efficient: the stack of shuffles would then go through too many row-to-column/column-to-row conversions.

I think it is more reasonable to skip Comet columnar shuffle in such a case; see the sketch below.
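
A minimal sketch of that skip, assuming it lives in the planner rule that converts ShuffleExchangeExec into the Comet version (the rule shape and constructor arguments here are illustrative assumptions, not the actual change in #296):

// Illustrative fragment; names and signatures are assumptions, not the
// code from #296.
case s: ShuffleExchangeExec =>
  s.child match {
    // The child is already a Comet columnar shuffle: converting this
    // exchange too would stack two columnar exchanges, and the upper one
    // would hit the lower one's unsupported row-based doExecute. Fall back
    // to Spark's shuffle for this node.
    case _: CometShuffleExchangeExec => s
    case _ =>
      CometShuffleExchangeExec(s.outputPartitioning, s.child, s.shuffleOrigin)
  }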

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

advancedxy (Contributor) commented:

Do you have any idea why two consecutive shuffle operators are created in the first place?

Spark's EnsureRequirements should replace the old shuffle exec with a new one that has the updated output partitioning.

viirya commented Apr 22, 2024

The query has a user-specified repartition operator. Such a repartition won't be removed by EnsureRequirements, as it is required by the user.
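
For context, the failing test boils down to a query of roughly this shape (a sketch of the pattern only, not the exact test code; it assumes a SparkSession named spark):

import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

val df = Seq(("a", "x", 1), ("a", "x", 2), ("b", "y", 3))
  .toDF("key1", "key2", "value")

val w = Window.partitionBy("key1", "key2").orderBy("value")

// The explicit repartition produces the REPARTITION_BY_COL exchange; the
// Window's required distribution then stacks an ENSURE_REQUIREMENTS exchange
// directly on top of it, yielding the two consecutive shuffles in the plan.
df.repartition(df("key1"))
  .select(lead("key1", 1).over(w), lead("value", 1).over(w))
  .collect()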
