Skip to content

Commit

Permalink
[SPARK-33703][SQL] Migrate MSCK REPAIR TABLE to use UnresolvedTable t…
Browse files Browse the repository at this point in the history
…o resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate `MSCK REPAIR TABLE` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `MSCK REPAIR TABLE` is not supported for v2 tables.

### Why are the changes needed?

The PR makes the resolution consistent behavior consistent. For example,
```scala
sql("CREATE DATABASE test")
sql("CREATE TABLE spark_catalog.test.t (id bigint, val string) USING csv PARTITIONED BY (id)")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE spark_catalog.test")
sql("MSCK REPAIR TABLE t") // works fine
```
, but after this PR:
```
sql("MSCK REPAIR TABLE t")
org.apache.spark.sql.AnalysisException: t is a temp view. 'MSCK REPAIR TABLE' expects a table; line 1 pos 0
```
, which is the consistent behavior with other commands.

### Does this PR introduce _any_ user-facing change?

After this PR, `MSCK REPAIR TABLE t` in the above example is resolved to a temp view `t` first instead of `spark_catalog.test.t`.

### How was this patch tested?

Updated existing tests.

Closes apache#30664 from imback82/repair_table_V2.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
imback82 authored and cloud-fan committed Dec 9, 2020
1 parent f021f6d commit 29fed23
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3547,15 +3547,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Create a [[RepairTableStatement]].
* Create a [[RepairTable]].
*
* For example:
* {{{
* MSCK REPAIR TABLE multi_part_name
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
RepairTable(
UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier()), "MSCK REPAIR TABLE"))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,6 @@ case class CreateNamespaceStatement(
*/
case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement

/**
* A REPAIR TABLE statement, as parsed from SQL
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A TRUNCATE TABLE statement, as parsed from SQL
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,10 @@ case class DropView(
ifExists: Boolean) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the MSCK REPAIR TABLE command.
*/
case class RepairTable(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,7 @@ class DDLParserSuite extends AnalysisTest {
test("MSCK REPAIR TABLE") {
comparePlans(
parsePlan("MSCK REPAIR TABLE a.b.c"),
RepairTableStatement(Seq("a", "b", "c")))
RepairTable(UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE")))
}

test("LOAD DATA INTO table") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,8 @@ class ResolveSessionCatalog(
case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)

case RepairTableStatement(tbl) =>
val v1TableName = parseV1Table(tbl, "MSCK REPAIR TABLE")
AlterTableRecoverPartitionsCommand(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")
case RepairTable(ResolvedV1TableIdentifier(ident)) =>
AlterTableRecoverPartitionsCommand(ident.asTableIdentifier, "MSCK REPAIR TABLE")

case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
table,
pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil

case RepairTable(_: ResolvedTable) =>
throw new AnalysisException("MSCK REPAIR TABLE is not supported for v2 tables.")

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2011,7 +2011,7 @@ class DataSourceV2SQLSuite
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1Command("MSCK REPAIR TABLE", t)
testNotSupportedV2Command("MSCK REPAIR TABLE", t)
}
}

Expand Down Expand Up @@ -2612,13 +2612,6 @@ class DataSourceV2SQLSuite
assert(e.message.contains(s"$cmdStr is not supported for v2 tables"))
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
}

private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
Expand Down

0 comments on commit 29fed23

Please sign in to comment.