[SPARK-24923][SQL] Implement v2 CreateTableAsSelect #24570

Closed · 8 commits

Conversation

@rdblue (Contributor) commented May 9, 2019

What changes were proposed in this pull request?

This adds a v2 implementation for CTAS queries:

  • Update the SQL parser to parse CREATE queries using multi-part identifiers
  • Update `CheckAnalysis` to validate partitioning references against the CTAS query schema
  • Add the `CreateTableAsSelect` v2 logical plan and the `CreateTableAsSelectExec` v2 physical plan (a sketch of the logical plan follows this list)
  • Update the conversion from `CreateTableAsSelectStatement` to produce the new v2 logical plan
  • Update `DataSourceV2Strategy` to convert the v2 CTAS logical plan to the new physical plan
  • Add `findNestedField` to `StructType` to support reference validation
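For orientation, the new logical plan node might look roughly like the following. This is a hedged sketch: the class name matches what the test builds below report, but the field names, types, and package paths are assumptions inferred from the description, not the PR's verbatim code.

```scala
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: fields are assumptions, not copied from the actual change.
case class CreateTableAsSelect(
    catalog: TableCatalog,            // v2 catalog that will own the new table
    tableName: Identifier,            // multi-part identifier within that catalog
    partitioning: Seq[Transform],     // partition transforms checked by CheckAnalysis
    query: LogicalPlan,               // the SELECT that supplies schema and data
    properties: Map[String, String],  // table properties (provider, comment, ...)
    ignoreIfExists: Boolean)          // IF NOT EXISTS semantics
```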

How was this patch tested?

We have been running these changes in production for several months. Also:

  • Add a test suite, `CreateTablePartitioningValidationSuite`, for the new analysis checks
  • Add a test suite for v2 SQL, `DataSourceV2SQLSuite`
  • Update the catalyst `DDLParserSuite` to use multi-part identifiers (`Seq[String]`)
  • Add test cases to `PlanResolutionSuite` for v2 CTAS: known catalog and v2 source implementation

@SparkQA commented May 9, 2019

Test build #105292 has finished for PR 24570 at commit b13a8e2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CreateTableAsSelect(
  • case class DataSourceResolution(
  • case class CreateTableAsSelectExec(

@SparkQA commented May 10, 2019

Test build #105296 has finished for PR 24570 at commit a22c335.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 10, 2019

Test build #105297 has finished for PR 24570 at commit cdf3805.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue (Contributor, Author) commented May 10, 2019

@mccheah, @cloud-fan, here's a v2 implementation for CTAS. Please review if you have time. Thanks!


        // v1 path: fall back to the session catalog with a CreateTable plan
        val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec,
          properties, provider, options, location, comment, ifNotExists)
        val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

        CreateTable(tableDesc, mode, Some(query))

      case create: CreateTableAsSelectStatement =>
Contributor:
shall we implement CREATE TABLE as well?

Contributor (Author):
I thought this PR was getting a little large, so I was going to do CREATE TABLE in a separate one. I can add it to this one if you'd prefer.

@rdblue (Contributor, Author) commented May 13, 2019

@cloud-fan, I've updated this PR to address your review comments, so please have another look. I also added DataSourceV2SQLSuite, a test suite I had accidentally left out. That is where I will be putting end-to-end test cases that use in-memory tables to validate the physical plans.
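As a rough illustration of what an end-to-end case in that suite could look like (a hedged sketch: `testcat`, `foo`, and `source` follow the test setup quoted later in this thread, but the test body is an assumption, not a verbatim excerpt):

```scala
// Hedged sketch of a DataSourceV2SQLSuite-style test: run CTAS against the
// in-memory test catalog, then check that the new table holds the query output.
test("CreateTableAsSelect: v2 CTAS with an explicit catalog") {
  spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")

  // The physical plan should have written exactly the rows produced by the query.
  checkAnswer(spark.table("testcat.table_name"), spark.table("source"))
}
```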

@SparkQA commented May 13, 2019

Test build #105364 has finished for PR 24570 at commit b19a70d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


    // convert USING, LOCATION, and COMMENT clauses to table properties
    properties += ("provider" -> ctas.provider)
    ctas.comment.foreach(text => properties += ("comment" -> text))
Contributor:
What happens if these collide with table properties of the same name set in the properties part of the SQL statement itself? Do these overwrite them? Are these special property keys documented anywhere?

@cloud-fan (Contributor) commented May 14, 2019:
This is a good point. When saving Spark tables to the Hive metastore, we need to store some Spark-specific information in the table properties, and we always use `spark.sql.` as the prefix for the property keys.

Shall we follow that convention and add a prefix here as well?

EDIT:
Since data source implementations need to know these properties, shall we just document these special properties?

Contributor (Author):
I've added validations so that the properties and the clauses cannot both be used. Setting "comment" and using a COMMENT clause will result in an AnalysisException.

Passing these as well-known properties was included in the SPIP, but exactly which properties are used should be documented. I'll open a documentation issue and add it as a blocker for the 3.0 release.

If we want to define a prefix for the clauses that are passed as properties, that sounds fine to me. What is a reasonable prefix?

Contributor (Author):
With the latest changes, conflicting properties are no longer allowed.

Also, I opened https://issues.apache.org/jira/browse/SPARK-27708 to track documentation.
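To make the resulting behavior concrete, the check could look something like this sketch (the reserved keys match the clauses discussed above; the helper name, location, and exact error message are assumptions):

```scala
import org.apache.spark.sql.AnalysisException

// Hedged sketch: reject explicit table properties that collide with the
// dedicated USING / LOCATION / COMMENT clauses, as described above.
private val ReservedProperties = Seq("provider", "location", "comment")

private def validateProperties(properties: Map[String, String]): Unit = {
  ReservedProperties.foreach { reserved =>
    if (properties.contains(reserved)) {
      throw new AnalysisException(
        s"Cannot use reserved table property '$reserved'; use the corresponding clause instead")
    }
  }
}
```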

      .withQueryId(UUID.randomUUID().toString)
    val batchWrite = builder match {
      case supportsSaveMode: SupportsSaveMode =>
        supportsSaveMode.mode(SaveMode.Append).buildForBatch()
Contributor:
I'm not sure why we have to specifically tell the writer to use append mode. Can you elaborate? I think I'm missing something. It would be simpler to remove this branch entirely if possible.

Contributor:
I think `SupportsSaveMode` is a hack for `TableProvider` only; it seems we don't need to deal with it for `TableCatalog`. In any case, it will be removed soon.
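In other words, once `SupportsSaveMode` goes away, the branch quoted above could presumably collapse to a direct call. A sketch of the eventual shape, not the actual follow-up change:

```scala
// Hedged sketch: with no save-mode special case, the configured WriteBuilder
// can produce the batch write directly.
val batchWrite = builder.buildForBatch()
```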

    assert(!plan.resolved)
    assertAnalysisError(plan, Seq(
      "Invalid partitioning",
      "does_not_exist.z is missing or is in a map or array"))
Contributor:
Not a blocker, but we can improve the error message in the future. It's better to let users know exactly which column/field is missing. For example, for `a.b`, it's possible that column `a` exists but is not a struct, or that it doesn't have a `b` field.
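For reference, this check is driven by the `findNestedField` helper added in this PR (see the summary above). A minimal sketch of such a lookup, assuming a simple recursive walk; the real method's signature and failure reporting may differ:

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Hedged sketch of a nested-field lookup: follow a multi-part name through
// nested structs, returning None when a segment is missing or the path runs
// into a non-struct type such as a map or array.
def findNestedField(struct: StructType, path: Seq[String]): Option[StructField] =
  path match {
    case Seq(name) =>
      struct.fields.find(_.name == name)
    case name +: rest =>
      struct.fields.find(_.name == name).flatMap { field =>
        field.dataType match {
          case nested: StructType => findNestedField(nested, rest)
          case _ => None // parent exists but is not a struct
        }
      }
    case _ => None // empty path
  }
```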


  before {
    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
    spark.conf.set("spark.sql.default.catalog", "testcat")
Contributor:
I think it has been moved to another PR?

Contributor (Author):
It has. Does this need to be removed? I marked the tests that rely on it as ignored, so we should only need to revert that. I'd rather keep this so we don't have to figure out why those tests are failing when the other PR is merged.

@SparkQA commented May 14, 2019

Test build #105387 has finished for PR 24570 at commit 99ebc00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):
Thanks, merging to master!

@cloud-fan closed this in 2da5b21 on May 15, 2019
@rdblue (Contributor, Author) commented May 15, 2019

Thanks for reviewing, @cloud-fan and @mccheah! I'll get the next few PRs posted.

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019

Closes apache#24570 from rdblue/SPARK-24923-add-v2-ctas.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>