Commit

Merge branch 'master' into repair_table_V2
imback82 committed Dec 8, 2020
2 parents 3e8aa99 + a093d6f commit f6b776e
Showing 84 changed files with 473 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -414,7 +414,7 @@ jobs:
- name: Build with SBT
run: |
./dev/change-scala-version.sh 2.13
-./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pscala-2.13 compile test:compile
+./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pscala-2.13 compile test:compile
hadoop-2:
name: Hadoop 2 build with SBT
4 changes: 4 additions & 0 deletions docs/sql-migration-guide.md
@@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

+## Upgrading from Spark SQL 3.1 to 3.2
+
+- In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
+
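For applications that need the old behavior, the setting can be changed at session build time or on a live session. A minimal Scala sketch (session setup and master URL are assumed):

    import org.apache.spark.sql.SparkSession

    // Build a session that opts out of adaptive query execution,
    // which Spark 3.2 turns on by default.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Or flip the flag on an existing session at runtime.
    spark.conf.set("spark.sql.adaptive.enabled", "false")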
## Upgrading from Spark SQL 3.0 to 3.1

- In Spark 3.1, the statistical aggregation functions `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, and `corr` return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` is applied to a single-element set. In Spark 3.0 and earlier, they returned `Double.NaN` in such cases. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
@@ -198,7 +198,7 @@ statement
| SHOW TABLES ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=STRING)? #showTables
| SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)?
-LIKE pattern=STRING partitionSpec? #showTable
+LIKE pattern=STRING partitionSpec? #showTableExtended
| SHOW TBLPROPERTIES table=multipartIdentifier
('(' key=tablePropertyKey ')')? #showTblProperties
| SHOW COLUMNS (FROM | IN) table=multipartIdentifier
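The two labeled alternatives parse distinct statements. For reference, a sketch through the Scala API (database and pattern are assumed):

    // Routed through the #showTables alternative.
    spark.sql("SHOW TABLES IN db1 LIKE 'emp*'")

    // Routed through the renamed #showTableExtended alternative.
    spark.sql("SHOW TABLE EXTENDED IN db1 LIKE 'emp*'")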
@@ -150,7 +150,7 @@ object AnalysisContext {
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(override val catalogManager: CatalogManager)
-extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper {
+extends RuleExecutor[LogicalPlan] with CheckAnalysis with SQLConfHelper {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog

@@ -277,7 +277,7 @@ class Analyzer(override val catalogManager: CatalogManager)
TypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once,
-Seq(ResolveNoopDropTable) ++
+Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
@@ -847,6 +847,8 @@ class Analyzer(override val catalogManager: CatalogManager)
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s @ ShowTables(UnresolvedNamespace(Seq()), _) =>
s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _) =>
+s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
case s @ ShowViews(UnresolvedNamespace(Seq()), _) =>
s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
case UnresolvedNamespace(Seq()) =>
@@ -887,6 +889,11 @@ class Analyzer(override val catalogManager: CatalogManager)
u.failAnalysis(s"${ident.quoted} is a temp view. '$cmd' expects a table")
}
u
+case u @ UnresolvedView(ident, _, _) =>
+lookupTempView(ident).map { _ =>
+ResolvedView(ident.asIdentifier, isTemp = true)
+}
+.getOrElse(u)
case u @ UnresolvedTableOrView(ident, cmd, allowTempView) =>
lookupTempView(ident)
.map { _ =>
@@ -1111,6 +1118,14 @@ class Analyzer(override val catalogManager: CatalogManager)
case table => table
}.getOrElse(u)

+case u @ UnresolvedView(identifier, cmd, relationTypeMismatchHint) =>
+lookupTableOrView(identifier).map {
+case v: ResolvedView => v
+case _ =>
+u.failAnalysis(s"${identifier.quoted} is a table. '$cmd' expects a view." +
+relationTypeMismatchHint.map(" " + _).getOrElse(""))
+}.getOrElse(u)
+
case u @ UnresolvedTableOrView(identifier, _, _) =>
lookupTableOrView(identifier).getOrElse(u)
}
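The cases above follow a resolve-or-leave pattern: each rule substitutes a resolved node only when its lookup succeeds, and otherwise returns the unresolved node so a later rule (or CheckAnalysis) can deal with it. A toy model of that pattern, with all names invented for illustration rather than taken from Spark:

    sealed trait Node
    case class UnresolvedViewNode(ident: Seq[String]) extends Node
    case class ResolvedViewNode(ident: Seq[String], isTemp: Boolean) extends Node

    // Substitute a resolved node only when the temp-view lookup succeeds;
    // otherwise keep the node unresolved for a later rule to handle.
    def resolveTempViews(plan: Node, tempViews: Set[Seq[String]]): Node = plan match {
      case u @ UnresolvedViewNode(ident) =>
        if (tempViews.contains(ident)) ResolvedViewNode(ident, isTemp = true) else u
      case other => other
    }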
@@ -26,15 +26,15 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
-import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
+import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Throws user facing errors when passed invalid queries that fail to analyze.
*/
-trait CheckAnalysis extends PredicateHelper {
+trait CheckAnalysis extends PredicateHelper with LookupCatalog {

protected def isView(nameParts: Seq[String]): Boolean

@@ -104,6 +104,15 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedTable =>
u.failAnalysis(s"Table not found for '${u.commandName}': ${u.multipartIdentifier.quoted}")

+case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _) =>
+u.failAnalysis(
+s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " +
+"because view support in v2 catalog has not been implemented yet. " +
+s"$cmd expects a view.")
+
+case u: UnresolvedView =>
+u.failAnalysis(s"View not found for '${u.commandName}': ${u.multipartIdentifier.quoted}")
+
case u: UnresolvedTableOrView =>
val viewStr = if (u.allowTempView) "view" else "permanent view"
u.failAnalysis(
@@ -187,11 +187,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
writeOptions = c.writeOptions,
orCreate = c.orCreate)

-case DropViewStatement(NonSessionCatalogAndTable(catalog, viewName), _) =>
-throw new AnalysisException(
-s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " +
-s"because view support in catalog has not been implemented yet")
-
case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
if !isSessionCatalog(catalog) =>
CreateNamespace(catalog.asNamespaceCatalog, ns, c.ifNotExists, c.properties)
@@ -17,17 +17,19 @@

package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand}
import org.apache.spark.sql.catalyst.rules.Rule

/**
-* A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved.
-* If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
-* which is a no-op command.
+* A rule for handling commands when the table or temp view is not resolved.
+* These commands support an "ifExists" flag so that they do not fail when the relation is not
+* resolved. If the "ifExists" flag is set to true, the plan is resolved to the no-op [[NoopCommand]].
*/
-object ResolveNoopDropTable extends Rule[LogicalPlan] {
+object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-NoopDropTable(u.multipartIdentifier)
+NoopCommand("DROP TABLE", u.multipartIdentifier)
+case DropView(u: UnresolvedView, ifExists) if ifExists =>
+NoopCommand("DROP VIEW", u.multipartIdentifier)
}
}
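In terms of observable behavior (a sketch assuming a session where neither relation exists), the rule lets the IF EXISTS variants finish as no-ops instead of failing analysis:

    // Both commands resolve to NoopCommand and succeed silently.
    spark.sql("DROP TABLE IF EXISTS no_such_table")
    spark.sql("DROP VIEW IF EXISTS no_such_view")

    // Without IF EXISTS the node stays unresolved and CheckAnalysis raises an
    // AnalysisException along the lines of:
    //   Table not found for 'DROP TABLE': no_such_table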
@@ -45,6 +45,19 @@ case class UnresolvedTable(
override def output: Seq[Attribute] = Nil
}

+/**
+* Holds the name of a view that has yet to be looked up in a catalog. It will be resolved to
+* [[ResolvedView]] during analysis.
+*/
+case class UnresolvedView(
+multipartIdentifier: Seq[String],
+commandName: String,
+relationTypeMismatchHint: Option[String] = None) extends LeafNode {
+override lazy val resolved: Boolean = false
+
+override def output: Seq[Attribute] = Nil
+}
+
/**
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
* be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
@@ -543,27 +543,33 @@ object LikeSimplification extends Rule[LogicalPlan] {
private val equalTo = "([^_%]*)".r

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
-case Like(input, Literal(pattern, StringType), escapeChar) =>
+case l @ Like(input, Literal(pattern, StringType), escapeChar) =>
if (pattern == null) {
// If pattern is null, return null value directly, since "col like null" == null.
Literal(null, BooleanType)
} else {
val escapeStr = String.valueOf(escapeChar)
pattern.toString match {
-case startsWith(prefix) if !prefix.endsWith(escapeStr) =>
+// There are three different situations when the pattern contains escapeChar:
+// 1. the pattern contains an invalid escape sequence, e.g. 'm\aca'
+// 2. the pattern contains an escaped wildcard character, e.g. 'ma\%ca'
+// 3. the pattern contains an escaped escape character, e.g. 'ma\\ca'
+// Although some patterns could be optimized if we handled the escape first, for
+// simplicity we just skip this rule when the pattern contains any escapeChar.
+case p if p.contains(escapeChar) => l
+case startsWith(prefix) =>
StartsWith(input, Literal(prefix))
case endsWith(postfix) =>
EndsWith(input, Literal(postfix))
// 'a%a' pattern is basically same with 'a%' && '%a'.
// However, the additional `Length` condition is required to prevent 'a' match 'a%a'.
-case startsAndEndsWith(prefix, postfix) if !prefix.endsWith(escapeStr) =>
+case startsAndEndsWith(prefix, postfix) =>
And(GreaterThanOrEqual(Length(input), Literal(prefix.length + postfix.length)),
And(StartsWith(input, Literal(prefix)), EndsWith(input, Literal(postfix))))
-case contains(infix) if !infix.endsWith(escapeStr) =>
+case contains(infix) =>
Contains(input, Literal(infix))
case equalTo(str) =>
EqualTo(input, Literal(str))
-case _ => Like(input, Literal.create(pattern, StringType), escapeChar)
+case _ => l
}
}
}
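A self-contained sketch of the rewrite table above: the regexes mirror the rule's, the returned strings merely name the expression each pattern becomes, and the first case reproduces the new escape-character bail-out (`Like(...)` stands for the untouched expression):

    object LikeSimplificationSketch {
      private val startsWith = "([^_%]+)%".r
      private val endsWith = "%([^_%]+)".r
      private val startsAndEndsWith = "([^_%]+)%([^_%]+)".r
      private val contains = "%([^_%]+)%".r
      private val equalTo = "([^_%]*)".r

      def simplify(pattern: String, escapeChar: Char = '\\'): String = pattern match {
        // New bail-out: leave any pattern containing the escape char untouched.
        case p if p.contains(escapeChar) => s"Like($p)"
        case startsWith(prefix) => s"StartsWith($prefix)"
        case endsWith(postfix) => s"EndsWith($postfix)"
        case startsAndEndsWith(pre, post) =>
          s"Length >= ${pre.length + post.length} && StartsWith($pre) && EndsWith($post)"
        case contains(infix) => s"Contains($infix)"
        case equalTo(str) => s"EqualTo($str)"
        case p => s"Like($p)" // anything else stays a Like
      }

      def main(args: Array[String]): Unit =
        Seq("abc%", "%abc", "a%b", "%abc%", "abc", "m\\aca")
          .foreach(p => println(s"$p -> ${simplify(p)}"))
    }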
@@ -3155,11 +3155,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
}

/**
-* Create a [[DropViewStatement]] command.
+* Create a [[DropView]] command.
*/
override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) {
-DropViewStatement(
-visitMultipartIdentifier(ctx.multipartIdentifier()),
+DropView(
+UnresolvedView(
+visitMultipartIdentifier(ctx.multipartIdentifier()),
+"DROP VIEW",
+Some("Please use DROP TABLE instead.")),
ctx.EXISTS != null)
}
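A behavior sketch of the new error path (table name assumed; the message quotes the resolved identifier):

    spark.sql("CREATE TABLE t(i INT) USING parquet")
    spark.sql("DROP VIEW t")
    // Throws AnalysisException with the hint wired in above, e.g.:
    //   default.t is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead.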

@@ -3190,13 +3193,18 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
}

/**
-* Create a [[ShowTableStatement]] command.
+* Create a [[ShowTableExtended]] command.
*/
-override def visitShowTable(ctx: ShowTableContext): LogicalPlan = withOrigin(ctx) {
-ShowTableStatement(
-Option(ctx.ns).map(visitMultipartIdentifier),
+override def visitShowTableExtended(
+ctx: ShowTableExtendedContext): LogicalPlan = withOrigin(ctx) {
+val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
+val partitionKeys = Option(ctx.partitionSpec).map { specCtx =>
+UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(specCtx), None)
+}
+ShowTableExtended(
+UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])),
string(ctx.pattern),
-Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
+partitionKeys)
}

/**
@@ -338,13 +338,6 @@ case class AlterViewAsStatement(
originalText: String,
query: LogicalPlan) extends ParsedStatement

-/**
-* A DROP VIEW statement, as parsed from SQL.
-*/
-case class DropViewStatement(
-viewName: Seq[String],
-ifExists: Boolean) extends ParsedStatement
-
/**
* An INSERT INTO statement, as parsed from SQL.
*
@@ -377,15 +370,6 @@ case class InsertIntoStatement(
override def children: Seq[LogicalPlan] = query :: Nil
}

-/**
-* A SHOW TABLE EXTENDED statement, as parsed from SQL.
-*/
-case class ShowTableStatement(
-namespace: Option[Seq[String]],
-pattern: String,
-partitionSpec: Option[TablePartitionSpec])
-extends ParsedStatement
-
/**
* A CREATE NAMESPACE statement, as parsed from SQL.
*/
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}

/**
* Base trait for DataSourceV2 write commands
@@ -419,9 +419,11 @@ case class DropTable(
}

/**
-* The logical plan for handling non-existing table for DROP TABLE command.
+* The logical plan for a no-op command handling a non-existing table.
*/
-case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(
+commandName: String,
+multipartIdentifier: Seq[String]) extends Command

/**
* The logical plan of the ALTER TABLE command.
@@ -466,7 +468,7 @@ case class RenameTable(
}

/**
-* The logical plan of the SHOW TABLE command.
+* The logical plan of the SHOW TABLES command.
*/
case class ShowTables(
namespace: LogicalPlan,
@@ -478,6 +480,22 @@ case class ShowTables(
AttributeReference("tableName", StringType, nullable = false)())
}

+/**
+* The logical plan of the SHOW TABLE EXTENDED command.
+*/
+case class ShowTableExtended(
+namespace: LogicalPlan,
+pattern: String,
+partitionSpec: Option[PartitionSpec]) extends Command {
+override def children: Seq[LogicalPlan] = namespace :: Nil
+
+override val output: Seq[Attribute] = Seq(
+AttributeReference("namespace", StringType, nullable = false)(),
+AttributeReference("tableName", StringType, nullable = false)(),
+AttributeReference("isTemporary", BooleanType, nullable = false)(),
+AttributeReference("information", StringType, nullable = false)())
+}
+
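A usage sketch (pattern assumed): per the `output` declared above, the resolved command yields four columns:

    spark.sql("SHOW TABLE EXTENDED LIKE 'emp*'").printSchema()
    // root
    //  |-- namespace: string (nullable = false)
    //  |-- tableName: string (nullable = false)
    //  |-- isTemporary: boolean (nullable = false)
    //  |-- information: string (nullable = false)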
/**
* The logical plan of the SHOW VIEWS command.
*
@@ -709,6 +727,15 @@ case class ShowPartitions(
AttributeReference("partition", StringType, nullable = false)())
}

+/**
+* The logical plan of the DROP VIEW command.
+*/
+case class DropView(
+child: LogicalPlan,
+ifExists: Boolean) extends Command {
+override def children: Seq[LogicalPlan] = child :: Nil
+}
+
/**
* The logical plan of the MSCK REPAIR TABLE command.
*/
@@ -404,7 +404,7 @@ object SQLConf {
"middle of query execution, based on accurate runtime statistics.")
.version("1.6.0")
.booleanConf
-.createWithDefault(false)
+.createWithDefault(true)

val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
.internal()
