
[SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering #39431

Closed

Conversation


@EnricoMi EnricoMi commented Jan 6, 2023

What changes were proposed in this pull request?

The FileFormatWriter now materializes an AdaptiveSparkPlanExec before accessing the plan's outputOrdering. This is only required when planned writing is disabled (spark.sql.optimizer.plannedWrite.enabled is true by default); with planned writing enabled, FileFormatWriter already receives the final plan.
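The idea can be sketched with hypothetical Python stand-ins (Plan, AdaptivePlan, materialize are illustrative models of the Scala change, not Spark's actual classes):

```python
class Plan:
    """A physical plan node with a (possibly empty) output ordering."""
    def __init__(self, output_ordering):
        self.output_ordering = output_ordering

class AdaptivePlan(Plan):
    """Models an AQE node: its ordering is unknown until the final plan
    is materialized."""
    def __init__(self, final_plan):
        super().__init__(output_ordering=[])  # unknown before materialization
        self.final_plan = final_plan

def materialize(plan):
    """Return the final plan for an adaptive node, else the plan itself."""
    if isinstance(plan, AdaptivePlan):
        return plan.final_plan
    return plan

# Before the fix the writer read plan.output_ordering directly and saw [];
# after the fix it materializes first and sees the real ordering.
aqe = AdaptivePlan(final_plan=Plan(output_ordering=["a", "b"]))
assert aqe.output_ordering == []
assert materialize(aqe).output_ordering == ["a", "b"]
```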

Why are the changes needed?

FileFormatWriter enforces an ordering if the written plan does not provide that ordering. An unmaterialized AQE plan does not know its final ordering, so FileFormatWriter enforces the required ordering (e.g. by column "a") even when the final plan already provides a compatible ordering (e.g. by columns "a", "b"). In case of spilling, that in-partition order (by columns "a", "b") gets broken (see SPARK-40588).
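A simplified model of the check (the real one is Spark's SortOrder satisfaction logic; this sketch assumes orderings are plain column lists): a required ordering is satisfied when it is a prefix of the ordering the plan provides.

```python
def ordering_satisfied(required, actual):
    """Simplified stand-in: required ordering must be a prefix of actual."""
    return actual[:len(required)] == required

# Final plan already sorts by ("a", "b"); the write only requires ("a",).
assert ordering_satisfied(["a"], ["a", "b"])  # no extra sort needed

# An unmaterialized AQE node reports no ordering, so the check fails and
# the writer inserts an extra Sort(a), which can break the (a, b) order
# when the sort spills.
assert not ordering_satisfied(["a"], [])
```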

Does this PR introduce any user-facing change?

This fixes SPARK-40588 for Spark 3.4. The bug was introduced in Spark 3.0; this restores the behaviour of Spark 2.4.

How was this patch tested?

The final plan that is written to files is now stored in FileFormatWriter.executedPlan (similar to the existing FileFormatWriter.outputOrderingMatched). Unit tests assert on the outermost sort order of that plan.

The plan that actually writes the files changed from (taken from test "SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column"):

Sort [input[2, int, false] ASC NULLS FIRST], false, 0
+- *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
   +- *(3) Project [b#24, value#14, key#13]
      +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
         :- BroadcastQueryStage 2
         :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=376]
         :     +- AQEShuffleRead local
         :        +- ShuffleQueryStage 0
         :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=328]
         :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
         :                 +- Scan[obj#12]
         +- AQEShuffleRead local
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=345]
                  +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                     +- Scan[obj#22]

where FileFormatWriter enforces an extra Sort [input[2, int, false] ASC NULLS FIRST], false, 0, to

*(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
+- *(3) Project [b#24, value#14, key#13]
   +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
      :- BroadcastQueryStage 2
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=375]
      :     +- AQEShuffleRead local
      :        +- ShuffleQueryStage 0
      :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=327]
      :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
      :                 +- Scan[obj#12]
      +- AQEShuffleRead local
         +- ShuffleQueryStage 1
            +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=344]
               +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                  +- Scan[obj#22]

where the sort given by the user is the outermost sort now.
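The new assertion style can be sketched as follows (a hypothetical stand-in using the plan strings above; the real tests inspect the captured FileFormatWriter.executedPlan tree, not its string form):

```python
# First lines of the plan after the fix, copied from the dump above.
executed_plan = (
    "*(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0\n"
    "+- *(3) Project [b#24, value#14, key#13]"
)

outermost = executed_plan.splitlines()[0]

# The outermost operator must be the user's in-partition sort on
# (key, value), not a writer-enforced Sort on the partition column only.
assert "Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST]" in outermost
assert not outermost.startswith("Sort [input[2, int, false]")
```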

@github-actions github-actions bot added the SQL label Jan 6, 2023
@EnricoMi EnricoMi changed the title [SPARK-41914] FileFormatWriter materializes AQE plan before accessing outputOrdering [SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering Jan 6, 2023

EnricoMi commented Jan 6, 2023

@cloud-fan here is the fix for SPARK-40588 migrated to Spark 3.4.

This finally includes unit tests for the actual plan written to files (which has never been tested before).

@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
|PARTITIONED BY (k STRING)
|""".stripMargin)
executeAndCheckOrdering(
-      hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+      hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
Contributor
do we still need the orderingMatched parameter if it's always true?

Contributor Author
Not sure what you mean, as in needed in this test?

Contributor

I mean, shall we remove orderingMatched from the method executeAndCheckOrdering?

Contributor Author

There is no default value for orderingMatched and two other unit tests still use orderingMatched=enabled.


// SPARK-40885: this bug removes the in-partition sort, which manifests here
case (true, SortExec(Seq(
SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
Contributor
do you know why the sorting key is different when planned write is enabled?

Contributor Author

that is the correctness bug SPARK-40885, discussed in #38356

Contributor

@ulysses-you can you take a look at this bug?

@EnricoMi

EnricoMi commented Jan 6, 2023

@AmplabJenkins

Can one of the admins verify this patch?

-    // the sort order doesn't matter
+    // Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
Contributor
shall we put all code changes inside a check that writeFilesOpt is empty? If writeFilesOpt is defined, the write has already been planned, which does not have this issue.

Contributor

.getOrElse already does what you said, doesn't it?

Contributor Author

Yes, materializeAdaptiveSparkPlan is applied on plan only if writeFilesOpt is undefined.
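This works because Scala's Option.getOrElse takes its default by-name, so the default expression is evaluated only when the Option is empty. A hypothetical Python analogue of the same control flow (resolve_plan and the string plans are illustrative, not Spark code):

```python
def resolve_plan(write_files_child, plan, materialize):
    """Model of writeFilesOpt.map(_.child).getOrElse(materialize(plan))."""
    if write_files_child is not None:
        # Planned write: the final plan is already known, no materialization.
        return write_files_child
    # Unplanned write: materialize the adaptive plan first.
    return materialize(plan)

calls = []
def materialize(p):
    calls.append(p)
    return p + "_final"

# Planned write path: materialize is never invoked.
assert resolve_plan("child", "aqe", materialize) == "child"
assert calls == []

# Unplanned write path: materialize runs exactly once.
assert resolve_plan(None, "aqe", materialize) == "aqe_final"
assert calls == ["aqe"]
```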

@cloud-fan
Contributor

thanks, merging to master!

@EnricoMi
Contributor Author

Thanks!
