[SPARK-24923][SQL] Implement v2 CreateTableAsSelect #24570
Conversation
Test build #105292 has finished for PR 24570 at commit
Test build #105296 has finished for PR 24570 at commit
Test build #105297 has finished for PR 24570 at commit
@mccheah, @cloud-fan, here's a v2 implementation for CTAS. Please review if you have time. Thanks!
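For context, the statement shape this PR enables looks roughly like the following; `testcat`, `foo`, and `source` are illustrative names, not taken from the PR:

```scala
// Illustrative v2 CTAS: the table name is a multi-part identifier that
// resolves against a configured v2 catalog instead of the session catalog.
spark.sql(
  """CREATE TABLE testcat.ns1.table_name
    |USING foo
    |PARTITIONED BY (id)
    |AS SELECT id, data FROM source""".stripMargin)
```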
val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec,
  properties, provider, options, location, comment, ifNotExists)
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

CreateTable(tableDesc, mode, Some(query))

case create: CreateTableAsSelectStatement =>
shall we implement CREATE TABLE as well?
I thought this PR was getting a little large, so I was going to do create table in a separate one. I can add it to this one if you'd prefer.
@cloud-fan, I've updated this PR and fixed your review comments, so please have another look. Also, I added a test suite that I accidentally didn't add.
Test build #105364 has finished for PR 24570 at commit
// convert USING, LOCATION, and COMMENT clauses to table properties
properties += ("provider" -> ctas.provider)
ctas.comment.map(text => properties += ("comment" -> text))
What happens if these overwrite table properties of the same type in the properties part of the SQL statement itself? Do these overwrite properties of the same name? Are these special property keys documented anywhere?
This is a good point. When saving Spark tables to the Hive metastore, we need to store some Spark-specific information in the table properties, and we always use `spark.sql.` as the prefix for the property keys. Shall we follow that and add a prefix here as well?

EDIT: Since data source implementations need to know these properties, shall we just document these special properties?
I've added validations so that the properties and the clauses cannot both be used. Setting "comment" and using a COMMENT clause will result in an `AnalysisException`.

Passing these as well-known properties was included in the SPIP, but exactly which properties are used should be documented. I'll open a documentation issue and add it as a blocker for the 3.0 release.
If we want to define a prefix for the clauses that are passed as properties, that sounds fine to me. What is a reasonable prefix?
With the latest changes, conflicting properties are no longer allowed.
Also, I opened https://issues.apache.org/jira/browse/SPARK-27708 to track documentation.
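To make that concrete, here is a minimal sketch of this kind of validation; `ReservedProperties`, the key list, and the error text are assumptions for illustration, not the PR's actual code (the PR raises `AnalysisException`; `require` keeps the sketch self-contained):

```scala
// Hypothetical check: reject TBLPROPERTIES keys that duplicate the dedicated
// USING, LOCATION, and COMMENT clauses, so the clauses stay authoritative.
object ReservedProperties {
  val reserved: Seq[String] = Seq("provider", "location", "comment")

  def validate(userProperties: Map[String, String]): Unit = {
    reserved.foreach { key =>
      require(!userProperties.contains(key),
        s"Cannot set reserved table property '$key'; use the matching SQL clause instead")
    }
  }
}
```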
.withQueryId(UUID.randomUUID().toString)
val batchWrite = builder match {
  case supportsSaveMode: SupportsSaveMode =>
    supportsSaveMode.mode(SaveMode.Append).buildForBatch()
I'm not sure why we have to specifically tell the writer to use append mode. Can you elaborate? I think I'm missing something. It would be simpler to remove this branch entirely if possible.
I think `SupportsSaveMode` is a hack in `TableProvider` only. It seems we don't need to deal with it for `TableCatalog`. Anyway, it will be removed soon.
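For readers following along, the full shape of the branch being discussed is roughly this (a reconstruction, not a verbatim quote of the PR); the `SupportsSaveMode` case exists only for transitional sources, so the match collapses to a single `buildForBatch()` call once the trait is removed:

```scala
val batchWrite = builder match {
  case supportsSaveMode: SupportsSaveMode =>
    // transitional path, deleted along with SupportsSaveMode itself
    supportsSaveMode.mode(SaveMode.Append).buildForBatch()
  case _ =>
    builder.buildForBatch()  // the only path once the hack is gone
}
```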
assert(!plan.resolved)
assertAnalysisError(plan, Seq(
  "Invalid partitioning",
  "does_not_exist.z is missing or is in a map or array"))
Not a blocker, but we can improve the error message in the future. It's better to let users know which column/field is missing. For example, with `a.b`, it's possible that column `a` exists but `a` is not a struct, or that it doesn't have a `b` field.
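A sketch of the suggested improvement, using assumed names (the PR's actual helper is `findNestedField` on `StructType`, which this does not reproduce): walk the reference one name at a time so the error can say exactly which segment failed.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical lookup that reports which segment of a nested reference fails,
// e.g. for "a.b": "a" is missing, or "a" exists but is not a struct.
def describeLookup(schema: StructType, ref: Seq[String]): Either[String, DataType] = {
  ref.foldLeft[Either[String, DataType]](Right(schema)) {
    case (Right(struct: StructType), name) =>
      struct.fields.find(_.name == name) match {
        case Some(field) => Right(field.dataType)
        case None => Left(s"Field '$name' does not exist")
      }
    case (Right(other), name) =>
      Left(s"Cannot access '$name': parent is not a struct (${other.simpleString})")
    case (error @ Left(_), _) => error
  }
}
```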
before {
  spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
  spark.conf.set("spark.sql.default.catalog", "testcat")
I think it has been moved to another PR?
It has. Does this need to be removed? I marked the tests that rely on it as ignored, so we should just need to revert that. I'd rather keep it so we don't have to figure out why those tests are failing when the other PR is merged.
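For anyone reproducing this setup outside the suite, the two settings amount to roughly the following (a sketch; `foo` and `source` are illustrative names):

```scala
// Register a v2 catalog implementation under the name "testcat", then make it
// the default catalog so identifiers can resolve against it.
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.default.catalog", "testcat")

// CTAS against the v2 catalog now runs through the new CreateTableAsSelect plan.
spark.sql("CREATE TABLE testcat.ns.t USING foo AS SELECT * FROM source")
```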
Test build #105387 has finished for PR 24570 at commit
thanks, merging to master!
Thanks for reviewing, @cloud-fan and @mccheah! I'll get the next few PRs posted.
What changes were proposed in this pull request?

This adds a v2 implementation for CTAS queries.

* Update the SQL parser to parse CREATE queries using multi-part identifiers
* Update `CheckAnalysis` to validate partitioning references with the CTAS query schema
* Add `CreateTableAsSelect` v2 logical plan and `CreateTableAsSelectExec` v2 physical plan
* Update create conversion from `CreateTableAsSelectStatement` to support the new v2 logical plan
* Update `DataSourceV2Strategy` to convert v2 CTAS logical plan to the new physical plan
* Add `findNestedField` to `StructType` to support reference validation

How was this patch tested?

We have been running these changes in production for several months. Also:

* Add a test suite `CreateTablePartitioningValidationSuite` for new analysis checks
* Add a test suite for v2 SQL, `DataSourceV2SQLSuite`
* Update catalyst `DDLParserSuite` to use multi-part identifiers (`Seq[String]`)
* Add test cases to `PlanResolutionSuite` for v2 CTAS: known catalog and v2 source implementation

Closes apache#24570 from rdblue/SPARK-24923-add-v2-ctas.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>