[SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering #39431
Conversation
@cloud-fan here is the fix for SPARK-40588 migrated to Spark 3.4. This finally includes unit tests for the actual plan written to files (which has never been tested before).
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       |PARTITIONED BY (k STRING)
       |""".stripMargin)
     executeAndCheckOrdering(
-      hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+      hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
do we still need the `orderingMatched` parameter if it's always true?
Not sure what you mean, as in needed in this test?
I mean, shall we remove `orderingMatched` from the method `executeAndCheckOrdering`?
There is no default value for `orderingMatched`, and two other unit tests still use `orderingMatched = enabled`.
      // SPARK-40885: this bug removes the in-partition sort, which manifests here
      case (true, SortExec(Seq(
        SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
do you know why the sorting key is different when planned write is enabled?
That is the correctness bug SPARK-40885, discussed in #38356.
@ulysses-you can you take a look at this bug?
All tests green: https://github.com/G-Research/spark/actions/runs/3855300306
Can one of the admins verify this patch?
      // the sort order doesn't matter
      // Use the output ordering from the original plan before adding the empty2null projection.
-     val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+     val actualOrdering = writeFilesOpt.map(_.child)
+       .getOrElse(materializeAdaptiveSparkPlan(plan))
Shall we put all the code changes inside the branch where `writeFilesOpt` is empty? If `writeFilesOpt` is defined, the write has already been planned, which does not have this issue.
`.getOrElse` already does what you said, doesn't it?
Yes, `materializeAdaptiveSparkPlan` is applied to `plan` only if `writeFilesOpt` is undefined.
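The laziness the reviewers rely on here is a property of `Option.getOrElse`: its argument is by-name, so the fallback is only evaluated when the option is empty. A minimal, self-contained sketch (`materialize` and the string plans are illustrative stand-ins, not Spark's API):

```scala
// Sketch: Option.getOrElse takes its argument by name, so the fallback
// (standing in for materializeAdaptiveSparkPlan) only runs when the
// option (standing in for writeFilesOpt) is empty.
var materializeCalls = 0
def materialize(plan: String): String = {
  materializeCalls += 1
  s"materialized($plan)"
}

def resolvePlan(writeFilesOpt: Option[String], plan: String): String =
  writeFilesOpt.map(child => s"child($child)").getOrElse(materialize(plan))

val planned   = resolvePlan(Some("writeFiles"), "aqePlan") // planned write: fallback never runs
val unplanned = resolvePlan(None, "aqePlan")               // unplanned write: fallback runs once
```

This mirrors the patched line: when the write has been planned (`writeFilesOpt` defined), the materialization cost is never paid.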
thanks, merging to master!
Thanks!
What changes were proposed in this pull request?
`FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required when planned writing is disabled (`spark.sql.optimizer.plannedWrite.enabled` is `true` by default). With planned writing enabled, `FileFormatWriter` gets the final plan already.

Why are the changes needed?
`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering, in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).

Does this PR introduce any user-facing change?
This fixes SPARK-40588 for 3.4; the bug was introduced in 3.0. This restores the behaviour of Spark 2.4.
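The ordering mismatch described above can be modelled with a small, self-contained Scala sketch. None of these types are Spark's; `Adaptive`, `Sorted`, `materialize`, and `needsExtraSort` are simplified stand-ins that only mirror the shape of the bug:

```scala
// Hypothetical model: an adaptive wrapper reports no output ordering until
// its final plan is materialized, so the writer adds a redundant sort.
trait Plan { def outputOrdering: Seq[String] }
case class Sorted(cols: Seq[String]) extends Plan {
  def outputOrdering: Seq[String] = cols
}
case class Adaptive(finalPlan: Plan) extends Plan {
  // Mirrors AdaptiveSparkPlanExec: the ordering is unknown before execution.
  def outputOrdering: Seq[String] = Nil
}

// Stand-in for materializeAdaptiveSparkPlan: unwrap to the final plan.
def materialize(plan: Plan): Plan = plan match {
  case Adaptive(p) => p
  case p           => p
}

// The writer inserts its own sort when the required prefix is not provided.
def needsExtraSort(required: Seq[String], plan: Plan): Boolean =
  !plan.outputOrdering.startsWith(required)

val plan = Adaptive(Sorted(Seq("a", "b")))
```

Before the fix, the wrapper hides the existing `("a", "b")` ordering and a redundant sort by `"a"` is inserted; after materializing, the compatible ordering is visible and no extra sort is needed.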
How was this patch tested?
The final plan that is written to files is now stored in `FileFormatWriter.executedPlan` (similar to the existing `FileFormatWriter.outputOrderingMatched`). Unit tests assert the outermost sort order written to files.

The actual plan written into the files changed (taken from "SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") from a plan where `FileFormatWriter` enforces the order with `Sort [input[2, int, false] ASC NULLS FIRST], false, 0` to a plan where the sort given by the user is the outermost sort.