diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 72b2caf907151..e40d6362fd23f 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -414,7 +414,7 @@ jobs: - name: Build with SBT run: | ./dev/change-scala-version.sh 2.13 - ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pscala-2.13 compile test:compile + ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pscala-2.13 compile test:compile hadoop-2: name: Hadoop 2 build with SBT diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 2c86e7a932637..65a769da70aea 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -22,6 +22,10 @@ license: | * Table of contents {:toc} +## Upgrading from Spark SQL 3.1 to 3.2 + + - In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`. + ## Upgrading from Spark SQL 3.0 to 3.1 - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`. diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a23994f456f75..b08451d8a6cfa 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -198,7 +198,7 @@ statement | SHOW TABLES ((FROM | IN) multipartIdentifier)? (LIKE? pattern=STRING)? #showTables | SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)? - LIKE pattern=STRING partitionSpec? #showTable + LIKE pattern=STRING partitionSpec? #showTableExtended | SHOW TBLPROPERTIES table=multipartIdentifier ('(' key=tablePropertyKey ')')? #showTblProperties | SHOW COLUMNS (FROM | IN) table=multipartIdentifier diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6541961f5613e..6b0cf4be7de74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -150,7 +150,7 @@ object AnalysisContext { * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]]. 
*/ class Analyzer(override val catalogManager: CatalogManager) - extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper { + extends RuleExecutor[LogicalPlan] with CheckAnalysis with SQLConfHelper { private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog @@ -277,7 +277,7 @@ class Analyzer(override val catalogManager: CatalogManager) TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, - Seq(ResolveNoopDropTable) ++ + Seq(ResolveCommandsWithIfExists) ++ postHocResolutionRules: _*), Batch("Normalize Alter Table", Once, ResolveAlterTableChanges), Batch("Remove Unresolved Hints", Once, @@ -847,6 +847,8 @@ class Analyzer(override val catalogManager: CatalogManager) def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case s @ ShowTables(UnresolvedNamespace(Seq()), _) => s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace)) + case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _) => + s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace)) case s @ ShowViews(UnresolvedNamespace(Seq()), _) => s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace)) case UnresolvedNamespace(Seq()) => @@ -887,6 +889,11 @@ class Analyzer(override val catalogManager: CatalogManager) u.failAnalysis(s"${ident.quoted} is a temp view. '$cmd' expects a table") } u + case u @ UnresolvedView(ident, _, _) => + lookupTempView(ident).map { _ => + ResolvedView(ident.asIdentifier, isTemp = true) + } + .getOrElse(u) case u @ UnresolvedTableOrView(ident, cmd, allowTempView) => lookupTempView(ident) .map { _ => @@ -1111,6 +1118,14 @@ class Analyzer(override val catalogManager: CatalogManager) case table => table }.getOrElse(u) + case u @ UnresolvedView(identifier, cmd, relationTypeMismatchHint) => + lookupTableOrView(identifier).map { + case v: ResolvedView => v + case _ => + u.failAnalysis(s"${identifier.quoted} is a table. '$cmd' expects a view." 
+ + relationTypeMismatchHint.map(" " + _).getOrElse("")) + }.getOrElse(u) + case u @ UnresolvedTableOrView(identifier, _, _) => lookupTableOrView(identifier).getOrElse(u) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9f5eefc744135..39cdea2bd4d2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils} -import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table} +import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table} import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -34,7 +34,7 @@ import org.apache.spark.sql.types._ /** * Throws user facing errors when passed invalid queries that fail to analyze. */ -trait CheckAnalysis extends PredicateHelper { +trait CheckAnalysis extends PredicateHelper with LookupCatalog { protected def isView(nameParts: Seq[String]): Boolean @@ -104,6 +104,15 @@ trait CheckAnalysis extends PredicateHelper { case u: UnresolvedTable => u.failAnalysis(s"Table not found for '${u.commandName}': ${u.multipartIdentifier.quoted}") + case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _) => + u.failAnalysis( + s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " + + "because view support in v2 catalog has not been implemented yet. 
" + + s"$cmd expects a view.") + + case u: UnresolvedView => + u.failAnalysis(s"View not found for '${u.commandName}': ${u.multipartIdentifier.quoted}") + case u: UnresolvedTableOrView => val viewStr = if (u.allowTempView) "view" else "permanent view" u.failAnalysis( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 6d89414ba106d..b4dfee1330036 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -187,11 +187,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) writeOptions = c.writeOptions, orCreate = c.orCreate) - case DropViewStatement(NonSessionCatalogAndTable(catalog, viewName), _) => - throw new AnalysisException( - s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " + - s"because view support in catalog has not been implemented yet") - case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _) if !isSessionCatalog(catalog) => CreateNamespace(catalog.asNamespaceCatalog, ns, c.ifNotExists, c.properties) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala similarity index 63% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala index f9da9174f85e6..196a07a7f9904 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala @@ -17,17 +17,19 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable} +import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand} import org.apache.spark.sql.catalyst.rules.Rule /** - * A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved. - * If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]], - * which is a no-op command. + * A rule for handling commands when the table or temp view is not resolved. + * These commands support a flag, "ifExists", so that they do not fail when a relation is not + * resolved. If the "ifExists" flag is set to true. 
the plan is resolved to [[NoopCommand]], */ -object ResolveNoopDropTable extends Rule[LogicalPlan] { +object ResolveCommandsWithIfExists extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists => - NoopDropTable(u.multipartIdentifier) + NoopCommand("DROP TABLE", u.multipartIdentifier) + case DropView(u: UnresolvedView, ifExists) if ifExists => + NoopCommand("DROP VIEW", u.multipartIdentifier) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 1518f064d78db..2737b5d58bf42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -45,6 +45,19 @@ case class UnresolvedTable( override def output: Seq[Attribute] = Nil } +/** + * Holds the name of a view that has yet to be looked up in a catalog. It will be resolved to + * [[ResolvedView]] during analysis. + */ +case class UnresolvedView( + multipartIdentifier: Seq[String], + commandName: String, + relationTypeMismatchHint: Option[String] = None) extends LeafNode { + override lazy val resolved: Boolean = false + + override def output: Seq[Attribute] = Nil +} + /** * Holds the name of a table or view that has yet to be looked up in a catalog. It will * be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 4cdaf10dd3c60..7666c4a53e5dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -543,27 +543,33 @@ object LikeSimplification extends Rule[LogicalPlan] { private val equalTo = "([^_%]*)".r def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { - case Like(input, Literal(pattern, StringType), escapeChar) => + case l @ Like(input, Literal(pattern, StringType), escapeChar) => if (pattern == null) { // If pattern is null, return null value directly, since "col like null" == null. Literal(null, BooleanType) } else { - val escapeStr = String.valueOf(escapeChar) pattern.toString match { - case startsWith(prefix) if !prefix.endsWith(escapeStr) => + // There are three different situations when pattern containing escapeChar: + // 1. pattern contains invalid escape sequence, e.g. 'm\aca' + // 2. pattern contains escaped wildcard character, e.g. 'ma\%ca' + // 3. pattern contains escaped escape character, e.g. 'ma\\ca' + // Although there are patterns can be optimized if we handle the escape first, we just + // skip this rule if pattern contains any escapeChar for simplicity. + case p if p.contains(escapeChar) => l + case startsWith(prefix) => StartsWith(input, Literal(prefix)) case endsWith(postfix) => EndsWith(input, Literal(postfix)) // 'a%a' pattern is basically same with 'a%' && '%a'. // However, the additional `Length` condition is required to prevent 'a' match 'a%a'. 
- case startsAndEndsWith(prefix, postfix) if !prefix.endsWith(escapeStr) => + case startsAndEndsWith(prefix, postfix) => And(GreaterThanOrEqual(Length(input), Literal(prefix.length + postfix.length)), And(StartsWith(input, Literal(prefix)), EndsWith(input, Literal(postfix)))) - case contains(infix) if !infix.endsWith(escapeStr) => + case contains(infix) => Contains(input, Literal(infix)) case equalTo(str) => EqualTo(input, Literal(str)) - case _ => Like(input, Literal.create(pattern, StringType), escapeChar) + case _ => l } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index afa9764939fa2..7787e199d3770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3155,11 +3155,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a [[DropViewStatement]] command. + * Create a [[DropView]] command. */ override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) { - DropViewStatement( - visitMultipartIdentifier(ctx.multipartIdentifier()), + DropView( + UnresolvedView( + visitMultipartIdentifier(ctx.multipartIdentifier()), + "DROP VIEW", + Some("Please use DROP TABLE instead.")), ctx.EXISTS != null) } @@ -3190,13 +3193,18 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a [[ShowTableStatement]] command. + * Create a [[ShowTableExtended]] command. */ - override def visitShowTable(ctx: ShowTableContext): LogicalPlan = withOrigin(ctx) { - ShowTableStatement( - Option(ctx.ns).map(visitMultipartIdentifier), + override def visitShowTableExtended( + ctx: ShowTableExtendedContext): LogicalPlan = withOrigin(ctx) { + val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier) + val partitionKeys = Option(ctx.partitionSpec).map { specCtx => + UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(specCtx), None) + } + ShowTableExtended( + UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])), string(ctx.pattern), - Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) + partitionKeys) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 95ef304db57cd..b731b8a2fd8fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -338,13 +338,6 @@ case class AlterViewAsStatement( originalText: String, query: LogicalPlan) extends ParsedStatement -/** - * A DROP VIEW statement, as parsed from SQL. - */ -case class DropViewStatement( - viewName: Seq[String], - ifExists: Boolean) extends ParsedStatement - /** * An INSERT INTO statement, as parsed from SQL. * @@ -377,15 +370,6 @@ case class InsertIntoStatement( override def children: Seq[LogicalPlan] = query :: Nil } -/** - * A SHOW TABLE EXTENDED statement, as parsed from SQL. - */ -case class ShowTableStatement( - namespace: Option[Seq[String]], - pattern: String, - partitionSpec: Option[TablePartitionSpec]) - extends ParsedStatement - /** * A CREATE NAMESPACE statement, as parsed from SQL. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 2085b4454c4e1..e014048f723f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType} +import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType} /** * Base trait for DataSourceV2 write commands @@ -419,9 +419,11 @@ case class DropTable( } /** - * The logical plan for handling non-existing table for DROP TABLE command. + * The logical plan for no-op command handling non-existing table. */ -case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command +case class NoopCommand( + commandName: String, + multipartIdentifier: Seq[String]) extends Command /** * The logical plan of the ALTER TABLE command. @@ -466,7 +468,7 @@ case class RenameTable( } /** - * The logical plan of the SHOW TABLE command. + * The logical plan of the SHOW TABLES command. */ case class ShowTables( namespace: LogicalPlan, @@ -478,6 +480,22 @@ case class ShowTables( AttributeReference("tableName", StringType, nullable = false)()) } +/** + * The logical plan of the SHOW TABLE EXTENDED command. + */ +case class ShowTableExtended( + namespace: LogicalPlan, + pattern: String, + partitionSpec: Option[PartitionSpec]) extends Command { + override def children: Seq[LogicalPlan] = namespace :: Nil + + override val output: Seq[Attribute] = Seq( + AttributeReference("namespace", StringType, nullable = false)(), + AttributeReference("tableName", StringType, nullable = false)(), + AttributeReference("isTemporary", BooleanType, nullable = false)(), + AttributeReference("information", StringType, nullable = false)()) +} + /** * The logical plan of the SHOW VIEWS command. * @@ -709,6 +727,15 @@ case class ShowPartitions( AttributeReference("partition", StringType, nullable = false)()) } +/** + * The logical plan of the DROP VIEW command. + */ +case class DropView( + child: LogicalPlan, + ifExists: Boolean) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} + /** * The logical plan of the MSCK REPAIR TABLE command. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bc62213bdb740..11fe6c7894f76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -404,7 +404,7 @@ object SQLConf { "middle of query execution, based on accurate runtime statistics.") .version("1.6.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply") .internal() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LikeSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LikeSimplificationSuite.scala index 436f62e4225c8..1812dce0da426 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LikeSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LikeSimplificationSuite.scala @@ -116,4 +116,52 @@ class LikeSimplificationSuite extends PlanTest { val optimized2 = Optimize.execute(originalQuery2.analyze) comparePlans(optimized2, originalQuery2.analyze) } + + test("SPARK-33677: LikeSimplification should be skipped if pattern contains any escapeChar") { + val originalQuery1 = + testRelation + .where(('a like "abc%") || ('a like "\\abc%")) + val optimized1 = Optimize.execute(originalQuery1.analyze) + val correctAnswer1 = testRelation + .where(StartsWith('a, "abc") || ('a like "\\abc%")) + .analyze + comparePlans(optimized1, correctAnswer1) + + val originalQuery2 = + testRelation + .where(('a like "%xyz") || ('a like "%xyz\\")) + val optimized2 = Optimize.execute(originalQuery2.analyze) + val correctAnswer2 = testRelation + .where(EndsWith('a, "xyz") || ('a like "%xyz\\")) + .analyze + comparePlans(optimized2, correctAnswer2) + + val originalQuery3 = + testRelation + .where(('a like ("@bc%def", '@')) || ('a like "abc%def")) + val optimized3 = Optimize.execute(originalQuery3.analyze) + val correctAnswer3 = testRelation + .where(('a like ("@bc%def", '@')) || + (Length('a) >= 6 && (StartsWith('a, "abc") && EndsWith('a, "def")))) + .analyze + comparePlans(optimized3, correctAnswer3) + + val originalQuery4 = + testRelation + .where(('a like "%mn%") || ('a like ("%mn%", '%'))) + val optimized4 = Optimize.execute(originalQuery4.analyze) + val correctAnswer4 = testRelation + .where(Contains('a, "mn") || ('a like ("%mn%", '%'))) + .analyze + comparePlans(optimized4, correctAnswer4) + + val originalQuery5 = + testRelation + .where(('a like "abc") || ('a like ("abbc", 'b'))) + val optimized5 = Optimize.execute(originalQuery5.analyze) + val correctAnswer5 = testRelation + .where(('a === "abc") || ('a like ("abbc", 'b'))) + .analyze + comparePlans(optimized5, correctAnswer5) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index bff49bb61fe2d..947154eae12c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, 
LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -721,13 +721,18 @@ class DDLParserSuite extends AnalysisTest { } test("drop view") { + val cmd = "DROP VIEW" + val hint = Some("Please use DROP TABLE instead.") parseCompare(s"DROP VIEW testcat.db.view", - DropViewStatement(Seq("testcat", "db", "view"), ifExists = false)) - parseCompare(s"DROP VIEW db.view", DropViewStatement(Seq("db", "view"), ifExists = false)) + DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, hint), ifExists = false)) + parseCompare(s"DROP VIEW db.view", + DropView(UnresolvedView(Seq("db", "view"), cmd, hint), ifExists = false)) parseCompare(s"DROP VIEW IF EXISTS db.view", - DropViewStatement(Seq("db", "view"), ifExists = true)) - parseCompare(s"DROP VIEW view", DropViewStatement(Seq("view"), ifExists = false)) - parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true)) + DropView(UnresolvedView(Seq("db", "view"), cmd, hint), ifExists = true)) + parseCompare(s"DROP VIEW view", + DropView(UnresolvedView(Seq("view"), cmd, hint), ifExists = false)) + parseCompare(s"DROP VIEW IF EXISTS view", + DropView(UnresolvedView(Seq("view"), cmd, hint), ifExists = true)) } private def testCreateOrReplaceDdl( diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js index 301183f749a84..d1def1b0a42ff 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js @@ -87,14 +87,14 @@ function preprocessGraphLayout(g) { var node = g.node(nodes[i]); node.padding = "5"; - var firstSearator; + var firstSeparator; var secondSeparator; var splitter; if (node.isCluster) { - firstSearator = secondSeparator = labelSeparator; + firstSeparator = secondSeparator = labelSeparator; splitter = "\\n"; } else { - firstSearator = ""; + firstSeparator = ""; secondSeparator = ""; splitter = "
"; } @@ -104,7 +104,7 @@ function preprocessGraphLayout(g) { if (newTexts) { node.label = node.label.replace( newTexts[0], - newTexts[1] + firstSearator + newTexts[2] + secondSeparator + newTexts[3]); + newTexts[1] + firstSeparator + newTexts[2] + secondSeparator + newTexts[3]); } }); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 05d6647afd958..6afbbce3ff8d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1363,7 +1363,7 @@ class Dataset[T] private[sql]( // Attach the dataset id and column position to the column reference, so that we can detect // ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`. // This must be called before we return a `Column` that contains `AttributeReference`. - // Note that, the metadata added here are only avaiable in the analyzer, as the analyzer rule + // Note that, the metadata added here are only available in the analyzer, as the analyzer rule // `DetectAmbiguousSelfJoin` will remove it. private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = { val newExpr = expr transform { @@ -1665,10 +1665,10 @@ class Dataset[T] private[sql]( * See [[RelationalGroupedDataset]] for all the available aggregate functions. * * {{{ - * // Compute the average for all numeric columns rolluped by department and group. + * // Compute the average for all numeric columns rolled up by department and group. * ds.rollup($"department", $"group").avg() * - * // Compute the max age and average salary, rolluped by department and gender. + * // Compute the max age and average salary, rolled up by department and gender. * ds.rollup($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" @@ -1794,10 +1794,10 @@ class Dataset[T] private[sql]( * (i.e. cannot construct expressions). * * {{{ - * // Compute the average for all numeric columns rolluped by department and group. + * // Compute the average for all numeric columns rolled up by department and group. * ds.rollup("department", "group").avg() * - * // Compute the max age and average salary, rolluped by department and gender. + * // Compute the max age and average salary, rolled up by department and gender. * ds.rollup($"department", $"gender").agg(Map( * "salary" -> "avg", * "age" -> "max" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index e557d18bad8b1..817a63aa9aa6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -352,9 +352,8 @@ class ResolveSessionCatalog( } DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge) - // v1 DROP TABLE supports temp view. 
- case DropViewStatement(TempViewOrV1Table(name), ifExists) => - DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false) + case DropView(r: ResolvedView, ifExists) => + DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false) case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _) if isSessionCatalog(catalog) => @@ -383,14 +382,20 @@ class ResolveSessionCatalog( } ShowTablesCommand(Some(ns.head), pattern) - case ShowTableStatement(ns, pattern, partitionsSpec) => - val db = ns match { - case Some(ns) if ns.length != 1 => - throw new AnalysisException( - s"The database name is not valid: ${ns.quoted}") - case _ => ns.map(_.head) + case ShowTableExtended( + SessionCatalogAndNamespace(_, ns), + pattern, + partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _)))) => + assert(ns.nonEmpty) + if (ns.length != 1) { + throw new AnalysisException( + s"The database name is not valid: ${ns.quoted}") } - ShowTablesCommand(db, Some(pattern), true, partitionsSpec) + ShowTablesCommand( + databaseName = Some(ns.head), + tableIdentifierPattern = Some(pattern), + isExtended = true, + partitionSpec.map(_.asInstanceOf[UnresolvedPartitionSpec].spec)) // ANALYZE TABLE works on permanent views if the views are cached. case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 44636beeec7fc..df3b9f2a4e9cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -284,7 +284,7 @@ case class FileSourceScanExec( // // Sort ordering would be over the prefix subset of `sort columns` being read // from the table. - // eg. + // e.g. // Assume (col0, col2, col3) are the columns read from the table // If sort columns are (col0, col1), then sort ordering would be considered as (col0) // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2 @@ -379,12 +379,12 @@ case class FileSourceScanExec( case (key, _) if (key.equals("Location")) => val location = relation.location val numPaths = location.rootPaths.length - val abbreviatedLoaction = if (numPaths <= 1) { + val abbreviatedLocation = if (numPaths <= 1) { location.rootPaths.mkString("[", ", ", "]") } else { "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]" } - s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLoaction)}" + s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}" case (key, value) => s"$key: ${redact(value)}" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index b54bd6a579b66..20e6fb6f96eaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -28,14 +28,14 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS object ExplainUtils extends AdaptiveSparkPlanHelper { /** * Given a input physical plan, performs the following tasks. - * 1. Computes the operator id for current operator and records it in the operaror + * 1. Computes the operator id for current operator and records it in the operator * by setting a tag. * 2. 
Computes the whole stage codegen id for current operator and records it in the * operator by setting a tag. * 3. Generate the two part explain output for this plan. * 1. First part explains the operator tree with each operator tagged with an unique * identifier. - * 2. Second part explans each operator in a verbose manner. + * 2. Second part explains each operator in a verbose manner. * * Note : This function skips over subqueries. They are handled by its caller. * @@ -117,7 +117,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper { } /** - * Traverses the supplied input plan in a bottem-up fashion does the following : + * Traverses the supplied input plan in a bottom-up fashion does the following : * 1. produces a map : operator identifier -> operator * 2. Records the operator id via setting a tag in the operator. * Note : @@ -210,7 +210,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper { /** * Given a input plan, returns an array of tuples comprising of : - * 1. Hosting opeator id. + * 1. Hosting operator id. * 2. Hosting expression * 3. Subquery plan */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala index 993627847c08c..c5e5de588ba9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala @@ -87,7 +87,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray( def isEmpty: Boolean = numRows == 0 /** - * Clears up resources (eg. memory) held by the backing storage + * Clears up resources (e.g. memory) held by the backing storage */ def clear(): Unit = { if (spillableArray != null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c82e3818b48cc..7a31b0dcdd43d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -386,25 +386,25 @@ class SparkSqlAstBuilder extends AstBuilder { * - '/path/to/fileOrJar' */ override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) { - val mayebePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim + val maybePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim ctx.op.getType match { case SqlBaseParser.ADD => ctx.identifier.getText.toLowerCase(Locale.ROOT) match { - case "file" => AddFileCommand(mayebePaths) - case "jar" => AddJarCommand(mayebePaths) + case "file" => AddFileCommand(maybePaths) + case "jar" => AddJarCommand(maybePaths) case other => operationNotAllowed(s"ADD with resource type '$other'", ctx) } case SqlBaseParser.LIST => ctx.identifier.getText.toLowerCase(Locale.ROOT) match { case "files" | "file" => - if (mayebePaths.length > 0) { - ListFilesCommand(mayebePaths.split("\\s+")) + if (maybePaths.length > 0) { + ListFilesCommand(maybePaths.split("\\s+")) } else { ListFilesCommand() } case "jars" | "jar" => - if (mayebePaths.length > 0) { - ListJarsCommand(mayebePaths.split("\\s+")) + if (maybePaths.length > 0) { + ListJarsCommand(maybePaths.split("\\s+")) } else { ListJarsCommand() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index b2963457e22db..c6ea99cfdad7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -670,7 +670,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) } ${ctx.registerComment( - s"""Codegend pipeline for stage (id=$codegenStageId) + s"""Codegened pipeline for stage (id=$codegenStageId) |${this.treeString.trim}""".stripMargin, "wsc_codegenPipeline")} ${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala index 6ba375910a4eb..eecfa40e8d0bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala @@ -115,7 +115,7 @@ trait AdaptiveSparkPlanHelper { /** * Returns a sequence containing the subqueries in this plan, also including the (nested) - * subquries in its children + * subqueries in its children */ def subqueriesAll(p: SparkPlan): Seq[SparkPlan] = { val subqueries = flatMap(p)(_.subqueries) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 08d31fdda2dc8..d065bc0dab4cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources._ * @param storage storage format used to describe how the query result is stored. * @param provider the data source type to be used * @param query the logical plan representing data to write to - * @param overwrite whthere overwrites existing directory + * @param overwrite whether overwrites existing directory */ case class InsertIntoDataSourceDirCommand( storage: CatalogStorageFormat, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 69425cfed285f..6d631e044e917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -89,8 +89,8 @@ case class CreateDatabaseCommand( * A command for users to remove a database from the system. * * 'ifExists': - * - true, if database_name does't exist, no action - * - false (default), if database_name does't exist, a warning message will be issued + * - true, if database_name doesn't exist, no action + * - false (default), if database_name doesn't exist, a warning message will be issued * 'cascade': * - true, the dependent objects are automatically dropped before dropping database. * - false (default), it is in the Restrict mode. 
The database cannot be dropped if diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 640051384e94c..431a103063c68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -352,7 +352,7 @@ case class LoadDataCommand( // entire string will be considered while making a Path instance,this is mainly done // by considering the wild card scenario in mind.as per old logic query param is // been considered while creating URI instance and if path contains wild card char '?' - // the remaining charecters after '?' will be removed while forming URI instance + // the remaining characters after '?' will be removed while forming URI instance LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 34ded5d456d09..4783789b91f3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -211,7 +211,7 @@ case class DataSource( s"Unable to infer schema for $format. It must be specified manually.") } - // We just print a waring message if the data schema and partition schema have the duplicate + // We just print a warning message if the data schema and partition schema have the duplicate // columns. This is because we allow users to do so in the previous Spark releases and // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`). // See SPARK-18108 and SPARK-21144 for related discussions. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index edb49d3f90ca3..6de9b1d7cea4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -167,7 +167,7 @@ class DynamicPartitionDataWriter( private var fileCounter: Int = _ private var recordsInFile: Long = _ - private var currentPartionValues: Option[UnsafeRow] = None + private var currentPartitionValues: Option[UnsafeRow] = None private var currentBucketId: Option[Int] = None /** Extracts the partition values out of an input row. */ @@ -247,11 +247,11 @@ class DynamicPartitionDataWriter( val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None - if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) { + if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) { // See a new partition or bucket - write to a new partition dir (or a new bucket file). 
- if (isPartitioned && currentPartionValues != nextPartitionValues) { - currentPartionValues = Some(nextPartitionValues.get.copy()) - statsTrackers.foreach(_.newPartition(currentPartionValues.get)) + if (isPartitioned && currentPartitionValues != nextPartitionValues) { + currentPartitionValues = Some(nextPartitionValues.get.copy()) + statsTrackers.foreach(_.newPartition(currentPartitionValues.get)) } if (isBucketed) { currentBucketId = nextBucketId @@ -259,7 +259,7 @@ class DynamicPartitionDataWriter( } fileCounter = 0 - newOutputWriter(currentPartionValues, currentBucketId) + newOutputWriter(currentPartitionValues, currentBucketId) } else if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) { // Exceeded the threshold in terms of the number of records per file. @@ -268,7 +268,7 @@ class DynamicPartitionDataWriter( assert(fileCounter < MAX_FILE_COUNTER, s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - newOutputWriter(currentPartionValues, currentBucketId) + newOutputWriter(currentPartitionValues, currentBucketId) } val outputRow = getOutputRow(record) currentWriter.write(outputRow) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index a71aeb47872ce..48ebd6f0c610f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -164,7 +164,7 @@ object FileFormatWriter extends Logging { SQLExecution.checkSQLExecutionId(sparkSession) - // propagate the decription UUID into the jobs, so that committers + // propagate the description UUID into the jobs, so that committers // get an ID guaranteed to be unique. job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index ea437d200eaab..69123ee7af5b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -453,7 +453,7 @@ object PartitioningUtils { val decimalTry = Try { // `BigDecimal` conversion can fail when the `field` is not a form of number. val bigDecimal = new JBigDecimal(raw) - // It reduces the cases for decimals by disallowing values having scale (eg. `1.1`). + // It reduces the cases for decimals by disallowing values having scale (e.g. `1.1`). require(bigDecimal.scale <= 0) // `DecimalType` conversion can fail when // 1. The precision is bigger than 38. 
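The DynamicPartitionDataWriter hunk above (the renamed currentPartitionValues field) illustrates the underlying pattern: track the current partition values and bucket id, and roll over to a new output writer whenever either changes or a per-file record limit is reached. The following standalone sketch shows the same control flow under simplified assumptions; PartitionKey, BucketId, Record and Writer are schematic stand-ins, not Spark's actual internal writer API.

// Simplified, self-contained sketch of the "roll writer on partition/bucket change" pattern.
// All types here are illustrative stand-ins, not Spark internals.
object DynamicPartitionWriteSketch {
  type PartitionKey = String
  type BucketId = Int

  final case class Record(partition: PartitionKey, bucket: BucketId, payload: String)

  // A trivial "writer"; in Spark this would be an OutputWriter bound to a concrete file path.
  final class Writer(val partition: PartitionKey, val bucket: BucketId) {
    private var rows = 0L
    def write(r: Record): Unit = { rows += 1 /* append r.payload to the current file */ }
    def recordsInFile: Long = rows
    def close(): Unit = ()
  }

  def writeAll(records: Iterator[Record], maxRecordsPerFile: Long): Unit = {
    var currentPartition: Option[PartitionKey] = None
    var currentBucket: Option[BucketId] = None
    var currentWriter: Writer = null

    def newWriter(p: PartitionKey, b: BucketId): Unit = {
      if (currentWriter != null) currentWriter.close()
      currentWriter = new Writer(p, b) // in Spark: a new file under the partition directory
    }

    for (r <- records) {
      val nextPartition = Some(r.partition)
      val nextBucket = Some(r.bucket)
      if (currentPartition != nextPartition || currentBucket != nextBucket) {
        // New partition or bucket: start a new file in the new partition dir / bucket file.
        currentPartition = nextPartition
        currentBucket = nextBucket
        newWriter(r.partition, r.bucket)
      } else if (maxRecordsPerFile > 0 && currentWriter.recordsInFile >= maxRecordsPerFile) {
        // Same partition and bucket, but the per-file record limit was hit: roll the file.
        newWriter(r.partition, r.bucket)
      }
      currentWriter.write(r)
    }
    if (currentWriter != null) currentWriter.close()
  }
}

As in the patched code, correctness of this pattern relies on the input being clustered by partition and bucket before writing; otherwise every change of key would open a fresh file.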
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 15f009372c173..37a4dcf081be4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -251,7 +251,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DropTable(r: ResolvedTable, ifExists, purge) => DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateCache(r)) :: Nil - case _: NoopDropTable => + case _: NoopCommand => LocalTableScanExec(Nil, Nil) :: Nil case AlterTable(catalog, ident, _, changes) => @@ -295,6 +295,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case r @ ShowTables(ResolvedNamespace(catalog, ns), pattern) => ShowTablesExec(r.output, catalog.asTableCatalog, ns, pattern) :: Nil + case _: ShowTableExtended => + throw new AnalysisException("SHOW TABLE EXTENDED is not supported for v2 tables.") + case SetCatalogAndNamespace(catalogManager, catalogName, ns) => SetCatalogAndNamespaceExec(catalogManager, catalogName, ns) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 47aad2bcb2c56..f5f77d38b8716 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -168,7 +168,7 @@ case class ReplaceTableAsSelectExec( * A new table will be created using the schema of the query, and rows from the query are appended. * If the table exists, its contents and schema should be replaced with the schema and the contents * of the query. This implementation is atomic. The table replacement is staged, and the commit - * operation at the end should perform tne replacement of the table's metadata and contents. If the + * operation at the end should perform the replacement of the table's metadata and contents. If the * write fails, the table is instructed to roll back staged changes and any previously written table * is left untouched. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 3c5ed40551206..a91cc0782e1f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -426,9 +426,9 @@ private[joins] class UnsafeHashedRelation( readBuffer(valuesBuffer, 0, valuesSize) val loc = binaryMap.lookup(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize) - val putSuceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize, + val putSucceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize, valuesBuffer, Platform.BYTE_ARRAY_OFFSET, valuesSize) - if (!putSuceeded) { + if (!putSucceeded) { binaryMap.free() throw new IOException("Could not allocate memory to grow BytesToBytesMap") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index dab2723d25726..b79bcd176b7b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -102,7 +102,7 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] { case p: PythonUDF => // This is just a sanity check, the rule PullOutNondeterministic should // already pull out those nondeterministic expressions. - assert(p.udfDeterministic, "Non-determinstic PythonUDFs should not appear " + + assert(p.udfDeterministic, "Non-deterministic PythonUDFs should not appear " + "in grouping expression") val canonicalized = p.canonicalized.asInstanceOf[PythonUDF] if (attributeMap.contains(canonicalized)) { @@ -174,7 +174,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { } private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = { - // If fisrt UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF, + // If first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF, // otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only // extract UDFs of the same eval type) @@ -268,7 +268,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { case PythonEvalType.SQL_SCALAR_PANDAS_UDF | PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF => ArrowEvalPython(validUdfs, resultAttrs, child, evalType) case _ => - throw new AnalysisException("Unexcepted UDF evalType") + throw new AnalysisException("Unexpected UDF evalType") } attributeMap ++= validUdfs.map(canonicalizeDeterministic).zip(resultAttrs) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 3c76306f20cd7..835c7c4d5261f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -288,7 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( /** * Delete expired log entries that proceed the currentBatchId and retain - * sufficient minimum number of batches (given by minBatchsToRetain). 
This + * sufficient minimum number of batches (given by minBatchesToRetain). This * equates to retaining the earliest compaction log that proceeds * batch id position currentBatchId + 1 - minBatchesToRetain. All log entries * prior to the earliest compaction log proceeding that position will be removed. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d6be33c76e937..6b0d33b819a20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -685,6 +685,6 @@ object StreamExecution { /** * A special thread to run the stream query. Some codes require to run in the QueryExecutionThread - * and will use `classOf[QueryxecutionThread]` to check. + * and will use `classOf[QueryExecutionThread]` to check. */ abstract class QueryExecutionThread(name: String) extends UninterruptibleThread(name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala index 0a16a3819b778..cc785ee4247c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala @@ -77,7 +77,7 @@ object FlatMapGroupsWithStateExecHelper { // =========================== Private implementations of StateManager =========================== // =============================================================================================== - /** Commmon methods for StateManager implementations */ + /** Common methods for StateManager implementations */ private abstract class StateManagerImplBase(shouldStoreTimestamp: Boolean) extends StateManager { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index 64b7e7fe7923a..cfcfeabbf1f6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -65,7 +65,7 @@ object HiveSerDe { outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) - // `HiveSerDe` in `serdeMap` should be dintinct. + // `HiveSerDe` in `serdeMap` should be distinct. 
   val serdeInverseMap: Map[HiveSerDe, String] = serdeMap.flatMap {
     case ("sequencefile", _) => None
     case ("rcfile", _) => None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 01e626e5436a4..9e8dff37bcfd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -387,8 +387,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       }
       val sink = new MemorySink()
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
-      val recoverFromChkpoint = outputMode == OutputMode.Complete()
-      val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromChkpoint)
+      val recoverFromCheckpoint = outputMode == OutputMode.Complete()
+      val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromCheckpoint)
       resultDf.createOrReplaceTempView(query.name)
       query
     } else if (source == SOURCE_NAME_FOREACH) {
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
index dd3755d3f904e..de88f80eb53b8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
@@ -34,43 +34,43 @@ public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase
   @Test
   public void testTypedAggregationAverage() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(
+    Dataset<Tuple2<String, Double>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.avg(v -> (double)(v._2() * 2)));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationCount() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(
+    Dataset<Tuple2<String, Long>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.count(v -> v));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationSumDouble() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(
+    Dataset<Tuple2<String, Double>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.sum(v -> (double)v._2()));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationSumLong() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(
+    Dataset<Tuple2<String, Long>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.sumLong(v -> (long)v._2()));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
index 8a90624f2070b..979b7751fa9a8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
@@ -38,18 +38,18 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
   public void testTypedAggregationAnonClass() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();

-    Dataset<Tuple2<String, Integer>> agged = grouped.agg(new IntSumOf().toColumn());
+    Dataset<Tuple2<String, Integer>> aggregated = grouped.agg(new IntSumOf().toColumn());
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3), new Tuple2<>("b", 3)),
-        agged.collectAsList());
+        aggregated.collectAsList());

-    Dataset<Tuple2<String, Integer>> agged2 = grouped.agg(new IntSumOf().toColumn())
+    Dataset<Tuple2<String, Integer>> aggregated2 = grouped.agg(new IntSumOf().toColumn())
       .as(Encoders.tuple(Encoders.STRING(), Encoders.INT()));
     Assert.assertEquals(
         Arrays.asList(
             new Tuple2<>("a", 3),
             new Tuple2<>("b", 3)),
-        agged2.collectAsList());
+        aggregated2.collectAsList());
   }

   static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
@@ -88,43 +88,43 @@ public Encoder<Integer> outputEncoder() {
   @Test
   public void testTypedAggregationAverage() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(
+    Dataset<Tuple2<String, Double>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.avg(value -> value._2() * 2.0));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationCount() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(
+    Dataset<Tuple2<String, Long>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.count(value -> value));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationSumDouble() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(
+    Dataset<Tuple2<String, Double>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.sum(value -> (double) value._2()));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }

   @SuppressWarnings("deprecation")
   @Test
   public void testTypedAggregationSumLong() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(
+    Dataset<Tuple2<String, Long>> aggregated = grouped.agg(
       org.apache.spark.sql.expressions.javalang.typed.sumLong(value -> (long) value._2()));
     Assert.assertEquals(
         Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)),
-        agged.collectAsList());
+        aggregated.collectAsList());
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/ansi/decimalArithmeticOperations.sql b/sql/core/src/test/resources/sql-tests/inputs/ansi/decimalArithmeticOperations.sql
index d190f38345d6b..d843847e6a149 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/ansi/decimalArithmeticOperations.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/ansi/decimalArithmeticOperations.sql
@@ -1,6 +1,6 @@
 -- SPARK-23179: SQL ANSI 2011 states that in case of overflow during arithmetic operations,
 -- an exception should be thrown instead of returning NULL.
--- This is what most of the SQL DBs do (eg. SQLServer, DB2).
+-- This is what most of the SQL DBs do (e.g. SQLServer, DB2).
-- tests for decimals handling in operations create table decimals_test(id int, a decimal(38,18), b decimal(38,18)) using parquet; diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/create_view.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/create_view.sql index 21ffd85f7d01f..2889941c1fcc1 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/create_view.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/create_view.sql @@ -636,7 +636,7 @@ DESC TABLE vv6; -- Check cases involving dropped/altered columns in a function's rowtype result -- --- Skip the tests below because Spark does't support PostgreSQL-specific UDFs/transactions +-- Skip the tests below because Spark doesn't support PostgreSQL-specific UDFs/transactions -- create table tt14t (f1 text, f2 text, f3 text, f4 text); -- insert into tt14t values('foo', 'bar', 'baz', '42'); -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index d0150616cd67e..3765093f83bc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -835,7 +835,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } } - test("SPARK-19993 nested subquery caching and scalar + predicate subqueris") { + test("SPARK-19993 nested subquery caching and scalar + predicate subqueries") { withTempView("t1", "t2", "t3", "t4") { Seq(1).toDF("c1").createOrReplaceTempView("t1") Seq(2).toDF("c1").createOrReplaceTempView("t2") @@ -886,17 +886,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } private def checkIfNoJobTriggered[T](f: => T): T = { - var numJobTrigered = 0 + var numJobTriggered = 0 val jobListener = new SparkListener { override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - numJobTrigered += 1 + numJobTriggered += 1 } } sparkContext.addSparkListener(jobListener) try { val result = f sparkContext.listenerBus.waitUntilEmpty() - assert(numJobTrigered === 0) + assert(numJobTriggered === 0) result } finally { sparkContext.removeSparkListener(jobListener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index a45bf12e8f841..4fecd625031ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -805,7 +805,7 @@ class DataFrameSuite extends QueryTest assert(df2.drop("`a.b`").columns.size == 2) } - test("drop(name: String) search and drop all top level columns that matchs the name") { + test("drop(name: String) search and drop all top level columns that matches the name") { val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((3, 4)).toDF("a", "b") checkAnswer(df1.crossJoin(df2), Row(1, 2, 3, 4)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index 5c144dad23c30..009ccb9a45354 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -102,18 +102,19 @@ class DatasetCacheSuite extends QueryTest test("persist and then groupBy columns asKey, map") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupByKey(_._1) - val agged = grouped.mapGroups { 
(g, iter) => (g, iter.map(_._2).sum) } - agged.persist() + val aggregated = grouped.mapGroups { (g, iter) => (g, iter.map(_._2).sum) } + aggregated.persist() checkDataset( - agged.filter(_._1 == "b"), + aggregated.filter(_._1 == "b"), ("b", 3)) - assertCached(agged.filter(_._1 == "b")) + assertCached(aggregated.filter(_._1 == "b")) ds.unpersist(blocking = true) assert(ds.storageLevel == StorageLevel.NONE, "The Dataset ds should not be cached.") - agged.unpersist(blocking = true) - assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.") + aggregated.unpersist(blocking = true) + assert(aggregated.storageLevel == StorageLevel.NONE, + "The Dataset aggregated should not be cached.") } test("persist and then withColumn") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index ac51634febc99..8547d96e0f457 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -170,23 +170,23 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSparkSession { test("groupBy function, map") { val ds = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).toDS() val grouped = ds.groupByKey(_ % 2) - val agged = grouped.mapGroups { (g, iter) => + val aggregated = grouped.mapGroups { (g, iter) => val name = if (g == 0) "even" else "odd" (name, iter.size) } checkDatasetUnorderly( - agged, + aggregated, ("even", 5), ("odd", 6)) } test("groupBy function, flatMap") { val ds = Seq("a", "b", "c", "xyz", "hello").toDS() val grouped = ds.groupByKey(_.length) - val agged = grouped.flatMapGroups { (g, iter) => Iterator(g.toString, iter.mkString) } + val aggregated = grouped.flatMapGroups { (g, iter) => Iterator(g.toString, iter.mkString) } checkDatasetUnorderly( - agged, + aggregated, "1", "abc", "3", "xyz", "5", "hello") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 953a58760cd5c..67e3ad6a80642 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -528,42 +528,42 @@ class DatasetSuite extends QueryTest test("groupBy function, map") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupByKey(v => (v._1, "word")) - val agged = grouped.mapGroups { (g, iter) => (g._1, iter.map(_._2).sum) } + val aggregated = grouped.mapGroups { (g, iter) => (g._1, iter.map(_._2).sum) } checkDatasetUnorderly( - agged, + aggregated, ("a", 30), ("b", 3), ("c", 1)) } test("groupBy function, flatMap") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupByKey(v => (v._1, "word")) - val agged = grouped.flatMapGroups { (g, iter) => + val aggregated = grouped.flatMapGroups { (g, iter) => Iterator(g._1, iter.map(_._2).sum.toString) } checkDatasetUnorderly( - agged, + aggregated, "a", "30", "b", "3", "c", "1") } test("groupBy function, mapValues, flatMap") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val keyValue = ds.groupByKey(_._1).mapValues(_._2) - val agged = keyValue.mapGroups { (g, iter) => (g, iter.sum) } - checkDataset(agged, ("a", 30), ("b", 3), ("c", 1)) + val aggregated = keyValue.mapGroups { (g, iter) => (g, iter.sum) } + checkDataset(aggregated, ("a", 30), ("b", 3), ("c", 1)) val 
keyValue1 = ds.groupByKey(t => (t._1, "key")).mapValues(t => (t._2, "value")) - val agged1 = keyValue1.mapGroups { (g, iter) => (g._1, iter.map(_._1).sum) } - checkDataset(agged1, ("a", 30), ("b", 3), ("c", 1)) + val aggregated1 = keyValue1.mapGroups { (g, iter) => (g._1, iter.map(_._1).sum) } + checkDataset(aggregated1, ("a", 30), ("b", 3), ("c", 1)) } test("groupBy function, reduce") { val ds = Seq("abc", "xyz", "hello").toDS() - val agged = ds.groupByKey(_.length).reduceGroups(_ + _) + val aggregated = ds.groupByKey(_.length).reduceGroups(_ + _) checkDatasetUnorderly( - agged, + aggregated, 3 -> "abcxyz", 5 -> "hello") } @@ -914,11 +914,11 @@ class DatasetSuite extends QueryTest test("grouping key and grouped value has field with same name") { val ds = Seq(ClassData("a", 1), ClassData("a", 2)).toDS() - val agged = ds.groupByKey(d => ClassNullableData(d.a, null)).mapGroups { + val aggregated = ds.groupByKey(d => ClassNullableData(d.a, null)).mapGroups { (key, values) => key.a + values.map(_.b).sum } - checkDataset(agged, "a3") + checkDataset(aggregated, "a3") } test("cogroup's left and right side has field with same name") { @@ -1286,7 +1286,7 @@ class DatasetSuite extends QueryTest Route("b", "c", 6)) val ds = sparkContext.parallelize(data).toDF.as[Route] - val grped = ds.map(r => GroupedRoutes(r.src, r.dest, Seq(r))) + val grouped = ds.map(r => GroupedRoutes(r.src, r.dest, Seq(r))) .groupByKey(r => (r.src, r.dest)) .reduceGroups { (g1: GroupedRoutes, g2: GroupedRoutes) => GroupedRoutes(g1.src, g1.dest, g1.routes ++ g2.routes) @@ -1303,7 +1303,7 @@ class DatasetSuite extends QueryTest implicit def ordering[GroupedRoutes]: Ordering[GroupedRoutes] = (x: GroupedRoutes, y: GroupedRoutes) => x.toString.compareTo(y.toString) - checkDatasetUnorderly(grped, expected: _*) + checkDatasetUnorderly(grouped, expected: _*) } test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") { @@ -1383,7 +1383,7 @@ class DatasetSuite extends QueryTest } } } else { - // Local checkpoints dont require checkpoint_dir + // Local checkpoints don't require checkpoint_dir f } } @@ -1474,7 +1474,7 @@ class DatasetSuite extends QueryTest } test("SPARK-18717: code generation works for both scala.collection.Map" + - " and scala.collection.imutable.Map") { + " and scala.collection.immutable.Map") { val ds = Seq(WithImmutableMap("hi", Map(42L -> "foo"))).toDS checkDataset(ds.map(t => t), WithImmutableMap("hi", Map(42L -> "foo"))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 9caa4c0377009..d7bbf597ff983 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -454,7 +454,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { assert(e.getCause.isInstanceOf[IllegalArgumentException]) assert(e.getMessage.contains("You may get a different result due to the upgrading of Spark")) - // february + // February val x1 = "2016-02-29" val x2 = "2017-02-29" val df1 = Seq(x1, x2).toDF("x") @@ -629,7 +629,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { e.getMessage.contains("You may get a different result due to the upgrading of Spark")) } - // february + // February val y1 = "2016-02-29" val y2 = "2017-02-29" val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") @@ -680,7 +680,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { 
checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) - // february + // February val y1 = "2016-02-29" val y2 = "2017-02-29" val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 727482e551a8b..ebfe8bdd7a749 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1316,7 +1316,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark ) } - test("oder by asc by default when not specify ascending and descending") { + test("order by asc by default when not specify ascending and descending") { checkAnswer( sql("SELECT a, b FROM testData2 ORDER BY a desc, b"), Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2)) @@ -2812,7 +2812,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } - test("SRARK-22266: the same aggregate function was calculated multiple times") { + test("SPARK-22266: the same aggregate function was calculated multiple times") { val query = "SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a" val df = sql(query) val physical = df.queryExecution.sparkPlan @@ -3092,7 +3092,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark assert(scan.isInstanceOf[ParquetScan]) assert(scan.asInstanceOf[ParquetScan].pushedFilters === filters) case _ => - fail(s"unknow format $format") + fail(s"unknown format $format") } } @@ -3718,6 +3718,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-33677: LikeSimplification should be skipped if pattern contains any escapeChar") { + withTempView("df") { + Seq("m@ca").toDF("s").createOrReplaceTempView("df") + + val e = intercept[AnalysisException] { + sql("SELECT s LIKE 'm%@ca' ESCAPE '%' FROM df").collect() + } + assert(e.message.contains("the pattern 'm%@ca' is invalid, " + + "the escape character is not allowed to precede '@'")) + + checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true)) + } + } } case class Foo(bar: Option[String]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 36e55c0994f18..02c6fba9725d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -278,18 +278,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper val allCode = importedCode ++ code val tempQueries = if (allCode.exists(_.trim.startsWith("--QUERY-DELIMITER"))) { // Although the loop is heavy, only used for bracketed comments test. 
- val querys = new ArrayBuffer[String] + val queries = new ArrayBuffer[String] val otherCodes = new ArrayBuffer[String] var tempStr = "" var start = false for (c <- allCode) { if (c.trim.startsWith("--QUERY-DELIMITER-START")) { start = true - querys ++= splitWithSemicolon(otherCodes.toSeq) + queries ++= splitWithSemicolon(otherCodes.toSeq) otherCodes.clear() } else if (c.trim.startsWith("--QUERY-DELIMITER-END")) { start = false - querys += s"\n${tempStr.stripSuffix(";")}" + queries += s"\n${tempStr.stripSuffix(";")}" tempStr = "" } else if (start) { tempStr += s"\n$c" @@ -298,9 +298,9 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper } } if (otherCodes.nonEmpty) { - querys ++= splitWithSemicolon(otherCodes.toSeq) + queries ++= splitWithSemicolon(otherCodes.toSeq) } - querys.toSeq + queries.toSeq } else { splitWithSemicolon(allCode).toSeq } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 576ad26505d27..5e1c6ba92803d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -166,13 +166,13 @@ class SparkSessionExtensionSuite extends SparkFunSuite { // inject rule that will run during AQE query stage optimization and will verify that the // custom tags were written in the preparation phase extensions.injectColumnar(session => - MyColumarRule(MyNewQueryStageRule(), MyNewQueryStageRule())) + MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())) } withSession(extensions) { session => session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true) assert(session.sessionState.queryStagePrepRules.contains(MyQueryStagePrepRule())) assert(session.sessionState.columnarRules.contains( - MyColumarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))) + MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))) import session.sqlContext.implicits._ val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1) val df = data.selectExpr("vals + 1") @@ -205,12 +205,12 @@ class SparkSessionExtensionSuite extends SparkFunSuite { val extensions = create { extensions => extensions.injectColumnar(session => - MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) + MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } withSession(extensions) { session => session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE) assert(session.sessionState.columnarRules.contains( - MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) + MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) import session.sqlContext.implicits._ // perform a join to inject a broadcast exchange val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2") @@ -244,12 +244,12 @@ class SparkSessionExtensionSuite extends SparkFunSuite { .config(COLUMN_BATCH_SIZE.key, 2) .withExtensions { extensions => extensions.injectColumnar(session => - MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } + MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } .getOrCreate() try { assert(session.sessionState.columnarRules.contains( - MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) + MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) import session.sqlContext.implicits._ val input = Seq((100L), (200L), (300L)) @@ 
-277,7 +277,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite { assert(session.sessionState.functionRegistry .lookupFunction(MyExtensions.myFunction._1).isDefined) assert(session.sessionState.columnarRules.contains( - MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) + MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) } finally { stop(session) } @@ -824,7 +824,7 @@ case class MyPostRule() extends Rule[SparkPlan] { } } -case class MyColumarRule(pre: Rule[SparkPlan], post: Rule[SparkPlan]) extends ColumnarRule { +case class MyColumnarRule(pre: Rule[SparkPlan], post: Rule[SparkPlan]) extends ColumnarRule { override def preColumnarTransitions: Rule[SparkPlan] = pre override def postColumnarTransitions: Rule[SparkPlan] = post } @@ -838,7 +838,7 @@ class MyExtensions extends (SparkSessionExtensions => Unit) { e.injectOptimizerRule(MyRule) e.injectParser(MyParser) e.injectFunction(MyExtensions.myFunction) - e.injectColumnar(session => MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) + e.injectColumnar(session => MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSTableStats.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSTableStats.scala index f39b4b8b56c2e..ee9cf7b67225f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSTableStats.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSTableStats.scala @@ -376,7 +376,7 @@ object TPCDSTableStats { "s_closed_date_sk" -> CatalogColumnStat(Some(70L), Some("2450823"), Some("2451313"), Some(296), Some(4), Some(4), None, CatalogColumnStat.VERSION), "s_store_id" -> CatalogColumnStat(Some(210L), None, None, Some(0), Some(16), Some(16), None, CatalogColumnStat.VERSION), "s_geography_class" -> CatalogColumnStat(Some(1L), None, None, Some(3), Some(7), Some(7), None, CatalogColumnStat.VERSION), - "s_tax_precentage" -> CatalogColumnStat(Some(12L), Some("0.00"), Some("0.11"), Some(5), Some(8), Some(8), None, CatalogColumnStat.VERSION) + "s_tax_percentage" -> CatalogColumnStat(Some(12L), Some("0.00"), Some("0.11"), Some(5), Some(8), Some(8), None, CatalogColumnStat.VERSION) )), "store_returns" -> CatalogStatistics(4837573440L, Some(28795080L), Map( "sr_item_sk" -> CatalogColumnStat(Some(197284L), Some("1"), Some("204000"), Some(0), Some(8), Some(8), None, CatalogColumnStat.VERSION), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c5782796b6ab8..9020065449cef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -285,7 +285,7 @@ class DataSourceV2SQLSuite } } - test("CreateTable/RepalceTable: invalid schema if has interval type") { + test("CreateTable/ReplaceTable: invalid schema if has interval type") { Seq("CREATE", "REPLACE").foreach { action => val e1 = intercept[AnalysisException]( sql(s"$action TABLE table_name (id int, value interval) USING $v2Format")) @@ -1360,9 +1360,9 @@ class DataSourceV2SQLSuite test("ShowNamespaces: default v2 catalog doesn't support namespace") { spark.conf.set( - "spark.sql.catalog.testcat_no_namspace", + "spark.sql.catalog.testcat_no_namespace", classOf[BasicInMemoryTableCatalog].getName) - spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat_no_namspace") + 
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat_no_namespace") val exception = intercept[AnalysisException] { sql("SHOW NAMESPACES") @@ -1373,11 +1373,11 @@ class DataSourceV2SQLSuite test("ShowNamespaces: v2 catalog doesn't support namespace") { spark.conf.set( - "spark.sql.catalog.testcat_no_namspace", + "spark.sql.catalog.testcat_no_namespace", classOf[BasicInMemoryTableCatalog].getName) val exception = intercept[AnalysisException] { - sql("SHOW NAMESPACES in testcat_no_namspace") + sql("SHOW NAMESPACES in testcat_no_namespace") } assert(exception.getMessage.contains("does not support namespaces")) @@ -2268,7 +2268,7 @@ class DataSourceV2SQLSuite val e = intercept[AnalysisException] { // Since the following multi-part name starts with `globalTempDB`, it is resolved to - // the session catalog, not the `gloabl_temp` v2 catalog. + // the session catalog, not the `global_temp` v2 catalog. sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string) USING json") } assert(e.message.contains( @@ -2594,6 +2594,13 @@ class DataSourceV2SQLSuite } } + test("DROP VIEW is not supported for v2 catalogs") { + assertAnalysisError( + "DROP VIEW testcat.v", + "Cannot specify catalog `testcat` for view v because view support in v2 catalog " + + "has not been implemented yet. DROP VIEW expects a view.") + } + private def testNotSupportedV2Command( sqlCommand: String, sqlParams: String, @@ -2605,13 +2612,6 @@ class DataSourceV2SQLSuite assert(e.message.contains(s"$cmdStr is not supported for v2 tables")) } - private def testV1CommandSupportingTempView(sqlCommand: String, sqlParams: String): Unit = { - val e = intercept[AnalysisException] { - sql(s"$sqlCommand $sqlParams") - } - assert(e.message.contains(s"$sqlCommand is only supported with temp views or v1 tables")) - } - private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = { val errMsg = intercept[AnalysisException] { sql(sqlStatement) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index 67ec1028f1998..eec396b2e3998 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -372,7 +372,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { spark.catalog.dropTempView("nums") } - test("window function: mutiple window expressions specified by range in a single expression") { + test("window function: multiple window expressions specified by range in a single expression") { val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") nums.createOrReplaceTempView("nums") withTempView("nums") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 1a826c00c81f2..81ba09f206b92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -184,7 +184,7 @@ class SparkSqlParserSuite extends AnalysisTest { intercept("REFRESH", "Resource paths cannot be empty in REFRESH statements") } - test("SPARK-33118 CREATE TMEPORARY TABLE with LOCATION") { + test("SPARK-33118 CREATE TEMPORARY TABLE with LOCATION") { assertEqual("CREATE TEMPORARY TABLE t USING parquet OPTIONS (path 
'/data/tmp/testspark1')", CreateTempViewUsing(TableIdentifier("t", None), None, false, false, "parquet", Map("path" -> "/data/tmp/testspark1"))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index fe40d7dce344d..eb5643df4c752 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -398,8 +398,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession // Case2: The parent of a LocalTableScanExec supports WholeStageCodegen. // In this case, the LocalTableScanExec should be within a WholeStageCodegen domain // and no more InputAdapter is inserted as the direct parent of the LocalTableScanExec. - val aggedDF = Seq(1, 2, 3).toDF.groupBy("value").sum() - val executedPlan = aggedDF.queryExecution.executedPlan + val aggregatedDF = Seq(1, 2, 3).toDF.groupBy("value").sum() + val executedPlan = aggregatedDF.queryExecution.executedPlan // HashAggregateExec supports WholeStageCodegen and it's the parent of // LocalTableScanExec so LocalTableScanExec should be within a WholeStageCodegen domain. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 45ba2202d83d3..69f1565c2f8de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -755,9 +755,9 @@ class AdaptiveQueryExecSuite Utils.deleteRecursively(tableDir) df1.write.parquet(tableDir.getAbsolutePath) - val agged = spark.table("bucketed_table").groupBy("i").count() + val aggregated = spark.table("bucketed_table").groupBy("i").count() val error = intercept[Exception] { - agged.count() + aggregated.count() } assert(error.getCause().toString contains "Invalid bucket file") assert(error.getSuppressed.size === 0) @@ -962,9 +962,9 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.UI_EXPLAIN_MODE.key -> mode, SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { - val dfApdaptive = sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'") + val dfAdaptive = sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'") try { - checkAnswer(dfApdaptive, Row(1, "1", 1, 1) :: Row(1, "1", 1, 2) :: Nil) + checkAnswer(dfAdaptive, Row(1, "1", 1, 1) :: Row(1, "1", 1, 2) :: Nil) spark.sparkContext.listenerBus.waitUntilEmpty() assert(checkDone) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 1e6e59456c887..d861bbbf67b1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1210,7 +1210,7 @@ class ArrowConvertersSuite extends SharedSparkSession { testQuietly("interval is unsupported for arrow") { val e = intercept[SparkException] { - calenderIntervalData.toDF().toArrowBatchRdd.collect() + calendarIntervalData.toDF().toArrowBatchRdd.collect() } assert(e.getCause.isInstanceOf[UnsupportedOperationException]) diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 4f79e71419a10..82d3e2dfe2212 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -549,9 +549,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { import testImplicits._ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 1: with partitioning columns but no schema: Option("nonexistentColumns") // Case 2: without schema and partitioning columns: None - Seq(Option("inexistentColumns"), None).foreach { partitionCols => + Seq(Option("nonexistentColumns"), None).foreach { partitionCols => withTempPath { pathToPartitionedTable => df.write.format("parquet").partitionBy("num") .save(pathToPartitionedTable.getCanonicalPath) @@ -589,9 +589,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { import testImplicits._ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 1: with partitioning columns but no schema: Option("nonexistentColumns") // Case 2: without schema and partitioning columns: None - Seq(Option("inexistentColumns"), None).foreach { partitionCols => + Seq(Option("nonexistentColumns"), None).foreach { partitionCols => withTempPath { pathToNonPartitionedTable => df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) checkSchemaInCreatedDataSourceTable( @@ -608,7 +608,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { import testImplicits._ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 1: with partitioning columns but no schema: Option("nonexistentColumns") // Case 2: without schema and partitioning columns: None Seq(Option("num"), None).foreach { partitionCols => withTempPath { pathToNonPartitionedTable => @@ -1363,12 +1363,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { createDatabase(catalog, "dbx") createTable(catalog, tableIdent) assert(catalog.listTables("dbx") == Seq(tableIdent)) - val e = intercept[AnalysisException] { sql("DROP VIEW dbx.tab1") } - assert( - e.getMessage.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")) + assert(e.getMessage.contains( + "dbx.tab1 is a table. 'DROP VIEW' expects a view. 
Please use DROP TABLE instead.")) } protected def testSetProperties(isDatasourceTable: Boolean): Unit = { @@ -1911,7 +1910,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { |OPTIONS ( | path '${tempDir.getCanonicalPath}' |) - |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS + |CLUSTERED BY (nonexistentColumnA) SORTED BY (nonexistentColumnB) INTO 2 BUCKETS """.stripMargin) } assert(e.message == "Cannot specify bucketing information if the table schema is not " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 38719311f1aef..758540f1a42f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -78,6 +78,14 @@ class PlanResolutionSuite extends AnalysisTest { V1Table(t) } + private val view: V1Table = { + val t = mock(classOf[CatalogTable]) + when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string")) + when(t.tableType).thenReturn(CatalogTableType.VIEW) + when(t.provider).thenReturn(Some(v1Format)) + V1Table(t) + } + private val testCat: TableCatalog = { val newCatalog = mock(classOf[TableCatalog]) when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => { @@ -101,6 +109,7 @@ class PlanResolutionSuite extends AnalysisTest { case "v2Table" => table case "v2Table1" => table case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability + case "view" => view case name => throw new NoSuchTableException(name) } }) @@ -148,7 +157,10 @@ class PlanResolutionSuite extends AnalysisTest { manager } - def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = { + def parseAndResolve( + query: String, + withDefault: Boolean = false, + checkAnalysis: Boolean = false): LogicalPlan = { val catalogManager = if (withDefault) { catalogManagerWithDefault } else { @@ -158,8 +170,13 @@ class PlanResolutionSuite extends AnalysisTest { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Seq( new ResolveSessionCatalog(catalogManager, _ == Seq("v"), _ => false)) } - // We don't check analysis here, as we expect the plan to be unresolved such as `CreateTable`. - analyzer.execute(CatalystSqlParser.parsePlan(query)) + // We don't check analysis here by default, as we expect the plan to be unresolved + // such as `CreateTable`. 
+ val analyzed = analyzer.execute(CatalystSqlParser.parsePlan(query)) + if (checkAnalysis) { + analyzer.checkAnalysis(analyzed) + } + analyzed } private def parseResolveCompare(query: String, expected: LogicalPlan): Unit = @@ -677,6 +694,8 @@ class PlanResolutionSuite extends AnalysisTest { val viewIdent1 = TableIdentifier("view", Option("db")) val viewName2 = "view" val viewIdent2 = TableIdentifier("view", Option("default")) + val tempViewName = "v" + val tempViewIdent = TableIdentifier("v") parseResolveCompare(s"DROP VIEW $viewName1", DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false)) @@ -686,11 +705,15 @@ class PlanResolutionSuite extends AnalysisTest { DropTableCommand(viewIdent2, ifExists = false, isView = true, purge = false)) parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2", DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false)) + parseResolveCompare(s"DROP VIEW $tempViewName", + DropTableCommand(tempViewIdent, ifExists = false, isView = true, purge = false)) + parseResolveCompare(s"DROP VIEW IF EXISTS $tempViewName", + DropTableCommand(tempViewIdent, ifExists = true, isView = true, purge = false)) } test("drop view in v2 catalog") { intercept[AnalysisException] { - parseAndResolve("DROP VIEW testcat.db.view") + parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true) }.getMessage.toLowerCase(Locale.ROOT).contains( "view support in catalog has not been implemented") } @@ -1164,26 +1187,26 @@ class PlanResolutionSuite extends AnalysisTest { ) } - DSV2ResolutionTests.foreach { case (sql, isSessionCatlog) => + DSV2ResolutionTests.foreach { case (sql, isSessionCatalog) => test(s"Data source V2 relation resolution '$sql'") { val parsed = parseAndResolve(sql, withDefault = true) - val catlogIdent = if (isSessionCatlog) v2SessionCatalog else testCat - val tableIdent = if (isSessionCatlog) "v2Table" else "tab" + val catalogIdent = if (isSessionCatalog) v2SessionCatalog else testCat + val tableIdent = if (isSessionCatalog) "v2Table" else "tab" parsed match { case AlterTable(_, _, r: DataSourceV2Relation, _) => - assert(r.catalog.exists(_ == catlogIdent)) + assert(r.catalog.exists(_ == catalogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) case Project(_, AsDataSourceV2Relation(r)) => - assert(r.catalog.exists(_ == catlogIdent)) + assert(r.catalog.exists(_ == catalogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) case AppendData(r: DataSourceV2Relation, _, _, _) => - assert(r.catalog.exists(_ == catlogIdent)) + assert(r.catalog.exists(_ == catalogIdent)) assert(r.identifier.exists(_.name() == tableIdent)) case DescribeRelation(r: ResolvedTable, _, _) => - assert(r.catalog == catlogIdent) + assert(r.catalog == catalogIdent) assert(r.identifier.name() == tableIdent) case ShowTableProperties(r: ResolvedTable, _) => - assert(r.catalog == catlogIdent) + assert(r.catalog == catalogIdent) assert(r.identifier.name() == tableIdent) case ShowTablePropertiesCommand(t: TableIdentifier, _) => assert(t.identifier == tableIdent) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesParserSuite.scala index 16f3dea8d75ef..d68e1233f7ab2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesParserSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.execution.command 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedNamespace} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedNamespace, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan -import org.apache.spark.sql.catalyst.plans.logical.{ShowTables, ShowTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{ShowTableExtended, ShowTables} import org.apache.spark.sql.test.SharedSparkSession class ShowTablesParserSuite extends AnalysisTest with SharedSparkSession { @@ -52,25 +52,32 @@ class ShowTablesParserSuite extends AnalysisTest with SharedSparkSession { test("show table extended") { comparePlans( parsePlan("SHOW TABLE EXTENDED LIKE '*test*'"), - ShowTableStatement(None, "*test*", None)) + ShowTableExtended(UnresolvedNamespace(Seq.empty[String]), "*test*", None)) comparePlans( parsePlan(s"SHOW TABLE EXTENDED FROM $catalog.ns1.ns2 LIKE '*test*'"), - ShowTableStatement(Some(Seq(catalog, "ns1", "ns2")), "*test*", None)) + ShowTableExtended(UnresolvedNamespace(Seq(catalog, "ns1", "ns2")), "*test*", None)) comparePlans( parsePlan(s"SHOW TABLE EXTENDED IN $catalog.ns1.ns2 LIKE '*test*'"), - ShowTableStatement(Some(Seq(catalog, "ns1", "ns2")), "*test*", None)) + ShowTableExtended(UnresolvedNamespace(Seq(catalog, "ns1", "ns2")), "*test*", None)) comparePlans( parsePlan("SHOW TABLE EXTENDED LIKE '*test*' PARTITION(ds='2008-04-09', hr=11)"), - ShowTableStatement(None, "*test*", Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) + ShowTableExtended( + UnresolvedNamespace(Seq.empty[String]), + "*test*", + Some(UnresolvedPartitionSpec(Map("ds" -> "2008-04-09", "hr" -> "11"))))) comparePlans( parsePlan(s"SHOW TABLE EXTENDED FROM $catalog.ns1.ns2 LIKE '*test*' " + "PARTITION(ds='2008-04-09')"), - ShowTableStatement(Some(Seq(catalog, "ns1", "ns2")), "*test*", - Some(Map("ds" -> "2008-04-09")))) + ShowTableExtended( + UnresolvedNamespace(Seq(catalog, "ns1", "ns2")), + "*test*", + Some(UnresolvedPartitionSpec(Map("ds" -> "2008-04-09"))))) comparePlans( parsePlan(s"SHOW TABLE EXTENDED IN $catalog.ns1.ns2 LIKE '*test*' " + "PARTITION(ds='2008-04-09')"), - ShowTableStatement(Some(Seq(catalog, "ns1", "ns2")), "*test*", - Some(Map("ds" -> "2008-04-09")))) + ShowTableExtended( + UnresolvedNamespace(Seq(catalog, "ns1", "ns2")), + "*test*", + Some(UnresolvedPartitionSpec(Map("ds" -> "2008-04-09"))))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowTablesSuite.scala index aff1729a000b6..370c8358e64da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowTablesSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command.v2 import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.execution.command import org.apache.spark.sql.test.SharedSparkSession @@ -74,7 +73,7 @@ class ShowTablesSuite extends command.ShowTablesSuiteBase with SharedSparkSessio val e = intercept[AnalysisException] { sql(sqlCommand) } - assert(e.message.contains(s"The database name is not valid: ${namespace}")) + assert(e.message.contains(s"SHOW TABLE EXTENDED is not supported for v2 tables")) } val namespace = 
s"$catalog.ns1.ns2" @@ -101,10 +100,10 @@ class ShowTablesSuite extends command.ShowTablesSuiteBase with SharedSparkSessio val table = "people" withTable(s"$catalog.$table") { sql(s"CREATE TABLE $catalog.$table (name STRING, id INT) $defaultUsing") - val errMsg = intercept[NoSuchDatabaseException] { + val errMsg = intercept[AnalysisException] { sql(s"SHOW TABLE EXTENDED FROM $catalog LIKE '*$table*'").collect() }.getMessage - assert(errMsg.contains(s"Database '$catalog' not found")) + assert(errMsg.contains("SHOW TABLE EXTENDED is not supported for v2 tables")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index dc97b7a55ee9a..6ba3d2723412b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -141,14 +141,14 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester { } test("Data source options should be propagated in method checkAndGlobPathIfNecessary") { - val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs") + val dataSourceOptions = Map("fs.defaultFS" -> "nonexistentFs://nonexistentFs") val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions) val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary) val message = intercept[java.io.IOException] { dataSource invokePrivate checkAndGlobPathIfNecessary(false, false) }.getMessage - val expectMessage = "No FileSystem for scheme nonexistsFs" + val expectMessage = "No FileSystem for scheme nonexistentFs" assert(message.filterNot(Set(':', '"').contains) == expectMessage) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index 2b5cb27d59ad9..c90732183cb7a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -623,9 +623,9 @@ abstract class SchemaPruningSuite spark.read.format(dataSourceName).schema(schema).load(path + "/contacts") .createOrReplaceTempView("contacts") - val departmentScahem = "`depId` INT,`depName` STRING,`contactId` INT, " + + val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT, " + "`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>" - spark.read.format(dataSourceName).schema(departmentScahem).load(path + "/departments") + spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments") .createOrReplaceTempView("departments") testThunk @@ -651,9 +651,9 @@ abstract class SchemaPruningSuite spark.read.format(dataSourceName).schema(schema).load(path + "/contacts") .createOrReplaceTempView("contacts") - val departmentScahem = "`depId` INT,`depName` STRING,`contactId` INT, " + + val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT, " + "`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>" - spark.read.format(dataSourceName).schema(departmentScahem).load(path + "/departments") + spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments") .createOrReplaceTempView("departments") testThunk diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 8c5f7bed7c50d..2fe5953cbe12e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -183,7 +183,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS val oneBlockColumnMeta = oneBlockMeta.getColumns().get(0) // This is the important assert. Column stats are written, but they are ignored // when the data is read back as mentioned above, b/c int96 is unsigned. This - // assert makes sure this holds even if we change parquet versions (if eg. there + // assert makes sure this holds even if we change parquet versions (if e.g. there // were ever statistics even on unsigned columns). assert(!oneBlockColumnMeta.getStatistics.hasNonNullValue) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 5c41614c45b6f..400f4d8e1b156 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -1157,7 +1157,7 @@ class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite { test("SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols") { withTempDir { tempDir => val output = new File(tempDir, "output").toString - val checkpoint = new File(tempDir, "chkpoint").toString + val checkpoint = new File(tempDir, "checkpoint").toString try { val stream = MemoryStream[(String, Int)] val df = stream.toDS().toDF("time", "value") @@ -1303,7 +1303,7 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite { test("SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols") { withTempDir { tempDir => val output = new File(tempDir, "output").toString - val checkpoint = new File(tempDir, "chkpoint").toString + val checkpoint = new File(tempDir, "checkpoint").toString try { val stream = MemoryStream[(String, Int)] val df = stream.toDS().toDF("time", "value") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 05d305a9b52ba..8f85fe3c52583 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -857,7 +857,7 @@ class ParquetV1QuerySuite extends ParquetQuerySuite { val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) df.write.mode(SaveMode.Overwrite).parquet(path) - // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) + // do not return batch - whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) val fileScan2 = 
df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar) @@ -890,7 +890,7 @@ class ParquetV2QuerySuite extends ParquetQuerySuite { val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) df.write.mode(SaveMode.Overwrite).parquet(path) - // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) + // do not return batch - whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get val parquetScan2 = fileScan2.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 296cbc3f3ad52..061799f439e5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -60,7 +60,7 @@ class EnsureRequirementsSuite extends SharedSparkSession { case other => fail(other.toString) } - // Both sides are PartitioningCollection, but left side cannot be reorderd to match + // Both sides are PartitioningCollection, but left side cannot be reordered to match // and it should fall back to the right side. val smjExec3 = SortMergeJoinExec( exprA :: exprC :: Nil, exprB :: exprA :: Nil, Inner, None, plan1, plan1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index b4f921efcac81..21d17f40abb34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -181,7 +181,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils assert(probes.toDouble > 1.0) } else { val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")") - // Extract min, med, max from the string and strip off everthing else. + // Extract min, med, max from the string and strip off everything else. 
val index = mainValue.indexOf(" (", 0) mainValue.slice(0, index).split(", ").foreach { probe => assert(probe.toDouble > 1.0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 67dd88cbab63b..980d532dd4779 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -199,7 +199,7 @@ class HDFSMetadataLogSuite extends SharedSparkSession { intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L))) intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L))) - // Related to SPARK-26629, this capatures the behavior for verifyBatchIds when startId > endId + // Related to SPARK-26629, this captures the behavior for verifyBatchIds when startId > endId intercept[IllegalStateException](verifyBatchIds(Seq(), Some(2L), Some(1L))) intercept[AssertionError](verifyBatchIds(Seq(2), Some(2L), Some(1L))) intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala index a702e00ff9f92..dfc64a41d9f86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -24,10 +24,10 @@ class SparkPlanInfoSuite extends SharedSparkSession{ import testImplicits._ - def vaidateSparkPlanInfo(sparkPlanInfo: SparkPlanInfo): Unit = { + def validateSparkPlanInfo(sparkPlanInfo: SparkPlanInfo): Unit = { sparkPlanInfo.nodeName match { case "InMemoryTableScan" => assert(sparkPlanInfo.children.length == 1) - case _ => sparkPlanInfo.children.foreach(vaidateSparkPlanInfo) + case _ => sparkPlanInfo.children.foreach(validateSparkPlanInfo) } } @@ -39,6 +39,6 @@ class SparkPlanInfoSuite extends SharedSparkSession{ val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan) - vaidateSparkPlanInfo(planInfoResult) + validateSparkPlanInfo(planInfoResult) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala index 567524ac75c2e..13b22dba1168b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala @@ -108,7 +108,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils { .queryExecution.executedPlan) assert(res.length == 2) assert(res.forall { case (_, code, _) => - (code.contains("* Codegend pipeline") == flag) && + (code.contains("* Codegened pipeline") == flag) && (code.contains("// input[") == flag) }) } @@ -175,7 +175,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils { df.hint("broadcast") } - // set local propert and assert + // set local property and assert val df2 = generateBroadcastDataFrame(confKey, confValue1) spark.sparkContext.setLocalProperty(confKey, confValue1) val checks = df1.join(df2).collect() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index f0b19071a969b..ede5fe538a028 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1418,7 +1418,7 @@ class JDBCSuite extends QueryTest } test("SPARK-24327 verify and normalize a partition column based on a JDBC resolved schema") { - def testJdbcParitionColumn(partColName: String, expectedColumnName: String): Unit = { + def testJdbcPartitionColumn(partColName: String, expectedColumnName: String): Unit = { val df = spark.read.format("jdbc") .option("url", urlWithUserAndPass) .option("dbtable", "TEST.PARTITION") @@ -1439,16 +1439,16 @@ class JDBCSuite extends QueryTest } } - testJdbcParitionColumn("THEID", "THEID") - testJdbcParitionColumn("\"THEID\"", "THEID") + testJdbcPartitionColumn("THEID", "THEID") + testJdbcPartitionColumn("\"THEID\"", "THEID") withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - testJdbcParitionColumn("ThEiD", "THEID") + testJdbcPartitionColumn("ThEiD", "THEID") } - testJdbcParitionColumn("THE ID", "THE ID") + testJdbcPartitionColumn("THE ID", "THE ID") def testIncorrectJdbcPartitionColumn(partColName: String): Unit = { val errMsg = intercept[AnalysisException] { - testJdbcParitionColumn(partColName, "THEID") + testJdbcPartitionColumn(partColName, "THEID") }.getMessage assert(errMsg.contains(s"User-defined partition column $partColName not found " + "in the JDBC relation:")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 0ff9303421ade..4ae8cdbeb4f1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -639,13 +639,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table") val tbl = spark.table("bucketed_table") - val agged = tbl.groupBy("i", "j").agg(max("k")) + val aggregated = tbl.groupBy("i", "j").agg(max("k")) checkAnswer( - agged.sort("i", "j"), + aggregated.sort("i", "j"), df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) - assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty) + assert( + aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty) } } @@ -679,13 +680,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") val tbl = spark.table("bucketed_table") - val agged = tbl.groupBy("i", "j").agg(max("k")) + val aggregated = tbl.groupBy("i", "j").agg(max("k")) checkAnswer( - agged.sort("i", "j"), + aggregated.sort("i", "j"), df1.groupBy("i", "j").agg(max("k")).sort("i", "j")) - assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty) + assert( + aggregated.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty) } } @@ -806,9 +808,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { Utils.deleteRecursively(tableDir) df1.write.parquet(tableDir.getAbsolutePath) - val agged = spark.table("bucketed_table").groupBy("i").count() + val aggregated = spark.table("bucketed_table").groupBy("i").count() val error = intercept[Exception] { - agged.count() 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 9464f7e4c1241..9a7c7e0edc409 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -234,7 +234,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("create table using as select - with overriden max number of buckets") {
+  test("create table using as select - with overridden max number of buckets") {
     def createTableSql(numBuckets: Int): String =
       s"""
         |CREATE TABLE t USING PARQUET
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index ca3e714665818..0da6b487e31ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -359,7 +359,7 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
       val schemaNotMatch = intercept[Exception] {
         sql(
           s"""
-            |CREATE $tableType relationProvierWithSchema (i int)
+            |CREATE $tableType relationProviderWithSchema (i int)
             |USING org.apache.spark.sql.sources.SimpleScanSource
             |OPTIONS (
             |  From '1',
@@ -373,7 +373,7 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
      val schemaNeeded = intercept[Exception] {
        sql(
          s"""
-            |CREATE $tableType schemaRelationProvierWithoutSchema
+            |CREATE $tableType schemaRelationProviderWithoutSchema
            |USING org.apache.spark.sql.sources.AllDataTypesScanSource
            |OPTIONS (
            |  From '1',
@@ -387,7 +387,7 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
 
   test("read the data source tables that do not extend SchemaRelationProvider") {
     Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
-      val tableName = "relationProvierWithSchema"
+      val tableName = "relationProviderWithSchema"
       withTable (tableName) {
         sql(
           s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3c74e316f260e..b240d2058a018 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1946,9 +1946,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   test("SourceFileArchiver - fail when base archive path matches source pattern") {
     val fakeFileSystem = new FakeFileSystem("fake")
 
-    def assertThrowIllegalArgumentException(sourcePatttern: Path, baseArchivePath: Path): Unit = {
+    def assertThrowIllegalArgumentException(sourcePattern: Path, baseArchivePath: Path): Unit = {
       intercept[IllegalArgumentException] {
-        new SourceFileArchiver(fakeFileSystem, sourcePatttern, fakeFileSystem, baseArchivePath)
+        new SourceFileArchiver(fakeFileSystem, sourcePattern, fakeFileSystem, baseArchivePath)
       }
     }
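The SourceFileArchiver touched above is an internal helper behind the file source's archive cleanup; the rename is purely cosmetic. As a hedged sketch of the user-facing side it guards, assuming the Spark 3.x file-source options cleanSource and sourceArchiveDir, with /data/in and /data/archive as placeholder paths:

    // The archive directory must not overlap the source pattern; overlapping paths are
    // rejected with an IllegalArgumentException, which is what the test above asserts.
    val lines = spark.readStream
      .format("text")
      .option("cleanSource", "archive")            // archive completed input files
      .option("sourceArchiveDir", "/data/archive") // where completed files are moved
      .load("/data/in")
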
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e64d5f6f3587e..ed284df10aced 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1064,13 +1064,13 @@ class StreamSuite extends StreamTest {
   }
 
   test("SPARK-30657: streaming limit should not apply on limits on state subplans") {
-    val streanData = MemoryStream[Int]
-    val streamingDF = streanData.toDF().toDF("value")
+    val streamData = MemoryStream[Int]
+    val streamingDF = streamData.toDF().toDF("value")
     val staticDF = spark.createDataset(Seq(1)).toDF("value").orderBy("value")
     testStream(streamingDF.join(staticDF.limit(1), "value"))(
-      AddData(streanData, 1, 2, 3),
+      AddData(streamData, 1, 2, 3),
       CheckAnswer(Row(1)),
-      AddData(streanData, 1, 3, 5),
+      AddData(streamData, 1, 3, 5),
       CheckAnswer(Row(1), Row(1)))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 0296366f3578b..9cf649605ed1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -107,12 +107,12 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
   }
 
   test("read: read table without streaming capability support") {
-    val tableIdentifer = "testcat.table_name"
+    val tableIdentifier = "testcat.table_name"
 
-    spark.sql(s"CREATE TABLE $tableIdentifer (id bigint, data string) USING foo")
+    spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
 
     intercept[AnalysisException] {
-      spark.readStream.table(tableIdentifer)
+      spark.readStream.table(tableIdentifier)
     }.message.contains("does not support either micro-batch or continuous scan")
   }
 
@@ -213,7 +213,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
   }
 
   test("write: write to non-exist table with custom catalog") {
-    val tableIdentifier = "testcat.nonexisttable"
+    val tableIdentifier = "testcat.nonexistenttable"
 
     withTable(tableIdentifier) {
       runTestWithStreamAppend(tableIdentifier)
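DataStreamTableAPISuite exercises the table-based streaming API added in Spark 3.1. A short sketch of the happy path, assuming a table named events that supports streaming scans and a placeholder checkpoint directory (both names are illustrative):

    // Read a table as a stream and append it into another table.
    val input = spark.readStream.table("events")
    val query = input.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/checkpoints/events_copy")
      .toTable("events_copy")   // creates events_copy if it does not already exist
    // query.awaitTermination() would block until the stream stops.
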
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
index c51faaf10f5dd..a1fd4a0215b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala
@@ -169,10 +169,10 @@ private[sql] trait SQLTestData { self =>
     rdd
   }
 
-  protected lazy val calenderIntervalData: RDD[IntervalData] = {
+  protected lazy val calendarIntervalData: RDD[IntervalData] = {
     val rdd = spark.sparkContext.parallelize(
       IntervalData(new CalendarInterval(1, 1, 1)) :: Nil)
-    rdd.toDF().createOrReplaceTempView("calenderIntervalData")
+    rdd.toDF().createOrReplaceTempView("calendarIntervalData")
     rdd
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index cfc92a780308d..ed2e309fa075a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 trait SharedSparkSession extends SQLTestUtils with SharedSparkSessionBase {
 
   /**
-   * Suites extending [[SharedSparkSession]] are sharing resources (eg. SparkSession) in their
+   * Suites extending [[SharedSparkSession]] are sharing resources (e.g. SparkSession) in their
    * tests. That trait initializes the spark session in its [[beforeAll()]] implementation before
    * the automatic thread snapshot is performed, so the audit code could fail to report threads
    * leaked by that shared session.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ce31e39985971..d6a4d76386889 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1048,7 +1048,8 @@ class HiveDDLSuite
       val message = intercept[AnalysisException] {
         sql("DROP VIEW tab1")
       }.getMessage
-      assert(message.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))
+      assert(message.contains(
+        "tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
     }
   }
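The HiveDDLSuite assertion above picks up the reworded analyzer error introduced earlier in this patch. Since the check happens during analysis, the message should also be reproducible from a plain SparkSession; a hedged sketch, with tab1 as a placeholder table name:

    import org.apache.spark.sql.AnalysisException

    spark.sql("CREATE TABLE tab1 (id INT) USING parquet")
    try {
      spark.sql("DROP VIEW tab1")   // wrong command on purpose
    } catch {
      case e: AnalysisException =>
        // Expected to contain:
        // "tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."
        println(e.getMessage)
    }
    spark.sql("DROP TABLE tab1")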