
CometExec's outputPartitioning might not be same as Spark expects after AQE interferes #298

Closed
viirya opened this issue Apr 22, 2024 · 0 comments · Fixed by #299
Labels
bug Something isn't working

Comments


viirya commented Apr 22, 2024

Describe the bug

Currently, CometExec has a default outputPartitioning implementation that reuses the original Spark plan's outputPartitioning. In most cases this is correct, but Spark AQE inserts a special node, AQEShuffleRead, that can change the output partitioning, e.g. to coalesce shuffle partitions:

*(2) ColumnarToRow
+- CometSort [id#3311, data#3312, day#3313], [data#3312 DESC NULLS FIRST, id#3311 ASC NULLS FIRST]
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- CometColumnarExchange hashpartitioning(data#3312, 5), REPARTITION_BY_COL, CometColumnarShuffle, [plan_id=10211]
            +- RowToColumnar
               +- *(1) Project [_1#3304 AS id#3311, _2#3305 AS data#3312, _3#3306 AS day#3313]
                  +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._1 AS _1#3304, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._2, true, false, true) AS _2#3305, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, DateType, fromJavaDate, knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._3, true, false, true) AS _3#3306]
                     +- Scan[obj#3303]

This causes many test failures in org.apache.spark.sql.connector.WriteDistributionAndOrderingSuite on #250. For example, CometSort's output partitioning is the original exchange's hashpartitioning(data#3312, 5). But after AQE runs, AQEShuffleRead is inserted and changes the output partitioning to coalescedhashpartitioning(hashpartitioning('data, 5), CoalescedBoundary(0,5)). Because we replace Spark's SortExec with CometSort before AQE rewrites the query plan, CometSort still reports SortExec's original output partitioning, hashpartitioning(data#3312, 5).
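The stale-partitioning behavior can be sketched in a few lines of plain Scala. All the types below are illustrative stand-ins, not actual Comet or Spark classes — the point is only that a node which delegates outputPartitioning to a plan captured before AQE keeps reporting the pre-AQE value:

```scala
// Minimal sketch (hypothetical types, not Comet/Spark code) of why a
// delegated outputPartitioning goes stale after AQE coalesces a shuffle.

sealed trait Partitioning
case class HashPartitioning(expr: String, numPartitions: Int) extends Partitioning
case class CoalescedHashPartitioning(inner: HashPartitioning, numCoalesced: Int)
  extends Partitioning

trait PlanNode { def outputPartitioning: Partitioning }

// Stand-in for Spark's SortExec: partitioning fixed when the plan was built.
case class SortExecLike(partitioning: Partitioning) extends PlanNode {
  def outputPartitioning: Partitioning = partitioning
}

// Stand-in for CometSort: delegates to the ORIGINAL Spark plan,
// which was captured before AQE rewrote the tree.
case class CometSortLike(originalPlan: PlanNode) extends PlanNode {
  def outputPartitioning: Partitioning = originalPlan.outputPartitioning
}

object StalePartitioningDemo extends App {
  val original  = SortExecLike(HashPartitioning("data", 5))
  val cometSort = CometSortLike(original)

  // AQE later coalesces the shuffle, so the ACTUAL partitioning becomes:
  val afterAqe: Partitioning =
    CoalescedHashPartitioning(HashPartitioning("data", 5), 2)

  // ...but the Comet node still reports the stale, pre-AQE value.
  println(cometSort.outputPartitioning) // HashPartitioning(data,5)
  println(afterAqe)                     // CoalescedHashPartitioning(HashPartitioning(data,5),2)
}
```

Spark compares a node's reported outputPartitioning against the required distribution, so this mismatch is enough to break suites like WriteDistributionAndOrderingSuite that assert on the post-AQE partitioning.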

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response
