[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering #38358
Conversation
@cloud-fan this fixes a very peculiar bug introduced via AQE in Spark 3.0. With Spark 3.4, the issue disappeared. A user-defined ordering that starts with the partition columns is broken by the `Sort` that `FileFormatWriter` injects (see SPARK-40588).
Can one of the admins verify this patch?
```diff
@@ -221,6 +221,8 @@ case class AdaptiveSparkPlanExec(
     .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }

+  private[sql] lazy val finalPlan: SparkPlan = getFinalPhysicalPlan()
```
Can we just change the access of `getFinalPhysicalPlan()` to `private[sql]` instead? I don't think we need a duplicate of `executedPlan` in this class.
+1
Will do.
```diff
+  // SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
+  // its final plan's ordering, so we have to materialize that plan first
+  def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
```
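Completed, such a helper could look roughly like this (a sketch, assuming a `finalPhysicalPlan` accessor on `AdaptiveSparkPlanExec`; the merged patch may differ in detail):

```scala
// Sketch: recursively swap every AdaptiveSparkPlanExec node for its
// materialized final plan, so that outputOrdering of the returned tree
// reflects what will actually be executed.
def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
  case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
  case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
}
```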
I am not sure what you mean by "is brought" in "the missing ordering field of Sort in the top layer is also brought".
It is also not clear to me what you are referring to with "removing `AdaptiveSparkPlanExec`". Do you mean the change in this PR? The `AdaptiveSparkPlanExec` is not removed but replaced with the final plan. It is the same plan that is otherwise used when calling `.execute`, so AQE should work as before.
`V1WriteCommand` is a concept introduced after Spark 3.3. The issue addressed in this PR does not exist after Spark 3.3, because the `FileFormatWriter` does not see the `AdaptiveSparkPlanExec` but the final plan (exactly what this change is trying to achieve).
I understand what you mean. The problem in #38356 is also caused by enforcing the ordering at `V1WriteCommand`, but it is not the same case.
@cloud-fan @sunchao is there a chance to get this regression fix into 3.2 before the upcoming 3.2.3 release? There is currently no Spark 3 release that does not suffer from this regression (see SPARK-40588).
@HyukjinKwon this PR targets `branch-3.3`.
@HyukjinKwon never mind, I had looked at the wrong workflow ("Build"); branch 3.3 has the old workflows ("Build and test").
Yes, we can get this backported to Spark 3.2.3 once this PR is merged.
Thanks for the fix! Is it possible this could land in 3.1 as well?
@kristopherkane 3.1 is EOL, unfortunately. @zzzzming95 does this PR fix your problem?
This issue is not a problem for me. Another similar issue I found, #38356, is not the same case as this one.
@zzzzming95 Your query is affected by this issue as well (Spark 3.3). But you are right, the issue in #38356 is different and was introduced in Spark 3.4. For completeness, this is how nullable string partition columns look after the fix (Spark 3.3). Btw., this is what I would expect from Spark 3.4 once #38356 is fixed.
Hi, curious what's the status of this PR? @EnricoMi @cloud-fan. I plan to start the 3.2.3 release process this week. Will this be fixed soon?
@sunchao I don't think this blocks 3.3. It's not a new regression in 3.3, but a long-standing issue since we have had AQE.
There is no Spark 3.x release that does not suffer from this. This blocks people from moving to Spark 3, while Spark 3.0 and 3.1 are already EOL. Please reconsider providing a fix before the Spark 3.4 release.
```diff
@@ -222,7 +222,7 @@ case class AdaptiveSparkPlanExec(
     .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }

-  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
+  private[sql] def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
```
Looking at the code again, I think it's risky to call `getFinalPhysicalPlan` directly. Other places call `withFinalPlanUpdate`, which can handle the UI update. How about we add a new function here?

```scala
def finalPhysicalPlan = withFinalPlanUpdate(identity)
```
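For context, `withFinalPlanUpdate` in `AdaptiveSparkPlanExec` is roughly of this shape (a sketch from memory, not quoted from the patch), which is why exposing it instead of `getFinalPhysicalPlan` keeps the UI in sync:

```scala
// Sketch: materialize the final plan, apply the caller's function, and only
// then push the plan change to the Spark UI, so callers cannot forget the
// UI update the reviewer mentions above.
private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
  val plan = getFinalPhysicalPlan()
  val result = fun(plan)
  finalPlanUpdate  // lazily sends the final plan to the UI once
  result
}
```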
@EnricoMi I updated my comment... #38358 (comment)
All done, all green: https://github.com/G-Research/spark/actions/runs/3420780216
Thanks, merging to 3.3/3.2!
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

### What changes were proposed in this pull request?

The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 does not need this because `FileFormatWriter` gets the final plan.

### Why are the changes needed?

`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering (Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).

### Does this PR introduce _any_ user-facing change?

This fixes SPARK-40588, which was introduced in 3.0. This restores behaviour from Spark 2.4.

### How was this patch tested?

The final plan that is written to files cannot be extracted from `FileFormatWriter`. The bug explained in [SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be asserted on the result files when spilling occurs. This is very hard to control in a unit test scenario. Therefore, this was tested manually. The [example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032) given in SPARK-40588 now produces sorted files.

The actual plan written into the files changed from

```
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [id=#28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 10000000, step=1, splits=2)
```

where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68]
         +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
            :     +- *(1) Project [id#0L AS day#2L]
            :        +- *(1) Range (0, 2, step=1, splits=2)
            +- *(2) Range (0, 1000000, step=1, splits=2)
```

where the sort given by the user is the outermost sort now.

Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
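For reference, the reproduction linked above boils down to a query of this shape (a condensed sketch based on the plans shown; the output path, row counts, and file format are illustrative):

```scala
// Sketch of the SPARK-40588 reproduction: write partitioned by "day" with a
// user-requested ordering of ("day", "id"). With the bug, FileFormatWriter only
// sees an AdaptiveSparkPlanExec with unknown outputOrdering, injects its own
// Sort on "day" alone, and a spill during that sort loses the "id" order.
import spark.implicits._

val days = spark.range(2).select($"id".as("day"))
val ids  = spark.range(10000000)
days.crossJoin(ids)
  .repartition(2, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("/tmp/spark-40588")  // illustrative path
```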
(cherry picked from commit f0cad7a)
Thank you!
Unfortunately, a similar issue exists for Spark 3.4.0 as well; I created SPARK-41914 to track that.
Shall we simply port this PR to master? It will be a noop when planned write is enabled, as there will be no
Fixed in #39431.
@EnricoMi It seems it will remove the table location if the `INSERT OVERWRITE` fails:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier

sql("CREATE TABLE IF NOT EXISTS spark32_overwrite(amt1 int) STORED AS ORC")
sql("CREATE TABLE IF NOT EXISTS spark32_overwrite2(amt1 long) STORED AS ORC")
sql("INSERT OVERWRITE TABLE spark32_overwrite2 select 6000044164")
sql("set spark.sql.ansi.enabled=true")
val loc =
  spark.sessionState.catalog.getTableMetadata(TableIdentifier("spark32_overwrite")).location
val fs = FileSystem.get(loc, spark.sparkContext.hadoopConfiguration)
println("Location exists: " + fs.exists(new Path(loc)))
try {
  sql("INSERT OVERWRITE TABLE spark32_overwrite select amt1 from " +
    "(select cast(amt1 as int) as amt1 from spark32_overwrite2 distribute by amt1)")
} finally {
  println("Location exists: " + fs.exists(new Path(loc)))
}
```
Yes, it looks like it removes the empty table location after overwriting the table fails due to the ANSI cast overflow. @cloud-fan do you consider the removal of an empty table location after a failed table overwrite a regression?
@wangyum do you know why it's a problem only in 3.2?
Spark 3.2.1 doesn't have this problem.
[SPARK-43327] Trigger `committer.setupJob` before plan execution in `FileFormatWriter#write`

### What changes were proposed in this pull request?

Trigger `committer.setupJob` before plan execution in `FileFormatWriter#write`.

### Why are the changes needed?

#38358 resolved the case where `outputOrdering` might not work if AQE is enabled. However, since it materializes the AQE plan in advance (it triggers `getFinalPhysicalPlan`), `committer.setupJob(job)` may not execute when `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails with an error.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test.

Closes #41154 from zzzzming95/spark3-SPARK-43327.

Lead-authored-by: zzzzming95 <505306252@qq.com>
Co-authored-by: zhiming she <505306252@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
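The reordering this follow-up describes can be pictured as follows (a simplified sketch with hypothetical helper names, not the actual `FileFormatWriter` code):

```scala
// Problem (sketch): materializing the AQE plan previously happened before
// setupJob, so a failure in getFinalPhysicalPlan() left the committer without
// a set-up job, and the cleanup path could delete the (empty) table location.
//
// Fix (sketch): set up the job first, then materialize and run the plan, so
// any AQE failure is handled by the committer's regular abort path.
committer.setupJob(job)
try {
  val materialized = materializeAdaptiveSparkPlan(plan)  // may throw under AQE
  val commitMsgs = runWriteTasks(materialized)           // hypothetical helper
  committer.commitJob(job, commitMsgs)
} catch {
  case t: Throwable =>
    committer.abortJob(job)
    throw t
}
```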