diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 7ccbb2dac90c8..b48bf2b79c6de 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -53,7 +53,7 @@ statement (PARTITIONED BY partitionColumnNames=identifierList)? bucketSpec? AS? query #createTableUsing | createTableHeader ('(' columns=colTypeList ')')? - (COMMENT STRING)? + (COMMENT comment=STRING)? (PARTITIONED BY '(' partitionColumns=colTypeList ')')? bucketSpec? skewSpec? rowFormat? createFileFormat? locationSpec? diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b12606e17d380..f3ee99e045579 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -42,38 +42,54 @@ case class CatalogFunction( /** * Storage format, used to describe how a partition or a table is stored. + * + * @param locationUri the storage location of the table/partition, can be None if it has no storage, + * e.g. view, or it's not file-based, e.g. JDBC data source table. + * @param properties used to store some extra information for the storage. */ case class CatalogStorageFormat( locationUri: Option[String], - inputFormat: Option[String], - outputFormat: Option[String], - serde: Option[String], - compressed: Boolean, - serdeProperties: Map[String, String]) { + properties: Map[String, String]) { override def toString: String = { - val serdePropsToString = - if (serdeProperties.nonEmpty) { - s"Properties: " + serdeProperties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + val propertiesToString = + if (properties.nonEmpty) { + s"Properties: " + properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") } else { "" } - val output = - Seq(locationUri.map("Location: " + _).getOrElse(""), - inputFormat.map("InputFormat: " + _).getOrElse(""), - outputFormat.map("OutputFormat: " + _).getOrElse(""), - if (compressed) "Compressed" else "", - serde.map("Serde: " + _).getOrElse(""), - serdePropsToString) + val output = Seq( + locationUri.map("Location: " + _).getOrElse(""), + propertiesToString) output.filter(_.nonEmpty).mkString("Storage(", ", ", ")") } + // TODO: remove these hive hacks + + def withInputFormat(inFmt: Option[String]): CatalogStorageFormat = { + inFmt.map(i => copy(properties = properties + ("inputFormat" -> i))).getOrElse(this) + } + + def withOutputFormat(outFmt: Option[String]): CatalogStorageFormat = { + outFmt.map(o => copy(properties = properties + ("outputFormat" -> o))).getOrElse(this) + } + + def withSerde(serde: Option[String]): CatalogStorageFormat = { + serde.map(s => copy(properties = properties + ("serde" -> s))).getOrElse(this) + } + + def getInputFormat: Option[String] = properties.get("inputFormat") + + def getOutputFormat: Option[String] = properties.get("outputFormat") + + def getSerde: Option[String] = properties.get("serde") + + def getProperties: Map[String, String] = properties - "inputFormat" - "outputFormat" - "serde" } object CatalogStorageFormat { /** Empty storage format for default values and copies. 
*/ - val empty = CatalogStorageFormat(locationUri = None, inputFormat = None, - outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty) + val empty = CatalogStorageFormat(None, Map.empty) } /** @@ -115,6 +131,8 @@ case class CatalogTablePartition( * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the * future once we have a better understanding of how we want to handle skewed columns. * + * @param provider the name of the data source provider, e.g. parquet, json, etc. + * Note that `hive` is also one kind of provider. * @param unsupportedFeatures is a list of string descriptions of features that are used by the * underlying table but not supported by Spark SQL yet. */ @@ -123,6 +141,7 @@ case class CatalogTable( tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], + provider: Option[String] = None, partitionColumnNames: Seq[String] = Seq.empty, sortColumnNames: Seq[String] = Seq.empty, bucketColumnNames: Seq[String] = Seq.empty, @@ -158,16 +177,9 @@ case class CatalogTable( /** Return the fully qualified name of this table, assuming the database was specified. */ def qualifiedName: String = identifier.unquotedString - /** Syntactic sugar to update a field in `storage`. */ - def withNewStorage( - locationUri: Option[String] = storage.locationUri, - inputFormat: Option[String] = storage.inputFormat, - outputFormat: Option[String] = storage.outputFormat, - compressed: Boolean = false, - serde: Option[String] = storage.serde, - serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = { - copy(storage = CatalogStorageFormat( - locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties)) + /** Syntactic sugar to update the `storage`. 
*/ + def mapStorage(f: CatalogStorageFormat => CatalogStorageFormat): CatalogTable = { + copy(storage = f(storage)) } override def toString: String = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index a9268535c40a8..4cdc633b2ca2a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -398,12 +398,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(oldPart2.storage.locationUri != Some(newLocation)) // alter other storage information catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))), - oldPart2.copy(storage = storageFormat.copy(serdeProperties = newSerdeProps)))) + oldPart1.copy(storage = storageFormat.withSerde(Some(newSerde))), + oldPart2.copy(storage = storageFormat.copy(properties = newSerdeProps)))) val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec) val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec) - assert(newPart1b.storage.serde == Some(newSerde)) - assert(newPart2b.storage.serdeProperties == newSerdeProps) + assert(newPart1b.storage.getSerde == Some(newSerde)) + assert(newPart2b.storage.getProperties == newSerdeProps) // alter but change spec, should fail because new partition specs do not exist yet val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) @@ -550,7 +550,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val table = CatalogTable( identifier = TableIdentifier("my_table", Some("db1")), tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + storage = CatalogStorageFormat.empty, schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")) ) @@ -569,7 +569,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, None, None, false, Map.empty), + Map.empty), schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")) ) catalog.createTable("db1", externalTable, ignoreIfExists = false) @@ -582,7 +582,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val table = CatalogTable( identifier = TableIdentifier("tbl", Some("db1")), tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + storage = CatalogStorageFormat.empty, schema = Seq( CatalogColumn("col1", "int"), CatalogColumn("col2", "string"), @@ -609,7 +609,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac Map("a" -> "7", "b" -> "8"), CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, None, None, false, Map.empty) + Map.empty) ) catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false) assert(!exists(databaseDir, "tbl", "a=7", "b=8")) @@ -628,13 +628,9 @@ abstract class CatalogTestUtils { def newEmptyCatalog(): ExternalCatalog // These fields must be lazy because they rely on fields that are not implemented yet - lazy val storageFormat = CatalogStorageFormat( - locationUri = 
None, - inputFormat = Some(tableInputFormat), - outputFormat = Some(tableOutputFormat), - serde = None, - compressed = false, - serdeProperties = Map.empty) + lazy val storageFormat = CatalogStorageFormat.empty + .withInputFormat(Some(tableInputFormat)) + .withOutputFormat(Some(tableOutputFormat)) lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c5f4d58da43ac..74cad2b909e31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -949,21 +949,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // to include the partition columns here explicitly val schema = cols ++ partitionCols - // Storage format - val defaultStorage: CatalogStorageFormat = { + // default input/output format + val (defaultInputFormat, defaultOutputFormat) = { val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) - CatalogStorageFormat( - locationUri = None, - inputFormat = defaultHiveSerde.flatMap(_.inputFormat) - .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), - outputFormat = defaultHiveSerde.flatMap(_.outputFormat) - .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - // Note: Keep this unspecified because we use the presence of the serde to decide - // whether to convert a table created by CTAS to a datasource table. - serde = None, - compressed = false, - serdeProperties = Map()) + val inFmt = defaultHiveSerde.flatMap(_.inputFormat) + .getOrElse("org.apache.hadoop.mapred.TextInputFormat") + val outFmt = defaultHiveSerde.flatMap(_.outputFormat) + .getOrElse("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat") + inFmt -> outFmt } validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) @@ -977,11 +971,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val storage = CatalogStorageFormat( locationUri = location, - inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), - outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), - serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), - compressed = false, - serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) + properties = rowStorage.getProperties ++ fileStorage.getProperties + ).withInputFormat(Some(defaultInputFormat)) + .withInputFormat(fileStorage.getInputFormat) + .withOutputFormat(Some(defaultOutputFormat)) + .withOutputFormat(fileStorage.getOutputFormat) + // if both file format and row format specifies serde, the one from row format wins. + .withSerde(fileStorage.getSerde) + .withSerde(rowStorage.getSerde) // If location is defined, we'll assume this is an external table. // Otherwise, we may accidentally delete existing data. 
val tableType = if (external || location.isDefined) { @@ -1085,9 +1082,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { */ override def visitTableFileFormat( ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - CatalogStorageFormat.empty.copy( - inputFormat = Option(string(ctx.inFmt)), - outputFormat = Option(string(ctx.outFmt))) + CatalogStorageFormat.empty + .withInputFormat(Option(ctx.inFmt).map(string)) + .withOutputFormat(Option(ctx.outFmt).map(string)) } /** @@ -1098,10 +1095,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val source = ctx.identifier.getText HiveSerDe.sourceToSerDe(source, conf) match { case Some(s) => - CatalogStorageFormat.empty.copy( - inputFormat = s.inputFormat, - outputFormat = s.outputFormat, - serde = s.serde) + CatalogStorageFormat.empty + .withInputFormat(s.inputFormat) + .withOutputFormat(s.outputFormat) + .withSerde(s.serde) case None => operationNotAllowed(s"STORED AS with file format '$source'", ctx) } @@ -1138,9 +1135,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitRowFormatSerde( ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { import ctx._ - CatalogStorageFormat.empty.copy( - serde = Option(string(name)), - serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + CatalogStorageFormat.empty + .copy(properties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + .withSerde(Option(name).map(string)) } /** @@ -1168,7 +1165,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx) "line.delim" -> value } - CatalogStorageFormat.empty.copy(serdeProperties = entries.toMap) + CatalogStorageFormat.empty.copy(properties = entries.toMap) } /** @@ -1296,6 +1293,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) { ctx.colType.asScala.map { col => CatalogColumn( + // TODO: it should be case preserving. col.identifier.getText.toLowerCase, // Note: for types like "STRUCT" we can't // just convert the whole type string to lower case, otherwise the struct field names @@ -1344,7 +1342,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { case c: RowFormatSerdeContext => // Use a serde format. 
- val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + val props = Option(c.props).map(visitPropertyKeyValues).getOrElse(Nil) + val name = string(c.name) // SPARK-10310: Special cases LazySimpleSerDe val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index c38eca5156e5a..3aa36ecdb5541 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -388,11 +388,7 @@ object CreateDataSourceTableUtils extends Logging { schema = Nil, storage = CatalogStorageFormat( locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = options + properties = options ), properties = tableProperties.toMap) } @@ -403,17 +399,18 @@ object CreateDataSourceTableUtils extends Logging { assert(partitionColumns.isEmpty) assert(relation.partitionSchema.isEmpty) + val storage = CatalogStorageFormat( + locationUri = Some(relation.location.paths.map(_.toUri.toString).head), + properties = options) + .withInputFormat(serde.inputFormat) + .withOutputFormat(serde.outputFormat) + .withSerde(serde.serde) + CatalogTable( identifier = tableIdent, tableType = tableType, - storage = CatalogStorageFormat( - locationUri = Some(relation.location.paths.map(_.toUri.toString).head), - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde, - compressed = false, - serdeProperties = options - ), + storage = storage, + provider = Some("hive"), schema = relation.schema.map { f => CatalogColumn(f.name, f.dataType.catalogString) }, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a3a057a5628fe..560cd27c46486 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -318,16 +318,19 @@ case class AlterTableSerDePropertiesCommand( "not supported for tables created with the datasource API") } if (partSpec.isEmpty) { - val newTable = table.withNewStorage( - serde = serdeClassName.orElse(table.storage.serde), - serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map())) + val newTable = table.mapStorage { storage => + storage + .copy(properties = storage.properties ++ serdeProperties.getOrElse(Map.empty)) + .withSerde(serdeClassName) + } catalog.alterTable(newTable) } else { val spec = partSpec.get val part = catalog.getPartition(tableName, spec) - val newPart = part.copy(storage = part.storage.copy( - serde = serdeClassName.orElse(part.storage.serde), - serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))) + val newStorage = part.storage.copy( + properties = part.storage.properties ++ serdeProperties.getOrElse(Map.empty) + ).withSerde(serdeClassName) + val newPart = part.copy(storage = newStorage) catalog.alterPartitions(tableName, Seq(newPart)) } Seq.empty[Row] @@ -464,11 +467,13 @@ case class AlterTableSetLocationCommand( // No partition spec is specified, so we set the location for the table itself val newTable = if (DDLUtils.isDatasourceTable(table)) { - 
table.withNewStorage( - locationUri = Some(location), - serdeProperties = table.storage.serdeProperties ++ Map("path" -> location)) + table.mapStorage { storage => + storage.copy( + locationUri = Some(location), + properties = storage.properties + ("path" -> location)) + } } else { - table.withNewStorage(locationUri = Some(location)) + table.mapStorage(_.copy(locationUri = Some(location))) } catalog.alterTable(newTable) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5c815df0deb9e..960602b9ac49c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -43,15 +43,16 @@ case class CreateHiveTableAsSelectLogicalPlan( tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean) extends UnaryNode with Command { + assert(tableDesc.provider == Some("hive")) override def output: Seq[Attribute] = Seq.empty[Attribute] override lazy val resolved: Boolean = tableDesc.identifier.database.isDefined && tableDesc.schema.nonEmpty && - tableDesc.storage.serde.isDefined && - tableDesc.storage.inputFormat.isDefined && - tableDesc.storage.outputFormat.isDefined && + tableDesc.storage.getSerde.isDefined && + tableDesc.storage.getInputFormat.isDefined && + tableDesc.storage.getOutputFormat.isDefined && childrenResolved } @@ -84,7 +85,7 @@ case class CreateTableLikeCommand( identifier = targetTable, tableType = CatalogTableType.MANAGED, createTime = System.currentTimeMillis, - lastAccessTime = -1).withNewStorage(locationUri = None) + lastAccessTime = -1).mapStorage(_.copy(locationUri = None)) catalog.createTable(tableToCreate, ifNotExists) Seq.empty[Row] @@ -117,10 +118,11 @@ case class CreateTableLikeCommand( * }}} */ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { + assert(table.provider == Some("hive")) override def run(sparkSession: SparkSession): Seq[Row] = { DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") - DDLUtils.verifyTableProperties(table.storage.serdeProperties.keys.toSeq, "CREATE TABLE") + DDLUtils.verifyTableProperties(table.storage.getProperties.keys.toSeq, "CREATE TABLE") sparkSession.sessionState.catalog.createTable(table, ifNotExists) Seq.empty[Row] } @@ -166,8 +168,9 @@ case class AlterTableRenameCommand( val table = catalog.getTableMetadata(oldName) if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) { val newPath = catalog.defaultTablePath(newName) - val newTable = table.withNewStorage( - serdeProperties = table.storage.serdeProperties ++ Map("path" -> newPath)) + val newTable = table.mapStorage { storage => + storage.copy(properties = storage.properties + ("path" -> newPath)) + } catalog.alterTable(newTable) } // Invalidate the table last, otherwise uncaching the table would load the logical plan @@ -349,7 +352,7 @@ case class TruncateTableCommand( } val locations = if (isDatasourceTable) { - Seq(table.storage.serdeProperties.get("path")) + Seq(table.storage.properties.get("path")) } else if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { @@ -443,6 +446,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF partCols.foreach(col => append(buffer, col, "", "")) } } else { + assert(table.provider == Some("hive")) describeSchema(table.schema, buffer) if 
(table.partitionColumns.nonEmpty) { @@ -487,15 +491,15 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { append(buffer, "", "", "") append(buffer, "# Storage Information", "", "") - metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) - metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) - metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) - append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "") + + metadata.storage.getSerde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) + metadata.storage.getInputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) + metadata.storage.getOutputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) describeBucketingInfo(metadata, buffer) append(buffer, "Storage Desc Parameters:", "", "") - metadata.storage.serdeProperties.foreach { case (key, value) => - append(buffer, s" $key", value, "") + metadata.storage.getProperties.foreach { + case (key, value) => append(buffer, s" $key", value, "") } } @@ -741,6 +745,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { showCreateDataSourceTable(tableMetadata) } else { + assert(tableMetadata.provider == Some("hive")) showCreateHiveTable(tableMetadata) } @@ -821,10 +826,10 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = { val storage = metadata.storage - storage.serde.foreach { serde => + storage.getSerde.foreach { serde => builder ++= s"ROW FORMAT SERDE '$serde'\n" - val serdeProps = metadata.storage.serdeProperties.map { + val serdeProps = metadata.storage.getProperties.map { case (key, value) => s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'" } @@ -832,14 +837,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (\n ", ",\n ", "\n)\n") } - if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) { + if (storage.getInputFormat.isDefined || storage.getOutputFormat.isDefined) { builder ++= "STORED AS\n" - storage.inputFormat.foreach { format => + storage.getInputFormat.foreach { format => builder ++= s" INPUTFORMAT '${escapeSingleQuotedString(format)}'\n" } - storage.outputFormat.foreach { format => + storage.getOutputFormat.foreach { format => builder ++= s" OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n" } } @@ -894,7 +899,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n" - val dataSourceOptions = metadata.storage.serdeProperties.filterNot { + val dataSourceOptions = metadata.storage.properties.filterNot { case (key, value) => // If it's a managed table, omit PATH option. Spark SQL always creates external table // when the table creation DDL contains the PATH option. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 0841636d3309f..6bd11130902fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -214,7 +214,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table) - val options = table.storage.serdeProperties + val options = table.storage.getProperties val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b170a3a77ee04..02ac64ebe7c70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -246,9 +246,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == hiveSerde.get.serde) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } } @@ -260,13 +260,13 @@ class DDLCommandSuite extends PlanTest { // No conflicting serdes here, OK val parsed1 = parseAs[CreateTableCommand](query1) - assert(parsed1.table.storage.serde == Some("anything")) - assert(parsed1.table.storage.inputFormat == Some("inputfmt")) - assert(parsed1.table.storage.outputFormat == Some("outputfmt")) + assert(parsed1.table.storage.getSerde == Some("anything")) + assert(parsed1.table.storage.getInputFormat == Some("inputfmt")) + assert(parsed1.table.storage.getOutputFormat == Some("outputfmt")) val parsed2 = parseAs[CreateTableCommand](query2) - assert(parsed2.table.storage.serde.isEmpty) - assert(parsed2.table.storage.inputFormat == Some("inputfmt")) - assert(parsed2.table.storage.outputFormat == Some("outputfmt")) + assert(parsed2.table.storage.getSerde.isEmpty) + assert(parsed2.table.storage.getInputFormat == Some("inputfmt")) + assert(parsed2.table.storage.getOutputFormat == Some("outputfmt")) } test("create table - row format serde and generic file format") { @@ -279,9 +279,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == Some("anything")) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format serde", "incompatible", s)) } @@ -298,9 +298,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, 
new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == hiveSerde.get.serde) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 169250d9bb1c2..413e789077f12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -80,11 +80,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val storage = CatalogStorageFormat( locationUri = Some(catalog.defaultTablePath(name)), - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = Map()) + properties = Map.empty) CatalogTable( identifier = name, tableType = CatalogTableType.EXTERNAL, @@ -107,7 +103,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { spec: TablePartitionSpec, tableName: TableIdentifier): Unit = { val part = CatalogTablePartition( - spec, CatalogStorageFormat(None, None, None, None, false, Map())) + spec, CatalogStorageFormat.empty) catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false) } @@ -891,9 +887,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { convertToDatasourceTable(catalog, tableIdent) } assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty) - assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, partSpec).storage.getProperties.isEmpty) // Verify that the location is set to the expected string def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = { val storageFormat = spec @@ -901,10 +897,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .getOrElse { catalog.getTableMetadata(tableIdent).storage } if (isDatasourceTable) { if (spec.isDefined) { - assert(storageFormat.serdeProperties.isEmpty) + assert(storageFormat.getProperties.isEmpty) assert(storageFormat.locationUri.isEmpty) } else { - assert(storageFormat.serdeProperties.get("path") === Some(expected)) + assert(storageFormat.properties.get("path") === Some(expected)) assert(storageFormat.locationUri === Some(expected)) } } else { @@ -946,8 +942,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { if (isDatasourceTable) { convertToDatasourceTable(catalog, tableIdent) } - assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getSerde.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) // set table serde and/or 
properties (should fail on datasource tables) if (isDatasourceTable) { val e1 = intercept[AnalysisException] { @@ -961,22 +957,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e2.getMessage.contains("datasource")) } else { sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'") - assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop")) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getSerde == Some("org.apache.jadoop")) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " + "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')") - assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop")) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getSerde == Some("org.apache.madoop")) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "v", "kay" -> "vee")) } // set serde properties only sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')") - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "vvv", "kay" -> "vee")) // set things without explicitly specifying database catalog.setCurrentDatabase("dbx") sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')") - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "vvv", "kay" -> "veee")) // table to alter does not exist intercept[AnalysisException] { @@ -1002,8 +998,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { if (isDatasourceTable) { convertToDatasourceTable(catalog, tableIdent) } - assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getSerde.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties.isEmpty) // set table serde and/or properties (should fail on datasource tables) if (isDatasourceTable) { val e1 = intercept[AnalysisException] { @@ -1017,26 +1013,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e2.getMessage.contains("datasource")) } else { sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'") - assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop")) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getSerde == Some("org.apache.jadoop")) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties.isEmpty) sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " + "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')") - assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop")) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getSerde == Some("org.apache.madoop")) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "v", "kay" -> "vee")) } // set serde properties only 
maybeWrapException(isDatasourceTable) { sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " + "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')") - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "vvv", "kay" -> "vee")) } // set things without explicitly specifying database catalog.setCurrentDatabase("dbx") maybeWrapException(isDatasourceTable) { sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')") - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "vvv", "kay" -> "veee")) } // table to alter does not exist diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index cf2b92fb898df..eb486a14c4eae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -179,7 +179,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu try { client.createTable( - tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)), + tableDefinition.mapStorage(_.copy(locationUri = Some(tempPath.toString))), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2be51ed0e87e7..02f0749d4c1dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -119,7 +119,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort")) } - val options = table.storage.serdeProperties + val options = table.storage.getProperties val dataSource = DataSource( sparkSession, @@ -441,10 +441,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case p: LogicalPlan if p.resolved => p case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => - val desc = if (table.storage.serde.isEmpty) { + val desc = if (table.storage.getSerde.isEmpty) { // add default serde - table.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.mapStorage(_.withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) } else { table } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 3ab1bdabb99b3..406cebcc0e359 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -84,28 +84,13 @@ private[hive] case class MetastoreRelation( case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString }) - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() - tTable.setSd(sd) - // Note: In Hive the schema and partition columns must be disjoint sets val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c => catalogTable.partitionColumnNames.contains(c.getName) } - sd.setCols(schema.asJava) + 
val sd = toHiveStorage(catalogTable.storage, schema) + tTable.setSd(sd) tTable.setPartitionKeys(partCols.asJava) - - catalogTable.storage.locationUri.foreach(sd.setLocation) - catalogTable.storage.inputFormat.foreach(sd.setInputFormat) - catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) - - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib) - sd.setSerdeInfo(serdeInfo) - - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) - new HiveTable(tTable) } @@ -160,25 +145,28 @@ private[hive] case class MetastoreRelation( tPartition.setTableName(tableName) tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava) - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() + val sd = toHiveStorage(p.storage, catalogTable.schema.map(toHiveColumn)) tPartition.setSd(sd) - sd.setCols(catalogTable.schema.map(toHiveColumn).asJava) - p.storage.locationUri.foreach(sd.setLocation) - p.storage.inputFormat.foreach(sd.setInputFormat) - p.storage.outputFormat.foreach(sd.setOutputFormat) + new Partition(hiveQlTable, tPartition) + } + } - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - sd.setSerdeInfo(serdeInfo) - // maps and lists should be set only after all elements are ready (see HIVE-7975) - p.storage.serde.foreach(serdeInfo.setSerializationLib) + private def toHiveStorage(storage: CatalogStorageFormat, schema: Seq[FieldSchema]) = { + val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() + sd.setCols(schema.asJava) + storage.locationUri.foreach(sd.setLocation) + storage.getInputFormat.foreach(sd.setInputFormat) + storage.getOutputFormat.foreach(sd.setOutputFormat) - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) + val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo + sd.setSerdeInfo(serdeInfo) + // maps and lists should be set only after all elements are ready (see HIVE-7975) + storage.getSerde.foreach(serdeInfo.setSerializationLib) - new Partition(hiveQlTable, tPartition) - } + val serdeParameters = new java.util.HashMap[String, String]() + storage.getProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + serdeInfo.setParameters(serdeParameters) + sd } /** Only compare database and tablename, not alias. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7e0cef3e355d5..8395120c97791 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -355,6 +355,26 @@ private[hive] class HiveClientImpl( val properties = h.getParameters.asScala.toMap + val storage = CatalogStorageFormat( + locationUri = shim.getDataLocation(h).filterNot { _ => + // SPARK-15269: Persisted data source tables always store the location URI as a SerDe + // property named "path" instead of standard Hive `dataLocation`, because Hive only + // allows directory paths as location URIs while Spark SQL data source tables also + // allows file paths. 
So the standard Hive `dataLocation` is meaningless for Spark SQL + // data source tables. + DDLUtils.isDatasourceTable(properties) && + h.getTableType == HiveTableType.EXTERNAL_TABLE && + // Spark SQL may also save external data source in Hive compatible format when + // possible, so that these tables can be directly accessed by Hive. For these tables, + // `dataLocation` is still necessary. Here we also check for input format class + // because only these Hive compatible tables set this field. + h.getInputFormatClass == null + }, + properties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap + ).withInputFormat(Option(h.getInputFormatClass).map(_.getName)) + .withOutputFormat(Option(h.getOutputFormatClass).map(_.getName)) + .withSerde(Option(h.getSerializationLib)) + CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { @@ -364,6 +384,7 @@ private[hive] class HiveClientImpl( case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW }, schema = schema, + provider = Some("hive"), partitionColumnNames = partCols.map(_.name), sortColumnNames = Seq(), // TODO: populate this bucketColumnNames = h.getBucketCols.asScala, @@ -371,27 +392,7 @@ private[hive] class HiveClientImpl( owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, - storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h).filterNot { _ => - // SPARK-15269: Persisted data source tables always store the location URI as a SerDe - // property named "path" instead of standard Hive `dataLocation`, because Hive only - // allows directory paths as location URIs while Spark SQL data source tables also - // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL - // data source tables. - DDLUtils.isDatasourceTable(properties) && - h.getTableType == HiveTableType.EXTERNAL_TABLE && - // Spark SQL may also save external data source in Hive compatible format when - // possible, so that these tables can be directly accessed by Hive. For these tables, - // `dataLocation` is still necessary. Here we also check for input format class - // because only these Hive compatible tables set this field. 
- h.getInputFormatClass == null - }, - inputFormat = Option(h.getInputFormatClass).map(_.getName), - outputFormat = Option(h.getOutputFormatClass).map(_.getName), - serde = Option(h.getSerializationLib), - compressed = h.getTTable.getSd.isCompressed, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap - ), + storage = storage, properties = properties, viewOriginalText = Option(h.getViewOriginalText), viewText = Option(h.getViewExpandedText), @@ -770,11 +771,11 @@ private[hive] class HiveClientImpl( hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } - table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) - table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) + table.storage.getInputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) + table.storage.getOutputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( - table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } + table.storage.getSerde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.storage.getProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } table.comment.foreach { c => hiveTable.setProperty("comment", c) } table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) } @@ -795,10 +796,10 @@ private[hive] class HiveClientImpl( val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo p.storage.locationUri.foreach(storageDesc.setLocation) - p.storage.inputFormat.foreach(storageDesc.setInputFormat) - p.storage.outputFormat.foreach(storageDesc.setOutputFormat) - p.storage.serde.foreach(serdeInfo.setSerializationLib) - serdeInfo.setParameters(p.storage.serdeProperties.asJava) + p.storage.getInputFormat.foreach(storageDesc.setInputFormat) + p.storage.getOutputFormat.foreach(storageDesc.setOutputFormat) + p.storage.getSerde.foreach(serdeInfo.setSerializationLib) + serdeInfo.setParameters(p.storage.getProperties.asJava) storageDesc.setSerdeInfo(serdeInfo) tpart.setDbName(ht.getDbName) tpart.setTableName(ht.getTableName) @@ -809,14 +810,16 @@ private[hive] class HiveClientImpl( private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition + + val storage = CatalogStorageFormat( + locationUri = Option(apiPartition.getSd.getLocation), + properties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap + ).withInputFormat(Option(apiPartition.getSd.getInputFormat)) + .withOutputFormat(Option(apiPartition.getSd.getOutputFormat)) + .withSerde(Option(apiPartition.getSd.getSerdeInfo.getSerializationLib)) + CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), - storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation), - inputFormat = Option(apiPartition.getSd.getInputFormat), - outputFormat = Option(apiPartition.getSd.getOutputFormat), - serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), - compressed = apiPartition.getSd.isCompressed, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + storage = storage) } } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 2762e0cdd56ab..ffe6ac2d4ce37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -52,15 +52,13 @@ case class CreateHiveTableAsSelectCommand( import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextInputFormat - val withFormat = - tableDesc.withNewStorage( - inputFormat = - tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), - outputFormat = - tableDesc.storage.outputFormat - .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)), - compressed = tableDesc.storage.compressed) + val withFormat = tableDesc.mapStorage { storage => + storage + .withInputFormat(storage.getInputFormat.orElse(Some(classOf[TextInputFormat].getName))) + .withOutputFormat(storage.getOutputFormat + .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName))) + .withSerde(storage.getSerde.orElse(Some(classOf[LazySimpleSerDe].getName))) + } val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 867aadb5f5569..b47f4dd95533c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -69,9 +69,9 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == Seq.empty[CatalogColumn]) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.storage.serde == + assert(desc.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.getSerde == Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -100,10 +100,10 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == Seq.empty[CatalogColumn]) - assert(desc.storage.serdeProperties == Map()) - assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) - assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) - assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) + assert(desc.storage.getProperties == Map()) + assert(desc.storage.getInputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) + assert(desc.storage.getOutputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) + assert(desc.storage.getSerde == Some("parquet.hive.serde.ParquetHiveSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -118,11 +118,11 @@ class HiveDDLCommandSuite extends PlanTest { 
    assert(desc.schema == Seq.empty[CatalogColumn])
    assert(desc.viewText == None) // TODO will be SQLText
    assert(desc.viewOriginalText.isEmpty)
-    assert(desc.storage.serdeProperties == Map())
-    assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
-    assert(desc.storage.outputFormat ==
+    assert(desc.storage.getProperties == Map())
+    assert(desc.storage.getInputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.getOutputFormat ==
      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.getSerde.isEmpty)
    assert(desc.properties == Map())
  }

@@ -154,10 +154,10 @@ class HiveDDLCommandSuite extends PlanTest {
    assert(desc.schema == Seq.empty[CatalogColumn])
    assert(desc.viewText == None) // TODO will be SQLText
    assert(desc.viewOriginalText.isEmpty)
-    assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
-    assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+    assert(desc.storage.getProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+    assert(desc.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.storage.getSerde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
    assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
  }

@@ -300,12 +300,12 @@ class HiveDDLCommandSuite extends PlanTest {
    assert(desc.viewText.isEmpty)
    assert(desc.viewOriginalText.isEmpty)
    assert(desc.storage.locationUri.isEmpty)
-    assert(desc.storage.inputFormat ==
+    assert(desc.storage.getInputFormat ==
      Some("org.apache.hadoop.mapred.TextInputFormat"))
-    assert(desc.storage.outputFormat ==
+    assert(desc.storage.getOutputFormat ==
      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
-    assert(desc.storage.serdeProperties.isEmpty)
+    assert(desc.storage.getSerde.isEmpty)
+    assert(desc.storage.getProperties.isEmpty)
    assert(desc.properties.isEmpty)
    assert(desc.comment.isEmpty)
  }
@@ -390,11 +390,11 @@ class HiveDDLCommandSuite extends PlanTest {
    val (desc1, _) = extractTableDesc(query1)
    val (desc2, _) = extractTableDesc(query2)
    val (desc3, _) = extractTableDesc(query3)
-    assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff"))
-    assert(desc1.storage.serdeProperties.isEmpty)
-    assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff"))
-    assert(desc2.storage.serdeProperties == Map("k1" -> "v1"))
-    assert(desc3.storage.serdeProperties == Map(
+    assert(desc1.storage.getSerde == Some("org.apache.poof.serde.Baff"))
+    assert(desc1.storage.getProperties.isEmpty)
+    assert(desc2.storage.getSerde == Some("org.apache.poof.serde.Baff"))
+    assert(desc2.storage.getProperties == Map("k1" -> "v1"))
+    assert(desc3.storage.getProperties == Map(
      "field.delim" -> "x",
      "escape.delim" -> "y",
      "serialization.format" -> "x",
@@ -409,12 +409,13 @@ class HiveDDLCommandSuite extends PlanTest {
    val query2 = s"$baseQuery ORC"
    val (desc1, _) = extractTableDesc(query1)
    val (desc2, _) = extractTableDesc(query2)
-    assert(desc1.storage.inputFormat == Some("winput"))
-    assert(desc1.storage.outputFormat == Some("wowput"))
-    assert(desc1.storage.serde.isEmpty)
-    assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
-    assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
-    assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+    assert(desc1.storage.getInputFormat == Some("winput"))
+    assert(desc1.storage.getOutputFormat == Some("wowput"))
+    assert(desc1.storage.getSerde.isEmpty)
+    assert(desc2.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+    assert(desc2.storage.getOutputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+    assert(desc2.storage.getSerde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
  }

  test("create table - storage handler") {
@@ -460,10 +461,10 @@ class HiveDDLCommandSuite extends PlanTest {
    assert(desc.viewText.isEmpty)
    assert(desc.viewOriginalText.isEmpty)
    assert(desc.storage.locationUri == Some("/path/to/mercury"))
-    assert(desc.storage.inputFormat == Some("winput"))
-    assert(desc.storage.outputFormat == Some("wowput"))
-    assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
-    assert(desc.storage.serdeProperties == Map("k1" -> "v1"))
+    assert(desc.storage.getInputFormat == Some("winput"))
+    assert(desc.storage.getOutputFormat == Some("wowput"))
+    assert(desc.storage.getSerde == Some("org.apache.poof.serde.Baff"))
+    assert(desc.storage.getProperties == Map("k1" -> "v1"))
    assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
    assert(desc.comment == Some("no comment"))
  }
@@ -479,10 +480,10 @@ class HiveDDLCommandSuite extends PlanTest {
    assert(desc.schema == Seq.empty[CatalogColumn])
    assert(desc.viewText == Option("SELECT * FROM tab1"))
    assert(desc.viewOriginalText == Option("SELECT * FROM tab1"))
-    assert(desc.storage.serdeProperties == Map())
-    assert(desc.storage.inputFormat.isEmpty)
-    assert(desc.storage.outputFormat.isEmpty)
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.getProperties == Map())
+    assert(desc.storage.getInputFormat.isEmpty)
+    assert(desc.storage.getOutputFormat.isEmpty)
+    assert(desc.storage.getSerde.isEmpty)
    assert(desc.properties == Map())
  }

@@ -505,10 +506,10 @@ class HiveDDLCommandSuite extends PlanTest {
      CatalogColumn("col3", null, nullable = true, None) :: Nil)
    assert(desc.viewText == Option("SELECT * FROM tab1"))
    assert(desc.viewOriginalText == Option("SELECT * FROM tab1"))
-    assert(desc.storage.serdeProperties == Map())
-    assert(desc.storage.inputFormat.isEmpty)
-    assert(desc.storage.outputFormat.isEmpty)
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.getProperties == Map())
+    assert(desc.storage.getInputFormat.isEmpty)
+    assert(desc.storage.getOutputFormat.isEmpty)
+    assert(desc.storage.getSerde.isEmpty)
    assert(desc.properties == Map("prop1Key" -> "prop1Val"))
    assert(desc.comment == Option("BLABLA"))
  }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 754aabb5ac936..84ec5941e0c55 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -93,9 +93,9 @@ class DataSourceWithHiveMetastoreCatalogSuite
        }

        val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
-        assert(hiveTable.storage.inputFormat === Some(inputFormat))
-        assert(hiveTable.storage.outputFormat === Some(outputFormat))
-        assert(hiveTable.storage.serde === Some(serde))
+        assert(hiveTable.storage.getInputFormat === Some(inputFormat))
+        assert(hiveTable.storage.getOutputFormat === Some(outputFormat))
+        assert(hiveTable.storage.getSerde === Some(serde))

        assert(hiveTable.partitionColumnNames.isEmpty)
        assert(hiveTable.tableType === CatalogTableType.MANAGED)
@@ -125,9 +125,9 @@ class DataSourceWithHiveMetastoreCatalogSuite

          val hiveTable =
            sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
-          assert(hiveTable.storage.inputFormat === Some(inputFormat))
-          assert(hiveTable.storage.outputFormat === Some(outputFormat))
-          assert(hiveTable.storage.serde === Some(serde))
+          assert(hiveTable.storage.getInputFormat === Some(inputFormat))
+          assert(hiveTable.storage.getOutputFormat === Some(outputFormat))
+          assert(hiveTable.storage.getSerde === Some(serde))

          assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
          assert(hiveTable.storage.locationUri ===
@@ -157,9 +157,9 @@ class DataSourceWithHiveMetastoreCatalogSuite

          val hiveTable =
            sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
-          assert(hiveTable.storage.inputFormat === Some(inputFormat))
-          assert(hiveTable.storage.outputFormat === Some(outputFormat))
-          assert(hiveTable.storage.serde === Some(serde))
+          assert(hiveTable.storage.getInputFormat === Some(inputFormat))
+          assert(hiveTable.storage.getOutputFormat === Some(outputFormat))
+          assert(hiveTable.storage.getSerde === Some(serde))

          assert(hiveTable.partitionColumnNames.isEmpty)
          assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 12d250d4fb604..9cace78c517fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -729,11 +729,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
        schema = Seq.empty,
        storage = CatalogStorageFormat(
          locationUri = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          compressed = false,
-          serdeProperties = Map(
+          properties = Map(
            "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
        ),
        properties = Map(
@@ -1171,8 +1167,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
        checkAnswer(table("t"), Seq(Row(1, 2, 3), Row(2, 3, 4)))
        val catalogTable = sharedState.externalCatalog.getTable("default", "t")
        // there should not be a lowercase key 'path' now
-        assert(catalogTable.storage.serdeProperties.get("path").isEmpty)
-        assert(catalogTable.storage.serdeProperties.get("PATH").isDefined)
+        assert(catalogTable.storage.properties.get("path").isEmpty)
+        assert(catalogTable.storage.properties.get("PATH").isDefined)
      }
    }
  }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 83f1b192f7c94..7ba880e476137 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
    val expectedPath =
      spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName

-    assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
+    assert(metastoreTable.storage.properties("path") === expectedPath)
  }

  private def getTableNames(dbName: Option[String] = None): Array[String] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a972f61e25858..bfe8568f722fd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -147,14 +147,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
      identifier = TableIdentifier(tableName, Some(database)),
      tableType = CatalogTableType.MANAGED,
      schema = Seq(CatalogColumn("key", "int")),
-      storage = CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = Some(classOf[TextInputFormat].getName),
-        outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
-        serde = Some(classOf[LazySimpleSerDe].getName()),
-        compressed = false,
-        serdeProperties = Map.empty
-      ))
+      storage = CatalogStorageFormat.empty
+        .withInputFormat(Some(classOf[TextInputFormat].getName))
+        .withOutputFormat(Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName))
+        .withSerde(Some(classOf[LazySimpleSerDe].getName)))
  }

  ///////////////////////////////////////////////////////////////////////////
@@ -269,13 +265,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
  // Partition related API
  ///////////////////////////////////////////////////////////////////////////

-  val storageFormat = CatalogStorageFormat(
-    locationUri = None,
-    inputFormat = None,
-    outputFormat = None,
-    serde = None,
-    compressed = false,
-    serdeProperties = Map.empty)
+  val storageFormat = CatalogStorageFormat.empty

  test(s"$version: sql create partitioned table") {
    client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
@@ -366,10 +356,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
  test(s"$version: alterPartitions") {
    val spec = Map("key1" -> "1", "key2" -> "2")
    val newLocation = Utils.createTempDir().getPath()
-    val storage = storageFormat.copy(
-      locationUri = Some(newLocation),
+    val storage = storageFormat.copy(locationUri = Some(newLocation))
      // needed for 0.12 alter partitions
-      serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      .withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
    val partition = CatalogTablePartition(spec, storage)
    client.alterPartitions("default", "src_part", Seq(partition))
    assert(client.getPartition("default", "src_part", spec)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 343d7bae98bff..aa15d411caacb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -355,16 +355,16 @@ class HiveDDLSuite
    val expectedSerdePropsString =
      expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
    val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
-    assume(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was already set")
-    assume(oldPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) !=
+    assume(oldPart.storage.getSerde != Some(expectedSerde), "bad test: serde was already set")
+    assume(oldPart.storage.getProperties.filterKeys(expectedSerdeProps.contains) !=
      expectedSerdeProps, "bad test: serde properties were already set")
    sql(s"""ALTER TABLE boxes PARTITION (width=4)
      | SET SERDE '$expectedSerde'
      | WITH SERDEPROPERTIES ($expectedSerdePropsString)
      |""".stripMargin)
    val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
-    assert(newPart.storage.serde == Some(expectedSerde))
-    assume(newPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) ==
+    assert(newPart.storage.getSerde == Some(expectedSerde))
+    assume(newPart.storage.getProperties.filterKeys(expectedSerdeProps.contains) ==
      expectedSerdeProps)
  }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 961d95c268b2c..4c1ecce1f96d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -434,7 +434,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
        case None => // OK.
      }
      // Also make sure that the format is the desired format.
-      assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
+      assert(catalogTable.storage.getInputFormat.get.toLowerCase.contains(format))
    }

    // When a user-specified location is defined, the table type needs to be EXTERNAL.
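The test hunks above all exercise the same refactored API: the Hive-specific storage settings now live in the generic properties map and are reached through the with*/get* helpers rather than dedicated fields. A minimal usage sketch follows, assuming the patched CatalogStorageFormat is on the classpath; the location path and serde class names are illustrative values, not taken from the tests.

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Build a storage format the way VersionsSuite now does: start from the empty
// format and layer the Hive-specific settings on top.
val fmt = CatalogStorageFormat.empty
  .withInputFormat(Some("org.apache.hadoop.mapred.TextInputFormat"))
  .withOutputFormat(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
  .withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
  .copy(locationUri = Some("/tmp/warehouse/boxes"))  // illustrative path; locationUri is still a plain field

// Read the settings back through the getters ...
assert(fmt.getInputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(fmt.getSerde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
// ... while getProperties excludes the reserved keys, which is why the
// HiveDDLCommandSuite assertions above can expect an empty map even when an
// input/output format and serde have been set.
assert(fmt.getProperties.isEmpty)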