[SPARK-24251][SQL] Add AppendData logical plan. #21305
Conversation
Test build #90530 has finished for PR 21305 at commit
Test build #90535 has finished for PR 21305 at commit
/**
 * Append data to an existing DataSourceV2 table.
 */
case class AppendData(
How does this logical plan node map to the 8 operations outlined in your SPIP?
This is InsertInto. Wenchen wanted to call it Append because that is more specific.
I updated the SPIP doc to have both names.
I actually really like this. I had thought the standardization would end up being a more invasive change than this. (Although I guess AppendData is the node with the simplest mapping from writer behavior.)
 * Append data to an existing DataSourceV2 table.
 */
case class AppendData(
    table: LogicalPlan,
Do we still want to represent the table as a generic LogicalPlan rather than a separate table type? (Maybe the answer's yes; I'm not really clear on why this happens in the current code.)
Yes. These plans should not be specific to DataSourceV2Relation. In the long term, we want any other relations using AppendData to automatically have the same validation rules applied. That means that AppendData should be generic and not care what the table implementation is.
+1 on this. We will still keep data source v1 and the file formats (for fallback), which don't use the v2 relation.
Then it seems the code comment above may need to be updated?
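To make the table-agnostic design discussed above concrete, here is a minimal sketch of what such a node could look like against the Catalyst APIs of this era. It is illustrative only, not the PR's exact code; the `resolved` check simply spells out the name/type/nullability matching described in this thread.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: the table is any LogicalPlan, so the same node and validation
// rules can serve v2 relations, the v1 fallback, and file formats alike.
case class AppendData(
    table: LogicalPlan,
    query: LogicalPlan,
    isByName: Boolean) extends LogicalPlan {

  // Only the incoming query is a child; the target table is treated as metadata.
  override def children: Seq[LogicalPlan] = Seq(query)

  // An append produces no rows for downstream operators.
  override def output: Seq[Attribute] = Seq.empty

  // Resolved once the query's columns line up with the table's schema:
  // matching names, compatible types, and no nullable data into non-null columns.
  override lazy val resolved: Boolean = {
    table.resolved && query.resolved &&
      query.output.size == table.output.size &&
      query.output.zip(table.output).forall { case (inAttr, outAttr) =>
        inAttr.name == outAttr.name &&
          outAttr.dataType.sameType(inAttr.dataType) &&
          (outAttr.nullable || !inAttr.nullable)
      }
  }
}
```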
} else {
  val writer = ws.createWriter(
    UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
nit: probably put the timestamp back to minimize the change
+1. The timestamp was added to reduce the possibility of conflicting job ids.
How would random UUIDs conflict?
it's very unlikely to conflict, but UUID+timestamp is even less likely, isn't it? I feel it's safer to keep the original logic; e.g. users may look at the temporary directory name to see when the write job was started.
I see no good reason to over-complicate the unique string passed in. Here's a quote from Wikipedia on the chance of a conflict (from this SO answer):
Only after generating 1 billion UUIDs every second for the next 100 years, the probability of creating just one duplicate would be about 50%. Or, to put it another way, the probability of one duplicate would be about 50% if every person on earth owned 600 million UUIDs.
Adding a timestamp to a UUID to avoid collisions is unnecessary.
For the other use, why would a user go to the temp directory of some node's file system -- which may not even be used by a given source -- instead of going to the logs? What if the user wants any other piece of information besides the starting timestamp (that's in some format that has to be converted)?
In short, I don't agree with the argument that it is helpful to pass the old format. This is just a carry-over from making fake Hadoop job IDs (which is why it was called jobId and started with job_). It's debatable whether the write UUID itself is even useful, given that there is no requirement to use it anywhere.
@cloud-fan, can you also review this PR for DataSourceV2? This adds the first of the logical plans proposed in SPIP: Standardize Logical Plans. This can be done in parallel with #21306 because it doesn't rely on those changes. The other logical operations, like
Test build #92627 has finished for PR 21305 at commit
@@ -172,6 +173,7 @@ class Analyzer(
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
I feel this rule doesn't need to be in the giant resolution batch. Shall we put it in the Post-Hoc Resolution batch, which runs only once?
This rule may add Projection, UpCast, and Alias nodes to the plan, so there are some rules in this batch that should run after the output is resolved. ResolveUpCast will rewrite the casts that were inserted and throw an exception if a cast would truncate, so it needs to run after this rule.

I could create a batch just after resolution for output resolution. We could run just this rule and ResolveUpCast. I think the optimizer will handle collapsing Projection nodes, and aliases are only resolved in this batch, so adding resolved aliases shouldn't be a problem.
Would you like a separate batch for output resolution?
ah ok then let's just keep it here
import DataSourceV2Relation._
override def name: String = {
  tableIdent.map(_.unquotedString).getOrElse("unknown")
shall we use the data source short name or its full class name instead of unknown?
That's the name of the data source, not the name of the table. I'd be fine with updating this if you want to include the source name. What about s"${source.name}:unknown"?
SGTM
Fixed.
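A small standalone sketch of the agreed-on fallback, assuming the relation exposes the source's short name and an optional table identifier (the real method lives on DataSourceV2Relation; the helper below is hypothetical):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical helper mirroring the fix: prefer the table identifier, and fall
// back to "<source name>:unknown" when no identifier is available.
def relationName(sourceName: String, tableIdent: Option[TableIdentifier]): String =
  tableIdent.map(_.unquotedString).getOrElse(s"$sourceName:unknown")
```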
@@ -203,33 +203,33 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
    val path = file.getCanonicalPath
    assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)

    spark.range(10).select('id, -'id).write.format(cls.getName)
    spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
are these required changes?
Yes. The new resolution rule validates the dataframe that will be written to the table. Because this uses the DataFrameWriter API, it matches columns by name, since there isn't a strong expectation of ordering in the dataframe API (e.g. withColumn doesn't specify where the new column is added).
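A usage sketch of what that means for the test above: with by-name resolution it is the aliases, not the column positions, that bind the dataframe's columns to the table's `i` and `j` columns. `cls` and `path` refer to the surrounding test's source class and path; the explicit append mode is illustrative.

```scala
// Columns are matched to the target table's schema by name, so the aliases matter.
spark.range(10)
  .select('id as 'i, -'id as 'j)   // rename to the table's column names
  .write
  .format(cls.getName)             // cls and path come from the surrounding test
  .option("path", path)
  .mode("append")
  .save()
```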
private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
  Alias(
    UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
Since Upcast is being used in more places, I want to raise #21586 again. The current Upcast has a bug: it forbids string-to-int but allows string-to-boolean. Changing it is a behavior change, so we should make sure the new behavior is reasonable. What's our expectation of Upcast here? Are we expecting a runtime error when casting string to other types?
The purpose of UpCast here is to prevent Spark from automatically inserting casts that could lose information, like long to int or string to int.

I would support the same for string to boolean, to catch destructive problems from accidental column alignment (in SQL) or similar errors. The main problem here is that Spark inserts casts instead of alerting the user that there's a problem. When the write succeeds, it may be a while before the user realizes the mistake, and by then the original data can't be recovered.
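A sketch of the intent in terms of expression trees (not the resolution rule itself): UpCast marks a cast that the analyzer's ResolveUpCast rule should only accept when it cannot lose information, so a widening cast resolves while a narrowing one is expected to fail analysis.

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, UpCast}
import org.apache.spark.sql.types.{IntegerType, LongType}

// Widening: int -> long cannot lose information, so this should resolve cleanly.
val widening = UpCast(Literal(1, IntegerType), LongType, Seq())

// Narrowing: long -> int may truncate, so ResolveUpCast is expected to reject it.
val narrowing = UpCast(Literal(1L, LongType), IntegerType, Seq())
```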
@cloud-fan, I've updated this with the requested changes. Thanks for looking at it!
Test build #92693 has finished for PR 21305 at commit
 */
object ResolveOutputRelation extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case append @ AppendData(table: NamedRelation, query, isByName)
Since we don't have an analyzer rule for AppendData(LogicalPlan ...), shall we just require AppendData.table to be a NamedRelation in the class definition?
Yes, I agree.
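The agreed change, sketched (assuming the NamedRelation trait introduced alongside this work; the body is trimmed to the essentials and is not the exact PR code):

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Constrain the target in the signature itself, so no analyzer rule is needed
// to reject an AppendData whose table is not a named relation.
case class AppendData(
    table: NamedRelation,
    query: LogicalPlan,
    isByName: Boolean) extends LogicalPlan {
  override def children: Seq[LogicalPlan] = Seq(query)
  override def output: Seq[Attribute] = Seq.empty
}
```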
expected.flatMap { outAttr =>
  query.resolveQuoted(outAttr.name, resolver) match {
    case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
      errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
shall we add a runtime null check instead of failing at the beginning?
I would much rather have a job fail fast with a clear error message than fail during a write. I can see how adding such an assertion to the plan could be useful, so I'd consider it if someone wanted to add that feature later. Right now, though, I think this is good.
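If someone does add that feature later, a hypothetical shape for it could be wrapping each nullable input column in Catalyst's AssertNotNull expression, so the job fails at write time only when an actual null shows up (a sketch, not part of this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull

// Hypothetical alternative to failing analysis: defer the nullability check to
// execution time by asserting non-null on each written value.
def withRuntimeNullCheck(inAttr: NamedExpression): NamedExpression =
  Alias(AssertNotNull(inAttr, Seq(inAttr.name)), inAttr.name)()
```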
      errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
      None

    case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
sameType ignores nullability; shall we add a null check for nested fields too?
Yes, I'll update to check nested fields.
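A sketch of the kind of recursive check that implies (names and structure are illustrative, not the helper that ended up in the PR): since sameType ignores nullability at every level, struct fields, array elements, and map values each need their own write-compatibility check.

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Illustrative recursive check: data of type `in` may be written to a column of
// type `out` only if no nullable value can land in a non-null slot at any depth.
def nullabilityCompatible(in: DataType, out: DataType): Boolean = (in, out) match {
  case (inStruct: StructType, outStruct: StructType) =>
    inStruct.fields.length == outStruct.fields.length &&
      inStruct.fields.zip(outStruct.fields).forall { case (inField, outField) =>
        (outField.nullable || !inField.nullable) &&
          nullabilityCompatible(inField.dataType, outField.dataType)
      }
  case (inArray: ArrayType, outArray: ArrayType) =>
    (outArray.containsNull || !inArray.containsNull) &&
      nullabilityCompatible(inArray.elementType, outArray.elementType)
  case (inMap: MapType, outMap: MapType) =>
    (outMap.valueContainsNull || !inMap.valueContainsNull) &&
      nullabilityCompatible(inMap.keyType, outMap.keyType) &&
      nullabilityCompatible(inMap.valueType, outMap.valueType)
  case _ =>
    true // leaf types: top-level column nullability is checked separately
}
```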
}

} else {
if (expected.size > query.output.size) { |
this check is already done in https://github.com/apache/spark/pull/21305/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R2152
That check is the other direction: not enough columns.
When matching by position, we need the same number of columns, so we add this check (we already know that there aren't too few columns, so this checks for too many). When matching by name, we can call out the specific columns that are missing, which is why the validation is done differently for the two cases.
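A sketch contrasting the two validation paths described above (the helper name and error wording are illustrative): by-name resolution can report exactly which expected columns are missing, while by-position resolution needs the column counts to match.

```scala
// Hypothetical helper showing why the checks differ between the two modes.
def columnCountErrors(
    expectedNames: Seq[String],
    queryNames: Seq[String],
    byName: Boolean): Seq[String] = {
  if (byName) {
    // Name-based matching: call out each missing column individually.
    expectedNames.filterNot(queryNames.contains)
      .map(name => s"Cannot find required column '$name'")
  } else if (queryNames.size < expectedNames.size) {
    Seq(s"Not enough data columns: expected ${expectedNames.size}, found ${queryNames.size}")
  } else if (queryNames.size > expectedNames.size) {
    Seq(s"Too many data columns: expected ${expectedNames.size}, found ${queryNames.size}")
  } else {
    Nil
  }
}
```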
 * @param jobId A unique string for the writing job. It's possible that there are many writing
 *              jobs running at the same time, and the returned {@link DataSourceWriter} can
 *              use this job id to distinguish itself from other jobs.
 * @param writeUUID A unique string for the writing job. It's possible that there are many writing
what's the rationale behind this renaming?
This is not the ID of the Spark job that is writing. I think the UUID name is clearer about what is actually passed: a unique string that identifies the write. There's also no need to make the string more complicated than a UUID, since there are no guarantees about it.
This is removed in the v2 API redesign.
Test build #92723 has finished for PR 21305 at commit
@cloud-fan, I've updated this and the tests are passing, so I think it is ready for another look. I just pushed a comments-only commit to fix the Javadoc for AppendData that @viirya pointed out (thanks!). Since that's only comments, it shouldn't affect test results. I think there's just one more point under discussion, which is the change from
Test build #92761 has finished for PR 21305 at commit
Retest this please.
@cloud-fan, I think we can ignore that last test failure because tests are passing on the last commit that made real changes. The latest commit only changed a comment.
(The last test failure is a known flaky test I've been working (albeit unsuccessfully so far) to find a solution for.)
Test build #92780 has finished for PR 21305 at commit
Do we have a plan to update the data source v2 write API to support
@cloud-fan, yes. There is an open PR, #21308, that adds
cool! so
I don't think we need (or want) A couple other notes:
@cloud-fan, I've rebased this so it is ready for final review when you get a chance. Thanks!
Test build #93208 has finished for PR 21305 at commit
Previously missing fields were allowed if the read type's corresponding field was optional.
@cloud-fan, I've rebased and updated with the requested change to disallow missing columns, even if they're optional. Thanks for reviewing!
LGTM, pending jenkins
Test build #94339 has finished for PR 21305 at commit
retest this please
Test build #94352 has finished for PR 21305 at commit
retest this please
Test build #94364 has finished for PR 21305 at commit
retest this please
Test build #94373 has finished for PR 21305 at commit
Test build #94384 has finished for PR 21305 at commit
thanks, merging to master!
Thanks for reviewing, @cloud-fan!
## What changes were proposed in this pull request?

This is a follow-up to #21305 that adds a test suite for AppendData analysis. This also fixes the following problems uncovered by these tests:

* Incorrect order of data types passed to `canWrite` is fixed
* The field check calls `canWrite` first to ensure all errors are found
* `AppendData#resolved` must check resolution of the query's attributes
* Column names are quoted to show empty names

## How was this patch tested?

This PR adds a test suite for AppendData analysis.

Closes #22043 from rdblue/SPARK-24251-add-append-data-analysis-tests.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This adds a new logical plan, AppendData, that was proposed in SPARK-23521: Standardize SQL logical plans.

* DataFrameWriter uses the new AppendData plan for DataSourceV2 appends
* AppendData is resolved if its output columns match the incoming data frame
* A new analyzer rule, ResolveOutputColumns, validates data before it is appended. This rule will add safe casts, rename columns, and check nullability

Existing tests for v2 appends. Will add AppendData tests to validate logical plan analysis.

Closes apache#21305 from rdblue/SPARK-24251-add-append-data.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: Ryan Blue <rdblue@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

(cherry picked from commit 5fef6e3)

Conflicts:
  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
  sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
  sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
What changes were proposed in this pull request?
This adds a new logical plan, AppendData, that was proposed in SPARK-23521: Standardize SQL logical plans.
How was this patch tested?
Existing tests for v2 appends. Will add AppendData tests to validate logical plan analysis.