
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering #38358

Closed

Conversation

EnricoMi
Contributor

@EnricoMi EnricoMi commented Oct 23, 2022

What changes were proposed in this pull request?

The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 does not need this because `FileFormatWriter` gets the final plan.

Why are the changes needed?

`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering (Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).
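A simplified sketch of that check (assumed shape, not the exact `FileFormatWriter` code):

```
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.SparkPlan

// The required ordering (partition, bucket and sort columns) must be a prefix
// of the plan's outputOrdering, otherwise FileFormatWriter inserts its own
// sort. An un-materialized AdaptiveSparkPlanExec reports no outputOrdering,
// so this check fails even when the final plan is compatibly sorted.
def orderingMatched(required: Seq[SortOrder], plan: SparkPlan): Boolean =
  required.length <= plan.outputOrdering.length &&
    required.zip(plan.outputOrdering).forall { case (req, actual) =>
      req.semanticEquals(actual)
    }
```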

Does this PR introduce any user-facing change?

This fixes SPARK-40588, which was introduced in 3.0. This restores behaviour from Spark 2.4.

How was this patch tested?

The final plan that is written to files cannot be extracted from `FileFormatWriter`. The bug explained in [SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be asserted on the result files when spilling occurs, which is very hard to control in a unit test scenario.

Therefore, this was tested manually. The [example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032) given in SPARK-40588 now produces sorted files.
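A sketch along the lines of that reproduction (column names, dataset sizes, and output path are assumptions derived from the plans shown below):

```
// Cross-join two ranges, repartition by the partition column "day", and sort
// within partitions by (day, id). Without this fix, FileFormatWriter wraps the
// plan in an extra sort by "day" alone, which breaks the "id" order within
// each day whenever the sort spills.
import org.apache.spark.sql.functions.col

val days = spark.range(2).select(col("id").as("day"))
val ids = spark.range(10000000)
days.join(ids)
  .repartition(2, col("day"))
  .sortWithinPartitions(col("day"), col("id"))
  .write.partitionBy("day").csv("/tmp/spark-40588")
```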

The actual plan written into the files changed from

```
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [id=#28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 10000000, step=1, splits=2)
```

where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68]
         +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
            :     +- *(1) Project [id#0L AS day#2L]
            :        +- *(1) Range (0, 2, step=1, splits=2)
            +- *(2) Range (0, 1000000, step=1, splits=2)
```

where the sort given by the user is the outermost sort now.

@github-actions github-actions bot added the SQL label Oct 23, 2022
@EnricoMi
Contributor Author

@cloud-fan this fixes a very peculiar bug introduced via AQE in Spark 3.0. With Spark 3.4, the issue disappeared.

A user-defined ordering that starts with the partition columns is broken by FileFormatWriter because it cannot see the actual ordering, as AdaptiveSparkPlanExec does not know the final plan yet. Sorting by the partition columns alone breaks the existing user-defined order only when spills occur, because UnsafeSorterSpillMerger round-robins over the spill files.
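A self-contained illustration of that effect (plain Scala, not the actual merger code):

```
// Each spill run is sorted by (a, b), but the enforced outer sort compares
// only "a", so rows with equal "a" interleave across runs during the merge
// and the "b" order within an "a" group is lost.
val spill1 = Seq((0, 1), (0, 3), (1, 1)) // sorted by (a, b)
val spill2 = Seq((0, 2), (0, 4), (1, 2)) // sorted by (a, b)
val merged = (spill1 ++ spill2).sortBy(_._1) // stable sort on "a" only
// merged: (0,1), (0,3), (0,2), (0,4), (1,1), (1,2)
// -> within a = 0 the b values are 1, 3, 2, 4: no longer sorted
```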

@AmplabJenkins

Can one of the admins verify this patch?

@@ -221,6 +221,8 @@ case class AdaptiveSparkPlanExec(
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
}

private[sql] lazy val finalPlan: SparkPlan = getFinalPhysicalPlan()
Contributor

Can we just change the access of getFinalPhysicalPlan() to private[sql] instead? I don't think we need a duplicate of executedPlan in this class.

Contributor

+1

Contributor Author

Will do.


// SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
// its final plan's ordering, so we have to materialize that plan first
def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
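The diff is truncated here; a plausible completion of this helper (match arms are assumed, using the AQE accessor settled on later in this thread):

```
def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
  // materialize the AQE plan so its outputOrdering reflects the final plan
  case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
  // otherwise recurse, as the AdaptiveSparkPlanExec may sit below e.g. a Project
  case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
}
```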

This comment was marked as resolved.

Contributor Author

@EnricoMi EnricoMi Oct 30, 2022


I am not sure what you mean by "is brought" in "the missing ordering field of Sort in the top layer is also brought"?

It is also not clear to me what you are referring to with "removing `AdaptiveSparkPlanExec`". Do you mean the change in this PR? The `AdaptiveSparkPlanExec` is not removed but replaced with the final plan. It is the same plan that is otherwise used when calling `.execute`, so AQE should work as before.

Contributor Author

V1WriteCommand is a concept introduced after Spark 3.3. The issue addressed in this PR does not exist after Spark 3.3, as FileFormatWriter does not see the AdaptiveSparkPlanExec but the final plan (exactly what this change is trying to achieve).

Contributor

I understand what you mean. The problem in #38356 is also caused by enforcing the ordering in V1WriteCommand, but it is not the same case.

@EnricoMi
Contributor Author

@cloud-fan @sunchao is there a chance to get this regression fix into 3.2 before the upcoming 3.2.3 release?

There is currently no Spark 3 release that does not suffer from this regression (see SPARK-40588).

@EnricoMi
Contributor Author

@HyukjinKwon this PR targets branch-3.3, not master. How are tests kicked off in my fork?

@EnricoMi EnricoMi force-pushed the branch-3.3-materialize-aqe-plan branch from f09d844 to 0233c81 Compare October 31, 2022 08:34
@EnricoMi
Contributor Author

@HyukjinKwon never mind, I was looking at the wrong workflow ("Build"); branch-3.3 still has the old workflow ("Build and test").

@sunchao
Member

sunchao commented Oct 31, 2022

Yes, we can get this backported to Spark 3.2.3 once this PR is merged.

@kristopherkane

Thanks for the fix! Is it possible this could land in 3.1 as well?

@cloud-fan
Contributor

@kristopherkane 3.1 is EOL unfortunately.

@zzzzming95 Does this PR fix your problem?

@zzzzming95
Contributor

> @kristopherkane 3.1 is EOL unfortunately.
>
> @zzzzming95 Does this PR fix your problem?

I have no problem with this issue. Another similar issue I found, #38356, is not the same case as this issue.

@EnricoMi
Contributor Author

EnricoMi commented Nov 3, 2022

@zzzzming95 Your query is affected by this issue as well (Spark 3.3):

```
Sort [input[2, string, true] ASC NULLS FIRST], false, 0
+- Project [id#37, sort_col#38, empty2null(p#39) AS p#46]
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [p#39 ASC NULLS FIRST, sort_col#38 ASC NULLS FIRST], false, 0
         +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=42]
            +- LocalTableScan [id#37, sort_col#38, p#39]
```

The outer Sort will break the inner Sort when spills occur.

But you are right, the issue in #38356 is different and has been introduced in Spark 3.4.

For completeness, this is what nullable string partition columns look like after the fix (Spark 3.3):

```
Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
+- *(1) Sort [p#12 ASC NULLS FIRST, sort_col#11 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 0
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=12]
         +- LocalTableScan [id#10, sort_col#11, p#12]
```

By the way, this is what I would expect from Spark 3.4 once #38356 is fixed.

@EnricoMi EnricoMi force-pushed the branch-3.3-materialize-aqe-plan branch from 0233c81 to 019707d Compare November 7, 2022 07:16
@sunchao
Member

sunchao commented Nov 7, 2022

Hi, curious what's the status of this PR? @EnricoMi @cloud-fan. I plan to start the 3.2.3 release process this week. Will this be fixed soon?

@cloud-fan
Contributor

@sunchao I don't think this blocks 3.3. It's not a new regression in 3.3, but a long-standing issue since AQE was introduced.

@EnricoMi
Contributor Author

EnricoMi commented Nov 8, 2022

There is no Spark 3.x release that does not suffer from this. This blocks people from moving to Spark 3, while Spark 3.0 and 3.1 are already EOL. Please reconsider providing a fix before the Spark 3.4 release.

@@ -222,7 +222,7 @@ case class AdaptiveSparkPlanExec(
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
}

private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
private[sql] def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
Contributor

Looking at the code again, I think it's risky to call getFinalPhysicalPlan directly. Other places call withFinalPlanUpdate, which also handles the UI update. How about we add a new function here?

```
def finalPhysicalPlan = withFinalPlanUpdate(identity)
```
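For context, a sketch of how that accessor would sit in AdaptiveSparkPlanExec (the comment reflects the behavior described above, not verified code):

```
// withFinalPlanUpdate materializes the final plan, applies the given function
// to it, and pushes the plan change to the SQL UI; passing identity turns it
// into a plain accessor that still keeps the UI consistent.
def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
```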

@cloud-fan
Contributor

@EnricoMi I updated my comment... #38358 (comment)

@EnricoMi EnricoMi force-pushed the branch-3.3-materialize-aqe-plan branch from 0cf39d9 to 2052190 Compare November 8, 2022 14:09
@EnricoMi EnricoMi force-pushed the branch-3.3-materialize-aqe-plan branch from 60db4dc to 52b7bb6 Compare November 8, 2022 15:24
@EnricoMi
Contributor Author

EnricoMi commented Nov 8, 2022

All done, all green: https://github.com/G-Research/spark/actions/runs/3420780216

@cloud-fan
Contributor

thanks, merging to 3.3/3.2!

cloud-fan pushed a commit that referenced this pull request Nov 9, 2022
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Nov 9, 2022
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f0cad7a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this Nov 9, 2022
@EnricoMi EnricoMi deleted the branch-3.3-materialize-aqe-plan branch November 9, 2022 09:53
@EnricoMi
Contributor Author

EnricoMi commented Nov 9, 2022

Thank you!

@EnricoMi
Contributor Author

EnricoMi commented Jan 5, 2023

Unfortunately, a similar issue exists for Spark 3.4.0 as well; I created SPARK-41914 to track it.

@cloud-fan
Contributor

Shall we simply port this PR to master? It will be a noop when planned write is enabled, as there will be no AdaptiveSparkPlanExec in the input plan.
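For reference, planned write is controlled by a flag in Spark 3.4 (the config name below is stated as an assumption):

```
// Spark 3.4+: with planned write enabled (the default), the required sort is
// planned before execution and FileFormatWriter already receives the final
// plan, so materializing the AQE plan there becomes a noop.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", true)
```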

@EnricoMi
Contributor Author

EnricoMi commented Jan 6, 2023

Fixed in #39431.

@wangyum
Member

wangyum commented Mar 6, 2023

@EnricoMi It seems this change on branch-3.2 removes the table location if a java.lang.ArithmeticException is thrown.
How to reproduce it on Spark 3.2.3:

```
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.catalyst.TableIdentifier

sql("CREATE TABLE IF NOT EXISTS spark32_overwrite(amt1 int) STORED AS ORC")
sql("CREATE TABLE IF NOT EXISTS spark32_overwrite2(amt1 long) STORED AS ORC")
sql("INSERT OVERWRITE TABLE spark32_overwrite2 select 6000044164")
sql("set spark.sql.ansi.enabled=true")

val loc =
  spark.sessionState.catalog.getTableMetadata(TableIdentifier("spark32_overwrite")).location

val fs = FileSystem.get(loc, spark.sparkContext.hadoopConfiguration)
println("Location exists: " + fs.exists(new Path(loc)))

try {
  // casting 6000044164 to int overflows, so under ANSI mode the
  // INSERT OVERWRITE fails mid-write with java.lang.ArithmeticException
  sql("INSERT OVERWRITE TABLE spark32_overwrite select amt1 from " +
    "(select cast(amt1 as int) as amt1 from spark32_overwrite2 distribute by amt1)")
} finally {
  // after the failed overwrite, the table location no longer exists
  println("Location exists: " + fs.exists(new Path(loc)))
}
```

@EnricoMi
Contributor Author

EnricoMi commented Mar 6, 2023

Yes, it looks like it removes the empty table location after overwriting the table fails due to the ArithmeticException.

@cloud-fan do you consider the removal of an empty table location after a failed table overwrite to be a regression?

@cloud-fan
Contributor

@wangyum do you know why it's a problem only in 3.2?

@lordk911

> @wangyum do you know why it's a problem only in 3.2?

Spark 3.2.1 doesn't have this problem.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
[SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

Closes apache#38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f0cad7a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Aug 22, 2023
[SPARK-43327] Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`

### What changes were proposed in this pull request?

Trigger `committer.setupJob` before the plan executes in `FileFormatWriter#write`.

### Why are the changes needed?

#38358 resolved the case where `outputOrdering` might not work if AQE is enabled.

However, since that fix materializes the AQE plan in advance (triggering `getFinalPhysicalPlan`), `committer.setupJob(job)` may never execute when `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails.
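A simplified sketch of the fix (method shapes assumed from the discussion above):

```
// FileFormatWriter#write (sketch): set up the committer's job *before* forcing
// AQE to materialize, so a failure inside getFinalPhysicalPlan() aborts a job
// that was actually set up, instead of deleting the table location.
committer.setupJob(job)
val materializedPlan = materializeAdaptiveSparkPlan(plan)
```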

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Added a unit test.

Closes #41154 from zzzzming95/spark3-SPARK-43327.

Lead-authored-by: zzzzming95 <505306252@qq.com>
Co-authored-by: zhiming she <505306252@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>