From 83320f324fc5038a600194cb34c7eb7806ab8109 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 7 Jul 2016 01:08:43 +0800 Subject: [PATCH 1/3] make CatalogTable more general and less hive specific --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../sql/catalyst/catalog/interface.scala | 66 +++--- .../catalog/ExternalCatalogSuite.scala | 26 +-- .../spark/sql/execution/SparkSqlParser.scala | 204 ++++++++---------- .../command/createDataSourceTables.scala | 24 +-- .../spark/sql/execution/command/ddl.scala | 25 ++- .../spark/sql/execution/command/tables.scala | 45 ++-- .../datasources/DataSourceStrategy.scala | 2 +- .../execution/command/DDLCommandSuite.scala | 30 +-- .../sql/execution/command/DDLSuite.scala | 49 ++--- .../spark/sql/hive/HiveExternalCatalog.scala | 4 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 8 +- .../spark/sql/hive/MetastoreRelation.scala | 52 ++--- .../sql/hive/client/HiveClientImpl.scala | 87 ++++---- .../CreateHiveTableAsSelectCommand.scala | 22 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 85 ++++---- .../sql/hive/HiveMetastoreCatalogSuite.scala | 18 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 11 +- .../spark/sql/hive/MultiDatabaseSuite.scala | 2 +- .../spark/sql/hive/client/VersionsSuite.scala | 25 +-- .../sql/hive/execution/HiveDDLSuite.scala | 8 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 22 files changed, 388 insertions(+), 409 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 7ccbb2dac90c8..b48bf2b79c6de 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -53,7 +53,7 @@ statement (PARTITIONED BY partitionColumnNames=identifierList)? bucketSpec? AS? query #createTableUsing | createTableHeader ('(' columns=colTypeList ')')? - (COMMENT STRING)? + (COMMENT comment=STRING)? (PARTITIONED BY '(' partitionColumns=colTypeList ')')? bucketSpec? skewSpec? rowFormat? createFileFormat? locationSpec? diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b12606e17d380..882612883abed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -42,38 +42,58 @@ case class CatalogFunction( /** * Storage format, used to describe how a partition or a table is stored. + * + * @param locationUri the storage location of the table/partition, can be None if it has no storage, + * e.g. view, or it's not file-based, e.g. JDBC data source table. + * @param provider the name of the data source provider, e.g. parquet, json, etc. Can be None if it + * has no storage. Note that `hive` is also one kind of provider. + * @param properties used to store some extra information for the storage. 
*/ case class CatalogStorageFormat( locationUri: Option[String], - inputFormat: Option[String], - outputFormat: Option[String], - serde: Option[String], - compressed: Boolean, - serdeProperties: Map[String, String]) { + provider: Option[String], + properties: Map[String, String]) { override def toString: String = { - val serdePropsToString = - if (serdeProperties.nonEmpty) { - s"Properties: " + serdeProperties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + val propertiesToString = + if (properties.nonEmpty) { + s"Properties: " + properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") } else { "" } - val output = - Seq(locationUri.map("Location: " + _).getOrElse(""), - inputFormat.map("InputFormat: " + _).getOrElse(""), - outputFormat.map("OutputFormat: " + _).getOrElse(""), - if (compressed) "Compressed" else "", - serde.map("Serde: " + _).getOrElse(""), - serdePropsToString) + val output = Seq( + locationUri.map("Location: " + _).getOrElse(""), + provider.map("Provider: " + _).getOrElse(""), + propertiesToString) output.filter(_.nonEmpty).mkString("Storage(", ", ", ")") } + // TODO: remove these hive hacks + + def withInputFormat(inFmt: String): CatalogStorageFormat = { + this.copy(properties = properties + ("inputFormat" -> inFmt)) + } + + def withOutputFormat(outFmt: String): CatalogStorageFormat = { + this.copy(properties = properties + ("outputFormat" -> outFmt)) + } + + def withSerde(serde: String): CatalogStorageFormat = { + this.copy(properties = properties + ("serde" -> serde)) + } + + def getInputFormat: Option[String] = properties.get("inputFormat") + + def getOutputFormat: Option[String] = properties.get("outputFormat") + + def getSerde: Option[String] = properties.get("serde") + + def getProperties: Map[String, String] = properties - "inputFormat" - "outputFormat" - "serde" } object CatalogStorageFormat { /** Empty storage format for default values and copies. */ - val empty = CatalogStorageFormat(locationUri = None, inputFormat = None, - outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty) + val empty = CatalogStorageFormat(None, None, Map.empty) } /** @@ -158,18 +178,6 @@ case class CatalogTable( /** Return the fully qualified name of this table, assuming the database was specified. */ def qualifiedName: String = identifier.unquotedString - /** Syntactic sugar to update a field in `storage`. 
*/ - def withNewStorage( - locationUri: Option[String] = storage.locationUri, - inputFormat: Option[String] = storage.inputFormat, - outputFormat: Option[String] = storage.outputFormat, - compressed: Boolean = false, - serde: Option[String] = storage.serde, - serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = { - copy(storage = CatalogStorageFormat( - locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties)) - } - override def toString: String = { val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index a9268535c40a8..1574e3d1b4cdc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -398,12 +398,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(oldPart2.storage.locationUri != Some(newLocation)) // alter other storage information catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))), - oldPart2.copy(storage = storageFormat.copy(serdeProperties = newSerdeProps)))) + oldPart1.copy(storage = storageFormat.withSerde(newSerde)), + oldPart2.copy(storage = storageFormat.copy(properties = newSerdeProps)))) val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec) val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec) - assert(newPart1b.storage.serde == Some(newSerde)) - assert(newPart2b.storage.serdeProperties == newSerdeProps) + assert(newPart1b.storage.getSerde == Some(newSerde)) + assert(newPart2b.storage.getProperties == newSerdeProps) // alter but change spec, should fail because new partition specs do not exist yet val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) @@ -550,7 +550,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val table = CatalogTable( identifier = TableIdentifier("my_table", Some("db1")), tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + storage = CatalogStorageFormat.empty, schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")) ) @@ -569,7 +569,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, None, None, false, Map.empty), + None, Map.empty), schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")) ) catalog.createTable("db1", externalTable, ignoreIfExists = false) @@ -582,7 +582,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val table = CatalogTable( identifier = TableIdentifier("tbl", Some("db1")), tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + storage = CatalogStorageFormat.empty, schema = Seq( CatalogColumn("col1", "int"), CatalogColumn("col2", "string"), @@ -609,7 +609,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac Map("a" -> 
"7", "b" -> "8"), CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, None, None, false, Map.empty) + None, Map.empty) ) catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false) assert(!exists(databaseDir, "tbl", "a=7", "b=8")) @@ -628,13 +628,9 @@ abstract class CatalogTestUtils { def newEmptyCatalog(): ExternalCatalog // These fields must be lazy because they rely on fields that are not implemented yet - lazy val storageFormat = CatalogStorageFormat( - locationUri = None, - inputFormat = Some(tableInputFormat), - outputFormat = Some(tableOutputFormat), - serde = None, - compressed = false, - serdeProperties = Map.empty) + lazy val storageFormat = CatalogStorageFormat.empty + .withInputFormat(tableInputFormat) + .withOutputFormat(tableOutputFormat) lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index c5f4d58da43ac..fe7e4acd50f88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.Try import org.antlr.v4.runtime.{ParserRuleContext, Token} @@ -922,7 +923,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx) } - val comment = Option(ctx.STRING).map(string) + val comment = Option(ctx.comment).map(string) val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) @@ -949,42 +950,33 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // to include the partition columns here explicitly val schema = cols ++ partitionCols - // Storage format - val defaultStorage: CatalogStorageFormat = { - val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) - CatalogStorageFormat( - locationUri = None, - inputFormat = defaultHiveSerde.flatMap(_.inputFormat) - .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), - outputFormat = defaultHiveSerde.flatMap(_.outputFormat) - .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - // Note: Keep this unspecified because we use the presence of the serde to decide - // whether to convert a table created by CTAS to a datasource table. 
- serde = None, - compressed = false, - serdeProperties = Map()) - } validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) - val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) - .getOrElse(CatalogStorageFormat.empty) - val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) - .getOrElse(CatalogStorageFormat.empty) - val location = Option(ctx.locationSpec).map(visitLocationSpec) + var storage = CatalogStorageFormat( + locationUri = Option(ctx.locationSpec).map(visitLocationSpec), + provider = Some("hive"), + properties = Map.empty) + Option(ctx.createFileFormat).foreach(ctx => storage = getFileFormat(ctx, storage)) + Option(ctx.rowFormat).foreach(ctx => storage = getRowFormat(ctx, storage)) + + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + if (storage.getInputFormat.isEmpty) { + storage = storage.withInputFormat(defaultHiveSerde.flatMap(_.inputFormat) + .getOrElse("org.apache.hadoop.mapred.TextInputFormat")) + } + if (storage.getOutputFormat.isEmpty) { + storage = storage.withOutputFormat(defaultHiveSerde.flatMap(_.outputFormat) + .getOrElse("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + } + // If we are creating an EXTERNAL table, then the LOCATION field is required - if (external && location.isEmpty) { + if (external && storage.locationUri.isEmpty) { operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) } - val storage = CatalogStorageFormat( - locationUri = location, - inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), - outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), - serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), - compressed = false, - serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) + // If location is defined, we'll assume this is an external table. // Otherwise, we may accidentally delete existing data. - val tableType = if (external || location.isDefined) { + val tableType = if (external || storage.locationUri.isDefined) { CatalogTableType.EXTERNAL } else { CatalogTableType.MANAGED @@ -1023,8 +1015,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties // are empty Maps. - val optionsWithPath = if (location.isDefined) { - Map("path" -> location.get) + val optionsWithPath = if (storage.locationUri.isDefined) { + Map("path" -> storage.locationUri.get) } else { Map.empty[String, String] } @@ -1060,19 +1052,39 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CatalogStorageFormat]] for creating tables. + * Parse the `createFileFormat` and put the storage information into the passed in map. + * + * Example format: + * {{{ + * STORED AS INPUTFORMAT formatName1 OUTPUTFORMAT formatName2 + * }}} + * + * OR * - * Format: STORED AS ... 
+ * {{{ + * STORED AS fileFormatName + * }}} */ - override def visitCreateFileFormat( - ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + private def getFileFormat( + ctx: CreateFileFormatContext, + storage: CatalogStorageFormat): CatalogStorageFormat = { (ctx.fileFormat, ctx.storageHandler) match { // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format case (c: TableFileFormatContext, null) => - visitTableFileFormat(c) + storage.withInputFormat(string(c.inFmt)).withOutputFormat(string(c.outFmt)) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO case (c: GenericFileFormatContext, null) => - visitGenericFileFormat(c) + val source = c.identifier.getText + HiveSerDe.sourceToSerDe(source, conf) match { + case Some(s) => + var result = storage + s.inputFormat.foreach(inFmt => result = result.withInputFormat(inFmt)) + s.outputFormat.foreach(outFmt => result = result.withOutputFormat(outFmt)) + s.serde.foreach(serde => result = result.withSerde(serde)) + result + case None => + operationNotAllowed(s"STORED AS with file format '$source'", ctx) + } case (null, storageHandler) => operationNotAllowed("STORED BY", ctx) case _ => @@ -1081,34 +1093,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CatalogStorageFormat]]. - */ - override def visitTableFileFormat( - ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - CatalogStorageFormat.empty.copy( - inputFormat = Option(string(ctx.inFmt)), - outputFormat = Option(string(ctx.outFmt))) - } - - /** - * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]]. - */ - override def visitGenericFileFormat( - ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - val source = ctx.identifier.getText - HiveSerDe.sourceToSerDe(source, conf) match { - case Some(s) => - CatalogStorageFormat.empty.copy( - inputFormat = s.inputFormat, - outputFormat = s.outputFormat, - serde = s.serde) - case None => - operationNotAllowed(s"STORED AS with file format '$source'", ctx) - } - } - - /** - * Create a [[CatalogStorageFormat]] used for creating tables. + * Parse the `rowFormat` and put the storage information into the passed in map. * * Example format: * {{{ @@ -1125,50 +1110,39 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [NULL DEFINED AS char] * }}} */ - private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { + private def getRowFormat( + ctx: RowFormatContext, + storage: CatalogStorageFormat): CatalogStorageFormat = withOrigin(ctx) { ctx match { - case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) - case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) - } - } - - /** - * Create SERDE row format name and properties pair. - */ - override def visitRowFormatSerde( - ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { - import ctx._ - CatalogStorageFormat.empty.copy( - serde = Option(string(name)), - serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) - } - - /** - * Create a delimited row format properties object. - */ - override def visitRowFormatDelimited( - ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { - // Collect the entries if any. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).toSeq.map(x => key -> string(x)) - } - // TODO we need proper support for the NULL format. 
- val entries = - entry("field.delim", ctx.fieldsTerminatedBy) ++ - entry("serialization.format", ctx.fieldsTerminatedBy) ++ - entry("escape.delim", ctx.escapedBy) ++ - // The following typo is inherited from Hive... - entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ - entry("mapkey.delim", ctx.keysTerminatedBy) ++ - Option(ctx.linesSeparatedBy).toSeq.map { token => - val value = string(token) - assert( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - ctx) - "line.delim" -> value + case serde: RowFormatSerdeContext => + var result = storage.withSerde(string(serde.name)) + Option(serde.props).map(visitPropertyKeyValues).foreach { props => + result = result.copy(properties = result.properties ++ props) } - CatalogStorageFormat.empty.copy(serdeProperties = entries.toMap) + result + case delimited: RowFormatDelimitedContext => + // Collect the entries if any. + def entry(key: String, value: Token): Option[(String, String)] = { + Option(value).map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. + val entries = + entry("field.delim", delimited.fieldsTerminatedBy) ++ + entry("serialization.format", delimited.fieldsTerminatedBy) ++ + entry("escape.delim", delimited.escapedBy) ++ + // The following typo is inherited from Hive... + entry("colelction.delim", delimited.collectionItemsTerminatedBy) ++ + entry("mapkey.delim", delimited.keysTerminatedBy) ++ + Option(delimited.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "line.delim" -> value + } + storage.copy(properties = storage.properties ++ entries) + } } /** @@ -1296,6 +1270,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) { ctx.colType.asScala.map { col => CatalogColumn( + // TODO: it should be case preserving. col.identifier.getText.toLowerCase, // Note: for types like "STRUCT" we can't // just convert the whole type string to lower case, otherwise the struct field names @@ -1331,8 +1306,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // expects a seq of pairs in which the old parsers' token names are used as keys. // Transforming the result of visitRowFormatDelimited would be quite a bit messier than // retrieving the key value pairs ourselves. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq + def entry(key: String, value: Token): Option[(String, String)] = { + Option(value).map(t => key -> t.getText) } val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ @@ -1340,12 +1315,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - (entries, None, Seq.empty, None) + (entries.toSeq, None, Seq.empty, None) case c: RowFormatSerdeContext => - // Use a serde format. 
- val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) - + val props = Option(c.props).map(visitPropertyKeyValues).getOrElse(Nil) + val name = string(c.name) // SPARK-10310: Special cases LazySimpleSerDe val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { Try(conf.getConfString(configKey)).toOption diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index c38eca5156e5a..b5653e8052e5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -388,11 +388,8 @@ object CreateDataSourceTableUtils extends Logging { schema = Nil, storage = CatalogStorageFormat( locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = options + provider = None, + properties = options ), properties = tableProperties.toMap) } @@ -403,17 +400,18 @@ object CreateDataSourceTableUtils extends Logging { assert(partitionColumns.isEmpty) assert(relation.partitionSchema.isEmpty) + var storage = CatalogStorageFormat( + locationUri = None, + provider = Some("hive"), + properties = options) + serde.inputFormat.foreach(inFmt => storage = storage.withInputFormat(inFmt)) + serde.outputFormat.foreach(outFmt => storage = storage.withOutputFormat(outFmt)) + serde.serde.foreach(sd => storage = storage.withSerde(sd)) + CatalogTable( identifier = tableIdent, tableType = tableType, - storage = CatalogStorageFormat( - locationUri = Some(relation.location.paths.map(_.toUri.toString).head), - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde, - compressed = false, - serdeProperties = options - ), + storage = storage, schema = relation.schema.map { f => CatalogColumn(f.name, f.dataType.catalogString) }, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a3a057a5628fe..504b1f7f986a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -318,16 +318,22 @@ case class AlterTableSerDePropertiesCommand( "not supported for tables created with the datasource API") } if (partSpec.isEmpty) { - val newTable = table.withNewStorage( - serde = serdeClassName.orElse(table.storage.serde), - serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map())) + var newStorage = table.storage + serdeClassName.foreach(serde => newStorage = newStorage.withSerde(serde)) + serdeProperties.foreach { props => + newStorage = newStorage.copy(properties = newStorage.properties ++ props) + } + val newTable = table.copy(storage = newStorage) catalog.alterTable(newTable) } else { val spec = partSpec.get val part = catalog.getPartition(tableName, spec) - val newPart = part.copy(storage = part.storage.copy( - serde = serdeClassName.orElse(part.storage.serde), - serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))) + var newStorage = part.storage + serdeClassName.foreach(serde => newStorage = newStorage.withSerde(serde)) + serdeProperties.foreach { props => + newStorage = newStorage.copy(properties = newStorage.properties ++ props) + 
} + val newPart = part.copy(storage = newStorage) catalog.alterPartitions(tableName, Seq(newPart)) } Seq.empty[Row] @@ -464,11 +470,12 @@ case class AlterTableSetLocationCommand( // No partition spec is specified, so we set the location for the table itself val newTable = if (DDLUtils.isDatasourceTable(table)) { - table.withNewStorage( + table.copy(storage = table.storage.copy( locationUri = Some(location), - serdeProperties = table.storage.serdeProperties ++ Map("path" -> location)) + properties = table.storage.properties + ("path" -> location) + )) } else { - table.withNewStorage(locationUri = Some(location)) + table.copy(storage = table.storage.copy(locationUri = Some(location))) } catalog.alterTable(newTable) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5c815df0deb9e..1dfc12348b463 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -49,9 +49,9 @@ case class CreateHiveTableAsSelectLogicalPlan( override lazy val resolved: Boolean = tableDesc.identifier.database.isDefined && tableDesc.schema.nonEmpty && - tableDesc.storage.serde.isDefined && - tableDesc.storage.inputFormat.isDefined && - tableDesc.storage.outputFormat.isDefined && + tableDesc.storage.getSerde.isDefined && + tableDesc.storage.getInputFormat.isDefined && + tableDesc.storage.getOutputFormat.isDefined && childrenResolved } @@ -80,11 +80,13 @@ case class CreateTableLikeCommand( s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'") } - val tableToCreate = catalog.getTableMetadata(sourceTable).copy( + val sourceTableMeta = catalog.getTableMetadata(sourceTable) + val tableToCreate = sourceTableMeta.copy( identifier = targetTable, tableType = CatalogTableType.MANAGED, createTime = System.currentTimeMillis, - lastAccessTime = -1).withNewStorage(locationUri = None) + lastAccessTime = -1, + storage = sourceTableMeta.storage.copy(locationUri = None)) catalog.createTable(tableToCreate, ifNotExists) Seq.empty[Row] @@ -120,7 +122,7 @@ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends override def run(sparkSession: SparkSession): Seq[Row] = { DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") - DDLUtils.verifyTableProperties(table.storage.serdeProperties.keys.toSeq, "CREATE TABLE") + DDLUtils.verifyTableProperties(table.storage.properties.keys.toSeq, "CREATE TABLE") sparkSession.sessionState.catalog.createTable(table, ifNotExists) Seq.empty[Row] } @@ -166,8 +168,9 @@ case class AlterTableRenameCommand( val table = catalog.getTableMetadata(oldName) if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) { val newPath = catalog.defaultTablePath(newName) - val newTable = table.withNewStorage( - serdeProperties = table.storage.serdeProperties ++ Map("path" -> newPath)) + val newTable = table.copy( + storage = table.storage.copy( + properties = table.storage.properties + ("path" -> newPath))) catalog.alterTable(newTable) } // Invalidate the table last, otherwise uncaching the table would load the logical plan @@ -349,7 +352,7 @@ case class TruncateTableCommand( } val locations = if (isDatasourceTable) { - Seq(table.storage.serdeProperties.get("path")) + Seq(table.storage.properties.get("path")) } else if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { @@ 
-487,15 +490,15 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { append(buffer, "", "", "") append(buffer, "# Storage Information", "", "") - metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) - metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) - metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) - append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "") + + metadata.storage.getSerde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) + metadata.storage.getInputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) + metadata.storage.getOutputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) describeBucketingInfo(metadata, buffer) append(buffer, "Storage Desc Parameters:", "", "") - metadata.storage.serdeProperties.foreach { case (key, value) => - append(buffer, s" $key", value, "") + metadata.storage.getProperties.foreach { + case (key, value) => append(buffer, s" $key", value, "") } } @@ -821,10 +824,10 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = { val storage = metadata.storage - storage.serde.foreach { serde => + storage.getSerde.foreach { serde => builder ++= s"ROW FORMAT SERDE '$serde'\n" - val serdeProps = metadata.storage.serdeProperties.map { + val serdeProps = metadata.storage.getProperties.map { case (key, value) => s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'" } @@ -832,14 +835,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (\n ", ",\n ", "\n)\n") } - if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) { + if (storage.getInputFormat.isDefined || storage.getOutputFormat.isDefined) { builder ++= "STORED AS\n" - storage.inputFormat.foreach { format => + storage.getInputFormat.foreach { format => builder ++= s" INPUTFORMAT '${escapeSingleQuotedString(format)}'\n" } - storage.outputFormat.foreach { format => + storage.getOutputFormat.foreach { format => builder ++= s" OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n" } } @@ -894,7 +897,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n" - val dataSourceOptions = metadata.storage.serdeProperties.filterNot { + val dataSourceOptions = metadata.storage.properties.filterNot { case (key, value) => // If it's a managed table, omit PATH option. Spark SQL always creates external table // when the table creation DDL contains the PATH option. 
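For reference, a minimal usage sketch of the new CatalogStorageFormat API introduced in this patch: the `withXxx` setters stash the Hive-specific fields under keys in `properties`, and the `getXxx` accessors read them back, while `getProperties` hides those keys. The format/serde class names below mirror the defaults used elsewhere in the patch and are only illustrative here, not part of the change itself.

    import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

    // Start from the empty storage format and layer on Hive-specific information.
    val storage = CatalogStorageFormat.empty
      .copy(provider = Some("hive"))
      .withInputFormat("org.apache.hadoop.mapred.TextInputFormat")
      .withOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
      .withSerde("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")

    // The accessors read the same keys back out of `properties`...
    assert(storage.getSerde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
    // ...while getProperties filters out the inputFormat/outputFormat/serde keys the setters added.
    assert(storage.getProperties.isEmpty)
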
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 0841636d3309f..8ffdc507db529 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -214,7 +214,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table) - val options = table.storage.serdeProperties + val options = table.storage.properties val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index b170a3a77ee04..02ac64ebe7c70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -246,9 +246,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == hiveSerde.get.serde) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } } @@ -260,13 +260,13 @@ class DDLCommandSuite extends PlanTest { // No conflicting serdes here, OK val parsed1 = parseAs[CreateTableCommand](query1) - assert(parsed1.table.storage.serde == Some("anything")) - assert(parsed1.table.storage.inputFormat == Some("inputfmt")) - assert(parsed1.table.storage.outputFormat == Some("outputfmt")) + assert(parsed1.table.storage.getSerde == Some("anything")) + assert(parsed1.table.storage.getInputFormat == Some("inputfmt")) + assert(parsed1.table.storage.getOutputFormat == Some("outputfmt")) val parsed2 = parseAs[CreateTableCommand](query2) - assert(parsed2.table.storage.serde.isEmpty) - assert(parsed2.table.storage.inputFormat == Some("inputfmt")) - assert(parsed2.table.storage.outputFormat == Some("outputfmt")) + assert(parsed2.table.storage.getSerde.isEmpty) + assert(parsed2.table.storage.getInputFormat == Some("inputfmt")) + assert(parsed2.table.storage.getOutputFormat == Some("outputfmt")) } test("create table - row format serde and generic file format") { @@ -279,9 +279,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == Some("anything")) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format serde", "incompatible", s)) } @@ -298,9 +298,9 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTableCommand](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, 
new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.table.storage.getSerde == hiveSerde.get.serde) + assert(ct.table.storage.getInputFormat == hiveSerde.get.inputFormat) + assert(ct.table.storage.getOutputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 169250d9bb1c2..d5a342a0aa44c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -80,11 +80,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val storage = CatalogStorageFormat( locationUri = Some(catalog.defaultTablePath(name)), - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = Map()) + provider = None, + properties = Map.empty) CatalogTable( identifier = name, tableType = CatalogTableType.EXTERNAL, @@ -107,7 +104,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { spec: TablePartitionSpec, tableName: TableIdentifier): Unit = { val part = CatalogTablePartition( - spec, CatalogStorageFormat(None, None, None, None, false, Map())) + spec, CatalogStorageFormat.empty) catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false) } @@ -891,9 +888,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { convertToDatasourceTable(catalog, tableIdent) } assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty) - assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, partSpec).storage.getProperties.isEmpty) // Verify that the location is set to the expected string def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = { val storageFormat = spec @@ -901,10 +898,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .getOrElse { catalog.getTableMetadata(tableIdent).storage } if (isDatasourceTable) { if (spec.isDefined) { - assert(storageFormat.serdeProperties.isEmpty) + assert(storageFormat.getProperties.isEmpty) assert(storageFormat.locationUri.isEmpty) } else { - assert(storageFormat.serdeProperties.get("path") === Some(expected)) + assert(storageFormat.properties.get("path") === Some(expected)) assert(storageFormat.locationUri === Some(expected)) } } else { @@ -946,8 +943,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { if (isDatasourceTable) { convertToDatasourceTable(catalog, tableIdent) } - assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getSerde.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) // set table 
serde and/or properties (should fail on datasource tables) if (isDatasourceTable) { val e1 = intercept[AnalysisException] { @@ -961,22 +958,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e2.getMessage.contains("datasource")) } else { sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'") - assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop")) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty) + assert(catalog.getTableMetadata(tableIdent).storage.getSerde == Some("org.apache.jadoop")) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties.isEmpty) sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " + "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')") - assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop")) - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getSerde == Some("org.apache.madoop")) + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "v", "kay" -> "vee")) } // set serde properties only sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')") - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "vvv", "kay" -> "vee")) // set things without explicitly specifying database catalog.setCurrentDatabase("dbx") sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')") - assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties == + assert(catalog.getTableMetadata(tableIdent).storage.getProperties == Map("k" -> "vvv", "kay" -> "veee")) // table to alter does not exist intercept[AnalysisException] { @@ -1002,8 +999,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { if (isDatasourceTable) { convertToDatasourceTable(catalog, tableIdent) } - assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getSerde.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties.isEmpty) // set table serde and/or properties (should fail on datasource tables) if (isDatasourceTable) { val e1 = intercept[AnalysisException] { @@ -1017,26 +1014,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e2.getMessage.contains("datasource")) } else { sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'") - assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop")) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty) + assert(catalog.getPartition(tableIdent, spec).storage.getSerde == Some("org.apache.jadoop")) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties.isEmpty) sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " + "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')") - assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop")) - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getSerde == Some("org.apache.madoop")) + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "v", "kay" -> "vee")) } // set serde 
properties only maybeWrapException(isDatasourceTable) { sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " + "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')") - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "vvv", "kay" -> "vee")) } // set things without explicitly specifying database catalog.setCurrentDatabase("dbx") maybeWrapException(isDatasourceTable) { sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')") - assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties == + assert(catalog.getPartition(tableIdent, spec).storage.getProperties == Map("k" -> "vvv", "kay" -> "veee")) } // table to alter does not exist diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index cf2b92fb898df..7ede725b6bb4c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -179,7 +179,9 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu try { client.createTable( - tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)), + tableDefinition.copy(storage = tableDefinition.storage.copy( + locationUri = Some(tempPath.toString) + )), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2be51ed0e87e7..1ddb541e8d662 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -119,7 +119,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort")) } - val options = table.storage.serdeProperties + val options = table.storage.properties val dataSource = DataSource( sparkSession, @@ -441,10 +441,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case p: LogicalPlan if p.resolved => p case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => - val desc = if (table.storage.serde.isEmpty) { + val desc = if (table.storage.getSerde.isEmpty) { // add default serde - table.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.copy(storage = + table.storage.withSerde("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { table } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 3ab1bdabb99b3..d6d84e744804c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -84,28 +84,13 @@ private[hive] case class MetastoreRelation( case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString }) - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() - tTable.setSd(sd) - // Note: In Hive the schema and partition columns must be disjoint sets val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c => 
catalogTable.partitionColumnNames.contains(c.getName) } - sd.setCols(schema.asJava) + val sd = toHiveStorage(catalogTable.storage, schema) + tTable.setSd(sd) tTable.setPartitionKeys(partCols.asJava) - - catalogTable.storage.locationUri.foreach(sd.setLocation) - catalogTable.storage.inputFormat.foreach(sd.setInputFormat) - catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) - - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib) - sd.setSerdeInfo(serdeInfo) - - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) - new HiveTable(tTable) } @@ -160,25 +145,28 @@ private[hive] case class MetastoreRelation( tPartition.setTableName(tableName) tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava) - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() + val sd = toHiveStorage(p.storage, catalogTable.schema.map(toHiveColumn)) tPartition.setSd(sd) - sd.setCols(catalogTable.schema.map(toHiveColumn).asJava) - p.storage.locationUri.foreach(sd.setLocation) - p.storage.inputFormat.foreach(sd.setInputFormat) - p.storage.outputFormat.foreach(sd.setOutputFormat) + new Partition(hiveQlTable, tPartition) + } + } - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - sd.setSerdeInfo(serdeInfo) - // maps and lists should be set only after all elements are ready (see HIVE-7975) - p.storage.serde.foreach(serdeInfo.setSerializationLib) + private def toHiveStorage(storage: CatalogStorageFormat, schema: Seq[FieldSchema]) = { + val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() + sd.setCols(schema.asJava) + catalogTable.storage.locationUri.foreach(sd.setLocation) + catalogTable.storage.getInputFormat.foreach(sd.setInputFormat) + catalogTable.storage.getOutputFormat.foreach(sd.setOutputFormat) - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) + val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo + sd.setSerdeInfo(serdeInfo) + // maps and lists should be set only after all elements are ready (see HIVE-7975) + catalogTable.storage.getSerde.foreach(serdeInfo.setSerializationLib) - new Partition(hiveQlTable, tPartition) - } + val serdeParameters = new java.util.HashMap[String, String]() + catalogTable.storage.getProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + serdeInfo.setParameters(serdeParameters) + sd } /** Only compare database and tablename, not alias. 
*/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7e0cef3e355d5..101eef5b3a1c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -355,6 +355,32 @@ private[hive] class HiveClientImpl( val properties = h.getParameters.asScala.toMap + var storage = CatalogStorageFormat( + locationUri = shim.getDataLocation(h).filterNot { _ => + // SPARK-15269: Persisted data source tables always store the location URI as a SerDe + // property named "path" instead of standard Hive `dataLocation`, because Hive only + // allows directory paths as location URIs while Spark SQL data source tables also + // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL + // data source tables. + DDLUtils.isDatasourceTable(properties) && + h.getTableType == HiveTableType.EXTERNAL_TABLE && + // Spark SQL may also save external data source in Hive compatible format when + // possible, so that these tables can be directly accessed by Hive. For these tables, + // `dataLocation` is still necessary. Here we also check for input format class + // because only these Hive compatible tables set this field. + h.getInputFormatClass == null + }, + provider = Some("hive"), + properties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap + ) + Option(h.getInputFormatClass).map(_.getName).foreach { inFmt => + storage = storage.withInputFormat(inFmt) + } + Option(h.getOutputFormatClass).map(_.getName).foreach { outFmt => + storage = storage.withOutputFormat(outFmt) + } + Option(h.getSerializationLib).foreach(serde => storage = storage.withSerde(serde)) + CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { @@ -371,27 +397,7 @@ private[hive] class HiveClientImpl( owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, - storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h).filterNot { _ => - // SPARK-15269: Persisted data source tables always store the location URI as a SerDe - // property named "path" instead of standard Hive `dataLocation`, because Hive only - // allows directory paths as location URIs while Spark SQL data source tables also - // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL - // data source tables. - DDLUtils.isDatasourceTable(properties) && - h.getTableType == HiveTableType.EXTERNAL_TABLE && - // Spark SQL may also save external data source in Hive compatible format when - // possible, so that these tables can be directly accessed by Hive. For these tables, - // `dataLocation` is still necessary. Here we also check for input format class - // because only these Hive compatible tables set this field. 
- h.getInputFormatClass == null - }, - inputFormat = Option(h.getInputFormatClass).map(_.getName), - outputFormat = Option(h.getOutputFormatClass).map(_.getName), - serde = Option(h.getSerializationLib), - compressed = h.getTTable.getSd.isCompressed, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap - ), + storage = storage, properties = properties, viewOriginalText = Option(h.getViewOriginalText), viewText = Option(h.getViewExpandedText), @@ -770,11 +776,11 @@ private[hive] class HiveClientImpl( hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } - table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) - table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) + table.storage.getInputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) + table.storage.getOutputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( - table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } + table.storage.getSerde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.storage.getProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } table.comment.foreach { c => hiveTable.setProperty("comment", c) } table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) } @@ -795,10 +801,10 @@ private[hive] class HiveClientImpl( val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo p.storage.locationUri.foreach(storageDesc.setLocation) - p.storage.inputFormat.foreach(storageDesc.setInputFormat) - p.storage.outputFormat.foreach(storageDesc.setOutputFormat) - p.storage.serde.foreach(serdeInfo.setSerializationLib) - serdeInfo.setParameters(p.storage.serdeProperties.asJava) + p.storage.getInputFormat.foreach(storageDesc.setInputFormat) + p.storage.getOutputFormat.foreach(storageDesc.setOutputFormat) + p.storage.getSerde.foreach(serdeInfo.setSerializationLib) + serdeInfo.setParameters(p.storage.getProperties.asJava) storageDesc.setSerdeInfo(serdeInfo) tpart.setDbName(ht.getDbName) tpart.setTableName(ht.getTableName) @@ -809,14 +815,23 @@ private[hive] class HiveClientImpl( private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition + + var storage = CatalogStorageFormat( + locationUri = Option(apiPartition.getSd.getLocation), + provider = Some("hive"), + properties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap) + Option(apiPartition.getSd.getInputFormat).foreach { inFmt => + storage = storage.withInputFormat(inFmt) + } + Option(apiPartition.getSd.getOutputFormat).foreach { outFmt => + storage = storage.withOutputFormat(outFmt) + } + Option(apiPartition.getSd.getSerdeInfo.getSerializationLib).foreach { serde => + storage = storage.withSerde(serde) + } + CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), - storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation), - inputFormat = Option(apiPartition.getSd.getInputFormat), - outputFormat = Option(apiPartition.getSd.getOutputFormat), - serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), - 
compressed = apiPartition.getSd.isCompressed, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + storage = storage) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 2762e0cdd56ab..8027f898613bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -52,15 +52,19 @@ case class CreateHiveTableAsSelectCommand( import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextInputFormat - val withFormat = - tableDesc.withNewStorage( - inputFormat = - tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), - outputFormat = - tableDesc.storage.outputFormat - .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)), - compressed = tableDesc.storage.compressed) + var newStorage = tableDesc.storage + if (newStorage.getInputFormat.isEmpty) { + newStorage = newStorage.withInputFormat(classOf[TextInputFormat].getName) + } + if (newStorage.getOutputFormat.isEmpty) { + newStorage = newStorage.withOutputFormat( + classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName) + } + if (newStorage.getSerde.isEmpty) { + newStorage = newStorage.withSerde(classOf[LazySimpleSerDe].getName) + } + + val withFormat = tableDesc.copy(storage = newStorage) val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 867aadb5f5569..b47f4dd95533c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -69,9 +69,9 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == Seq.empty[CatalogColumn]) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.storage.serde == + assert(desc.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.getSerde == Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -100,10 +100,10 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.partitionColumns == Seq.empty[CatalogColumn]) - assert(desc.storage.serdeProperties == Map()) - assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) - assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) - assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) + assert(desc.storage.getProperties == Map()) + assert(desc.storage.getInputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) + 
assert(desc.storage.getOutputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) + assert(desc.storage.getSerde == Some("parquet.hive.serde.ParquetHiveSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -118,11 +118,11 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText assert(desc.viewOriginalText.isEmpty) - assert(desc.storage.serdeProperties == Map()) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.storage.outputFormat == + assert(desc.storage.getProperties == Map()) + assert(desc.storage.getInputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.getSerde.isEmpty) assert(desc.properties == Map()) } @@ -154,10 +154,10 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText assert(desc.viewOriginalText.isEmpty) - assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) + assert(desc.storage.getProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) + assert(desc.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.getSerde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) } @@ -300,12 +300,12 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.storage.locationUri.isEmpty) - assert(desc.storage.inputFormat == + assert(desc.storage.getInputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.storage.outputFormat == + assert(desc.storage.getOutputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) - assert(desc.storage.serdeProperties.isEmpty) + assert(desc.storage.getSerde.isEmpty) + assert(desc.storage.getProperties.isEmpty) assert(desc.properties.isEmpty) assert(desc.comment.isEmpty) } @@ -390,11 +390,11 @@ class HiveDDLCommandSuite extends PlanTest { val (desc1, _) = extractTableDesc(query1) val (desc2, _) = extractTableDesc(query2) val (desc3, _) = extractTableDesc(query3) - assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc1.storage.serdeProperties.isEmpty) - assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc2.storage.serdeProperties == Map("k1" -> "v1")) - assert(desc3.storage.serdeProperties == Map( + assert(desc1.storage.getSerde == Some("org.apache.poof.serde.Baff")) + assert(desc1.storage.getProperties.isEmpty) + assert(desc2.storage.getSerde == Some("org.apache.poof.serde.Baff")) + assert(desc2.storage.getProperties == Map("k1" -> "v1")) + assert(desc3.storage.getProperties == Map( "field.delim" -> "x", "escape.delim" -> "y", "serialization.format" -> "x", @@ -409,12 +409,13 
@@ class HiveDDLCommandSuite extends PlanTest { val query2 = s"$baseQuery ORC" val (desc1, _) = extractTableDesc(query1) val (desc2, _) = extractTableDesc(query2) - assert(desc1.storage.inputFormat == Some("winput")) - assert(desc1.storage.outputFormat == Some("wowput")) - assert(desc1.storage.serde.isEmpty) - assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) - assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(desc1.storage.getInputFormat == Some("winput")) + assert(desc1.storage.getOutputFormat == Some("wowput")) + assert(desc1.storage.getSerde.isEmpty) + assert(desc2.storage.getInputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(desc2.storage.getOutputFormat == + Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(desc2.storage.getSerde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) } test("create table - storage handler") { @@ -460,10 +461,10 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) assert(desc.storage.locationUri == Some("/path/to/mercury")) - assert(desc.storage.inputFormat == Some("winput")) - assert(desc.storage.outputFormat == Some("wowput")) - assert(desc.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc.storage.serdeProperties == Map("k1" -> "v1")) + assert(desc.storage.getInputFormat == Some("winput")) + assert(desc.storage.getOutputFormat == Some("wowput")) + assert(desc.storage.getSerde == Some("org.apache.poof.serde.Baff")) + assert(desc.storage.getProperties == Map("k1" -> "v1")) assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) assert(desc.comment == Some("no comment")) } @@ -479,10 +480,10 @@ class HiveDDLCommandSuite extends PlanTest { assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == Option("SELECT * FROM tab1")) assert(desc.viewOriginalText == Option("SELECT * FROM tab1")) - assert(desc.storage.serdeProperties == Map()) - assert(desc.storage.inputFormat.isEmpty) - assert(desc.storage.outputFormat.isEmpty) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.getProperties == Map()) + assert(desc.storage.getInputFormat.isEmpty) + assert(desc.storage.getOutputFormat.isEmpty) + assert(desc.storage.getSerde.isEmpty) assert(desc.properties == Map()) } @@ -505,10 +506,10 @@ class HiveDDLCommandSuite extends PlanTest { CatalogColumn("col3", null, nullable = true, None) :: Nil) assert(desc.viewText == Option("SELECT * FROM tab1")) assert(desc.viewOriginalText == Option("SELECT * FROM tab1")) - assert(desc.storage.serdeProperties == Map()) - assert(desc.storage.inputFormat.isEmpty) - assert(desc.storage.outputFormat.isEmpty) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.getProperties == Map()) + assert(desc.storage.getInputFormat.isEmpty) + assert(desc.storage.getOutputFormat.isEmpty) + assert(desc.storage.getSerde.isEmpty) assert(desc.properties == Map("prop1Key" -> "prop1Val")) assert(desc.comment == Option("BLABLA")) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 754aabb5ac936..84ec5941e0c55 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ 
-93,9 +93,9 @@ class DataSourceWithHiveMetastoreCatalogSuite } val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default"))) - assert(hiveTable.storage.inputFormat === Some(inputFormat)) - assert(hiveTable.storage.outputFormat === Some(outputFormat)) - assert(hiveTable.storage.serde === Some(serde)) + assert(hiveTable.storage.getInputFormat === Some(inputFormat)) + assert(hiveTable.storage.getOutputFormat === Some(outputFormat)) + assert(hiveTable.storage.getSerde === Some(serde)) assert(hiveTable.partitionColumnNames.isEmpty) assert(hiveTable.tableType === CatalogTableType.MANAGED) @@ -125,9 +125,9 @@ class DataSourceWithHiveMetastoreCatalogSuite val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default"))) - assert(hiveTable.storage.inputFormat === Some(inputFormat)) - assert(hiveTable.storage.outputFormat === Some(outputFormat)) - assert(hiveTable.storage.serde === Some(serde)) + assert(hiveTable.storage.getInputFormat === Some(inputFormat)) + assert(hiveTable.storage.getOutputFormat === Some(outputFormat)) + assert(hiveTable.storage.getSerde === Some(serde)) assert(hiveTable.tableType === CatalogTableType.EXTERNAL) assert(hiveTable.storage.locationUri === @@ -157,9 +157,9 @@ class DataSourceWithHiveMetastoreCatalogSuite val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default"))) - assert(hiveTable.storage.inputFormat === Some(inputFormat)) - assert(hiveTable.storage.outputFormat === Some(outputFormat)) - assert(hiveTable.storage.serde === Some(serde)) + assert(hiveTable.storage.getInputFormat === Some(inputFormat)) + assert(hiveTable.storage.getOutputFormat === Some(outputFormat)) + assert(hiveTable.storage.getSerde === Some(serde)) assert(hiveTable.partitionColumnNames.isEmpty) assert(hiveTable.tableType === CatalogTableType.EXTERNAL) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 12d250d4fb604..e94584168e888 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -729,11 +729,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv schema = Seq.empty, storage = CatalogStorageFormat( locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = Map( + provider = None, + properties = Map( "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), properties = Map( @@ -1171,8 +1168,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv checkAnswer(table("t"), Seq(Row(1, 2, 3), Row(2, 3, 4))) val catalogTable = sharedState.externalCatalog.getTable("default", "t") // there should not be a lowercase key 'path' now - assert(catalogTable.storage.serdeProperties.get("path").isEmpty) - assert(catalogTable.storage.serdeProperties.get("PATH").isDefined) + assert(catalogTable.storage.properties.get("path").isEmpty) + assert(catalogTable.storage.properties.get("PATH").isDefined) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 83f1b192f7c94..7ba880e476137 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle val expectedPath = spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName - assert(metastoreTable.storage.serdeProperties("path") === expectedPath) + assert(metastoreTable.storage.properties("path") === expectedPath) } private def getTableNames(dbName: Option[String] = None): Array[String] = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index a972f61e25858..57eae35a9bb22 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -147,14 +147,10 @@ class VersionsSuite extends SparkFunSuite with Logging { identifier = TableIdentifier(tableName, Some(database)), tableType = CatalogTableType.MANAGED, schema = Seq(CatalogColumn("key", "int")), - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = Some(classOf[TextInputFormat].getName), - outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName), - serde = Some(classOf[LazySimpleSerDe].getName()), - compressed = false, - serdeProperties = Map.empty - )) + storage = CatalogStorageFormat.empty + .withInputFormat(classOf[TextInputFormat].getName) + .withOutputFormat(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName) + .withSerde(classOf[LazySimpleSerDe].getName)) } /////////////////////////////////////////////////////////////////////////// @@ -269,13 +265,7 @@ class VersionsSuite extends SparkFunSuite with Logging { // Partition related API /////////////////////////////////////////////////////////////////////////// - val storageFormat = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - serdeProperties = Map.empty) + val storageFormat = CatalogStorageFormat.empty test(s"$version: sql create partitioned table") { client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)") @@ -366,10 +356,9 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: alterPartitions") { val spec = Map("key1" -> "1", "key2" -> "2") val newLocation = Utils.createTempDir().getPath() - val storage = storageFormat.copy( - locationUri = Some(newLocation), + val storage = storageFormat.copy(locationUri = Some(newLocation)) // needed for 0.12 alter partitions - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + .withSerde("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") val partition = CatalogTablePartition(spec, storage) client.alterPartitions("default", "src_part", Seq(partition)) assert(client.getPartition("default", "src_part", spec) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 343d7bae98bff..aa15d411caacb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -355,16 +355,16 @@ class HiveDDLSuite val expectedSerdePropsString = expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ") val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4")) - 
assume(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was already set") - assume(oldPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) != + assume(oldPart.storage.getSerde != Some(expectedSerde), "bad test: serde was already set") + assume(oldPart.storage.getProperties.filterKeys(expectedSerdeProps.contains) != expectedSerdeProps, "bad test: serde properties were already set") sql(s"""ALTER TABLE boxes PARTITION (width=4) | SET SERDE '$expectedSerde' | WITH SERDEPROPERTIES ($expectedSerdePropsString) |""".stripMargin) val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4")) - assert(newPart.storage.serde == Some(expectedSerde)) - assume(newPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) == + assert(newPart.storage.getSerde == Some(expectedSerde)) + assume(newPart.storage.getProperties.filterKeys(expectedSerdeProps.contains) == expectedSerdeProps) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 961d95c268b2c..4c1ecce1f96d6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -434,7 +434,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { case None => // OK. } // Also make sure that the format is the desired format. - assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format)) + assert(catalogTable.storage.getInputFormat.get.toLowerCase.contains(format)) } // When a user-specified location is defined, the table type needs to be EXTERNAL. From 87fbff072e03abf63d862dc6222e7986332a4e88 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 8 Jul 2016 11:07:13 +0800 Subject: [PATCH 2/3] address comments --- .../apache/spark/sql/execution/SparkSqlParser.scala | 4 ++++ .../sql/execution/command/createDataSourceTables.scala | 2 +- .../apache/spark/sql/execution/command/tables.scala | 4 ++++ .../org/apache/spark/sql/hive/MetastoreRelation.scala | 10 +++++----- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index fe7e4acd50f88..573c97d232fa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -956,10 +956,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { provider = Some("hive"), properties = Map.empty) Option(ctx.createFileFormat).foreach(ctx => storage = getFileFormat(ctx, storage)) + // if both file format and row format specifies serde, the one from row format wins. Option(ctx.rowFormat).foreach(ctx => storage = getRowFormat(ctx, storage)) val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + // set default values for hive formats. Note that serde doesn't have default value, because we + // use the presence of the serde to decide whether to convert a table created by CTAS to a + // datasource table. 
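To make the defaulting above concrete, here is a minimal self-contained sketch of how the property-backed helpers behave at this point in the patch. StorageSketch is an illustrative stand-in, not the real CatalogStorageFormat, and the two format class names are simply the textfile defaults already quoted in the diff.

// Illustrative stand-in for the property-backed storage format used in this patch.
case class StorageSketch(properties: Map[String, String] = Map.empty) {
  def withInputFormat(fmt: String): StorageSketch =
    copy(properties = properties + ("inputFormat" -> fmt))
  def withOutputFormat(fmt: String): StorageSketch =
    copy(properties = properties + ("outputFormat" -> fmt))
  def getInputFormat: Option[String] = properties.get("inputFormat")
  def getOutputFormat: Option[String] = properties.get("outputFormat")
  def getSerde: Option[String] = properties.get("serde")
}

object DefaultFormatSketch extends App {
  var storage = StorageSketch()
  // Fill in the input/output format only when the statement did not specify one.
  if (storage.getInputFormat.isEmpty) {
    storage = storage.withInputFormat("org.apache.hadoop.mapred.TextInputFormat")
  }
  if (storage.getOutputFormat.isEmpty) {
    storage = storage.withOutputFormat(
      "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
  }
  // The serde is left unset on purpose: per the comment above, the presence of a serde
  // is what later decides whether a CTAS table is converted to a data source table.
  assert(storage.getSerde.isEmpty)
  println(storage.properties)
}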
if (storage.getInputFormat.isEmpty) { storage = storage.withInputFormat(defaultHiveSerde.flatMap(_.inputFormat) .getOrElse("org.apache.hadoop.mapred.TextInputFormat")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index b5653e8052e5f..580184b3b8742 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -401,7 +401,7 @@ object CreateDataSourceTableUtils extends Logging { assert(relation.partitionSchema.isEmpty) var storage = CatalogStorageFormat( - locationUri = None, + locationUri = Some(relation.location.paths.map(_.toUri.toString).head), provider = Some("hive"), properties = options) serde.inputFormat.foreach(inFmt => storage = storage.withInputFormat(inFmt)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 1dfc12348b463..cc71897f89050 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -43,6 +43,7 @@ case class CreateHiveTableAsSelectLogicalPlan( tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean) extends UnaryNode with Command { + assert(tableDesc.storage.provider == Some("hive")) override def output: Seq[Attribute] = Seq.empty[Attribute] @@ -119,6 +120,7 @@ case class CreateTableLikeCommand( * }}} */ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { + assert(table.storage.provider == Some("hive")) override def run(sparkSession: SparkSession): Seq[Row] = { DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") @@ -446,6 +448,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF partCols.foreach(col => append(buffer, col, "", "")) } } else { + assert(table.storage.provider == Some("hive")) describeSchema(table.schema, buffer) if (table.partitionColumns.nonEmpty) { @@ -744,6 +747,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { showCreateDataSourceTable(tableMetadata) } else { + assert(tableMetadata.storage.provider == Some("hive")) showCreateHiveTable(tableMetadata) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index d6d84e744804c..406cebcc0e359 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -154,17 +154,17 @@ private[hive] case class MetastoreRelation( private def toHiveStorage(storage: CatalogStorageFormat, schema: Seq[FieldSchema]) = { val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() sd.setCols(schema.asJava) - catalogTable.storage.locationUri.foreach(sd.setLocation) - catalogTable.storage.getInputFormat.foreach(sd.setInputFormat) - catalogTable.storage.getOutputFormat.foreach(sd.setOutputFormat) + storage.locationUri.foreach(sd.setLocation) + storage.getInputFormat.foreach(sd.setInputFormat) + storage.getOutputFormat.foreach(sd.setOutputFormat) val serdeInfo = new 
org.apache.hadoop.hive.metastore.api.SerDeInfo sd.setSerdeInfo(serdeInfo) // maps and lists should be set only after all elements are ready (see HIVE-7975) - catalogTable.storage.getSerde.foreach(serdeInfo.setSerializationLib) + storage.getSerde.foreach(serdeInfo.setSerializationLib) val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.getProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + storage.getProperties.foreach { case (k, v) => serdeParameters.put(k, v) } serdeInfo.setParameters(serdeParameters) sd } From d18ac598b27083962b4a783996a5bf39760b0369 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 13 Jul 2016 22:10:18 +0800 Subject: [PATCH 3/3] refine --- .../sql/catalyst/catalog/interface.scala | 26 ++- .../catalog/ExternalCatalogSuite.scala | 10 +- .../spark/sql/execution/SparkSqlParser.scala | 201 ++++++++++-------- .../command/createDataSourceTables.scala | 11 +- .../spark/sql/execution/command/ddl.scala | 28 ++- .../spark/sql/execution/command/tables.scala | 22 +- .../datasources/DataSourceStrategy.scala | 2 +- .../sql/execution/command/DDLSuite.scala | 1 - .../spark/sql/hive/HiveExternalCatalog.scala | 4 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 5 +- .../sql/hive/client/HiveClientImpl.scala | 32 +-- .../CreateHiveTableAsSelectCommand.scala | 18 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 1 - .../spark/sql/hive/client/VersionsSuite.scala | 8 +- 14 files changed, 183 insertions(+), 186 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 882612883abed..f3ee99e045579 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -45,13 +45,10 @@ case class CatalogFunction( * * @param locationUri the storage location of the table/partition, can be None if it has no storage, * e.g. view, or it's not file-based, e.g. JDBC data source table. - * @param provider the name of the data source provider, e.g. parquet, json, etc. Can be None if it - * has no storage. Note that `hive` is also one kind of provider. * @param properties used to store some extra information for the storage. 
*/ case class CatalogStorageFormat( locationUri: Option[String], - provider: Option[String], properties: Map[String, String]) { override def toString: String = { @@ -63,23 +60,22 @@ case class CatalogStorageFormat( } val output = Seq( locationUri.map("Location: " + _).getOrElse(""), - provider.map("Provider: " + _).getOrElse(""), propertiesToString) output.filter(_.nonEmpty).mkString("Storage(", ", ", ")") } // TODO: remove these hive hacks - def withInputFormat(inFmt: String): CatalogStorageFormat = { - this.copy(properties = properties + ("inputFormat" -> inFmt)) + def withInputFormat(inFmt: Option[String]): CatalogStorageFormat = { + inFmt.map(i => copy(properties = properties + ("inputFormat" -> i))).getOrElse(this) } - def withOutputFormat(outFmt: String): CatalogStorageFormat = { - this.copy(properties = properties + ("outputFormat" -> outFmt)) + def withOutputFormat(outFmt: Option[String]): CatalogStorageFormat = { + outFmt.map(o => copy(properties = properties + ("outputFormat" -> o))).getOrElse(this) } - def withSerde(serde: String): CatalogStorageFormat = { - this.copy(properties = properties + ("serde" -> serde)) + def withSerde(serde: Option[String]): CatalogStorageFormat = { + serde.map(s => copy(properties = properties + ("serde" -> s))).getOrElse(this) } def getInputFormat: Option[String] = properties.get("inputFormat") @@ -93,7 +89,7 @@ case class CatalogStorageFormat( object CatalogStorageFormat { /** Empty storage format for default values and copies. */ - val empty = CatalogStorageFormat(None, None, Map.empty) + val empty = CatalogStorageFormat(None, Map.empty) } /** @@ -135,6 +131,8 @@ case class CatalogTablePartition( * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the * future once we have a better understanding of how we want to handle skewed columns. * + * @param provider the name of the data source provider, e.g. parquet, json, etc. + * Note that `hive` is also one kind of provider. * @param unsupportedFeatures is a list of string descriptions of features that are used by the * underlying table but not supported by Spark SQL yet. */ @@ -143,6 +141,7 @@ case class CatalogTable( tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], + provider: Option[String] = None, partitionColumnNames: Seq[String] = Seq.empty, sortColumnNames: Seq[String] = Seq.empty, bucketColumnNames: Seq[String] = Seq.empty, @@ -178,6 +177,11 @@ case class CatalogTable( /** Return the fully qualified name of this table, assuming the database was specified. */ def qualifiedName: String = identifier.unquotedString + /** Syntactic sugar to update the `storage`. 
*/ + def mapStorage(f: CatalogStorageFormat => CatalogStorageFormat): CatalogTable = { + copy(storage = f(storage)) + } + override def toString: String = { val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 1574e3d1b4cdc..4cdc633b2ca2a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -398,7 +398,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(oldPart2.storage.locationUri != Some(newLocation)) // alter other storage information catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.withSerde(newSerde)), + oldPart1.copy(storage = storageFormat.withSerde(Some(newSerde))), oldPart2.copy(storage = storageFormat.copy(properties = newSerdeProps)))) val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec) val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec) @@ -569,7 +569,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, Map.empty), + Map.empty), schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")) ) catalog.createTable("db1", externalTable, ignoreIfExists = false) @@ -609,7 +609,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac Map("a" -> "7", "b" -> "8"), CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), - None, Map.empty) + Map.empty) ) catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false) assert(!exists(databaseDir, "tbl", "a=7", "b=8")) @@ -629,8 +629,8 @@ abstract class CatalogTestUtils { // These fields must be lazy because they rely on fields that are not implemented yet lazy val storageFormat = CatalogStorageFormat.empty - .withInputFormat(tableInputFormat) - .withOutputFormat(tableOutputFormat) + .withInputFormat(Some(tableInputFormat)) + .withOutputFormat(Some(tableOutputFormat)) lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 573c97d232fa2..74cad2b909e31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.util.Try import org.antlr.v4.runtime.{ParserRuleContext, Token} @@ -923,7 +922,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { operationNotAllowed("CREATE TABLE ... 
CLUSTERED BY", ctx) } - val comment = Option(ctx.comment).map(string) + val comment = Option(ctx.STRING).map(string) val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) @@ -950,37 +949,39 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // to include the partition columns here explicitly val schema = cols ++ partitionCols - validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) - var storage = CatalogStorageFormat( - locationUri = Option(ctx.locationSpec).map(visitLocationSpec), - provider = Some("hive"), - properties = Map.empty) - Option(ctx.createFileFormat).foreach(ctx => storage = getFileFormat(ctx, storage)) - // if both file format and row format specifies serde, the one from row format wins. - Option(ctx.rowFormat).foreach(ctx => storage = getRowFormat(ctx, storage)) - - val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) - // set default values for hive formats. Note that serde doesn't have default value, because we - // use the presence of the serde to decide whether to convert a table created by CTAS to a - // datasource table. - if (storage.getInputFormat.isEmpty) { - storage = storage.withInputFormat(defaultHiveSerde.flatMap(_.inputFormat) - .getOrElse("org.apache.hadoop.mapred.TextInputFormat")) - } - if (storage.getOutputFormat.isEmpty) { - storage = storage.withOutputFormat(defaultHiveSerde.flatMap(_.outputFormat) - .getOrElse("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + // default input/output format + val (defaultInputFormat, defaultOutputFormat) = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + val inFmt = defaultHiveSerde.flatMap(_.inputFormat) + .getOrElse("org.apache.hadoop.mapred.TextInputFormat") + val outFmt = defaultHiveSerde.flatMap(_.outputFormat) + .getOrElse("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat") + inFmt -> outFmt } - + validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(CatalogStorageFormat.empty) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) + .getOrElse(CatalogStorageFormat.empty) + val location = Option(ctx.locationSpec).map(visitLocationSpec) // If we are creating an EXTERNAL table, then the LOCATION field is required - if (external && storage.locationUri.isEmpty) { + if (external && location.isEmpty) { operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) } - + val storage = CatalogStorageFormat( + locationUri = location, + properties = rowStorage.getProperties ++ fileStorage.getProperties + ).withInputFormat(Some(defaultInputFormat)) + .withInputFormat(fileStorage.getInputFormat) + .withOutputFormat(Some(defaultOutputFormat)) + .withOutputFormat(fileStorage.getOutputFormat) + // if both file format and row format specifies serde, the one from row format wins. + .withSerde(fileStorage.getSerde) + .withSerde(rowStorage.getSerde) // If location is defined, we'll assume this is an external table. // Otherwise, we may accidentally delete existing data. 
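The chained withInputFormat/withOutputFormat/withSerde calls above rely on the Option-taking variants defined earlier in this patch: Some(v) overwrites the entry, None is a no-op, so a default applied first survives only if no later clause supplied a concrete value. A rough sketch of that precedence, reduced to plain maps; the OrcSerde class name is just an example borrowed from the test changes in this patch.

object SerdePrecedenceSketch extends App {
  // Minimal model of the Option-taking helper: Some(v) records the value,
  // None leaves the map untouched, so later concrete values win over defaults.
  def withSerde(props: Map[String, String], serde: Option[String]): Map[String, String] =
    serde.map(s => props + ("serde" -> s)).getOrElse(props)

  val fromFileFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde") // e.g. STORED AS ORC
  val fromRowFormat: Option[String] = None                               // no ROW FORMAT SERDE clause

  // Applied in the same order as above: file format first, row format last, so a row format
  // serde would override the file format one, while None keeps what is already there.
  val props = withSerde(withSerde(Map.empty, fromFileFormat), fromRowFormat)
  assert(props("serde") == "org.apache.hadoop.hive.ql.io.orc.OrcSerde")
}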
- val tableType = if (external || storage.locationUri.isDefined) { + val tableType = if (external || location.isDefined) { CatalogTableType.EXTERNAL } else { CatalogTableType.MANAGED @@ -1019,8 +1020,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties // are empty Maps. - val optionsWithPath = if (storage.locationUri.isDefined) { - Map("path" -> storage.locationUri.get) + val optionsWithPath = if (location.isDefined) { + Map("path" -> location.get) } else { Map.empty[String, String] } @@ -1056,39 +1057,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Parse the `createFileFormat` and put the storage information into the passed in map. - * - * Example format: - * {{{ - * STORED AS INPUTFORMAT formatName1 OUTPUTFORMAT formatName2 - * }}} - * - * OR + * Create a [[CatalogStorageFormat]] for creating tables. * - * {{{ - * STORED AS fileFormatName - * }}} + * Format: STORED AS ... */ - private def getFileFormat( - ctx: CreateFileFormatContext, - storage: CatalogStorageFormat): CatalogStorageFormat = { + override def visitCreateFileFormat( + ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { (ctx.fileFormat, ctx.storageHandler) match { // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format case (c: TableFileFormatContext, null) => - storage.withInputFormat(string(c.inFmt)).withOutputFormat(string(c.outFmt)) + visitTableFileFormat(c) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO case (c: GenericFileFormatContext, null) => - val source = c.identifier.getText - HiveSerDe.sourceToSerDe(source, conf) match { - case Some(s) => - var result = storage - s.inputFormat.foreach(inFmt => result = result.withInputFormat(inFmt)) - s.outputFormat.foreach(outFmt => result = result.withOutputFormat(outFmt)) - s.serde.foreach(serde => result = result.withSerde(serde)) - result - case None => - operationNotAllowed(s"STORED AS with file format '$source'", ctx) - } + visitGenericFileFormat(c) case (null, storageHandler) => operationNotAllowed("STORED BY", ctx) case _ => @@ -1097,7 +1078,34 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Parse the `rowFormat` and put the storage information into the passed in map. + * Create a [[CatalogStorageFormat]]. + */ + override def visitTableFileFormat( + ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + CatalogStorageFormat.empty + .withInputFormat(Option(ctx.inFmt).map(string)) + .withOutputFormat(Option(ctx.outFmt).map(string)) + } + + /** + * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]]. + */ + override def visitGenericFileFormat( + ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + val source = ctx.identifier.getText + HiveSerDe.sourceToSerDe(source, conf) match { + case Some(s) => + CatalogStorageFormat.empty + .withInputFormat(s.inputFormat) + .withOutputFormat(s.outputFormat) + .withSerde(s.serde) + case None => + operationNotAllowed(s"STORED AS with file format '$source'", ctx) + } + } + + /** + * Create a [[CatalogStorageFormat]] used for creating tables. 
* * Example format: * {{{ @@ -1114,41 +1122,52 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [NULL DEFINED AS char] * }}} */ - private def getRowFormat( - ctx: RowFormatContext, - storage: CatalogStorageFormat): CatalogStorageFormat = withOrigin(ctx) { + private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { ctx match { - case serde: RowFormatSerdeContext => - var result = storage.withSerde(string(serde.name)) - Option(serde.props).map(visitPropertyKeyValues).foreach { props => - result = result.copy(properties = result.properties ++ props) - } - result - case delimited: RowFormatDelimitedContext => - // Collect the entries if any. - def entry(key: String, value: Token): Option[(String, String)] = { - Option(value).map(x => key -> string(x)) - } - // TODO we need proper support for the NULL format. - val entries = - entry("field.delim", delimited.fieldsTerminatedBy) ++ - entry("serialization.format", delimited.fieldsTerminatedBy) ++ - entry("escape.delim", delimited.escapedBy) ++ - // The following typo is inherited from Hive... - entry("colelction.delim", delimited.collectionItemsTerminatedBy) ++ - entry("mapkey.delim", delimited.keysTerminatedBy) ++ - Option(delimited.linesSeparatedBy).toSeq.map { token => - val value = string(token) - assert( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - ctx) - "line.delim" -> value - } - storage.copy(properties = storage.properties ++ entries) + case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) + case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) } } + /** + * Create SERDE row format name and properties pair. + */ + override def visitRowFormatSerde( + ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + CatalogStorageFormat.empty + .copy(properties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + .withSerde(Option(name).map(string)) + } + + /** + * Create a delimited row format properties object. + */ + override def visitRowFormatDelimited( + ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { + // Collect the entries if any. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. + val entries = + entry("field.delim", ctx.fieldsTerminatedBy) ++ + entry("serialization.format", ctx.fieldsTerminatedBy) ++ + entry("escape.delim", ctx.escapedBy) ++ + // The following typo is inherited from Hive... + entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("mapkey.delim", ctx.keysTerminatedBy) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "line.delim" -> value + } + CatalogStorageFormat.empty.copy(properties = entries.toMap) + } + /** * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT * and STORED AS. @@ -1310,8 +1329,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // expects a seq of pairs in which the old parsers' token names are used as keys. // Transforming the result of visitRowFormatDelimited would be quite a bit messier than // retrieving the key value pairs ourselves. 
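For reference, the delimited row format handling boils down to translating each clause into the corresponding Hive serde property key, exactly as visitRowFormatDelimited above does. A self-contained sketch of that mapping, with hard-coded stand-ins for the parser tokens:

object DelimitedRowFormatSketch extends App {
  // Hard-coded stand-ins for the ANTLR tokens the real builder reads from the context.
  val fieldsTerminatedBy: Option[String] = Some(",")
  val escapedBy: Option[String] = Some("\\")
  val collectionItemsTerminatedBy: Option[String] = None
  val keysTerminatedBy: Option[String] = None
  val linesSeparatedBy: Option[String] = Some("\n")

  def entry(key: String, value: Option[String]): Seq[(String, String)] =
    value.toSeq.map(v => key -> v)

  val properties = (
    entry("field.delim", fieldsTerminatedBy) ++
      entry("serialization.format", fieldsTerminatedBy) ++ // the field delimiter is stored twice
      entry("escape.delim", escapedBy) ++
      entry("colelction.delim", collectionItemsTerminatedBy) ++ // typo kept for Hive compatibility
      entry("mapkey.delim", keysTerminatedBy) ++
      entry("line.delim", linesSeparatedBy)
  ).toMap

  // Only the clauses that were actually specified end up in the storage properties:
  // field.delim, serialization.format, escape.delim and line.delim in this case.
  println(properties)
}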
- def entry(key: String, value: Token): Option[(String, String)] = { - Option(value).map(t => key -> t.getText) + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq } val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ @@ -1319,11 +1338,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - (entries.toSeq, None, Seq.empty, None) + (entries, None, Seq.empty, None) case c: RowFormatSerdeContext => + // Use a serde format. val props = Option(c.props).map(visitPropertyKeyValues).getOrElse(Nil) val name = string(c.name) + // SPARK-10310: Special cases LazySimpleSerDe val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { Try(conf.getConfString(configKey)).toOption diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 580184b3b8742..3aa36ecdb5541 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -388,7 +388,6 @@ object CreateDataSourceTableUtils extends Logging { schema = Nil, storage = CatalogStorageFormat( locationUri = None, - provider = None, properties = options ), properties = tableProperties.toMap) @@ -400,18 +399,18 @@ object CreateDataSourceTableUtils extends Logging { assert(partitionColumns.isEmpty) assert(relation.partitionSchema.isEmpty) - var storage = CatalogStorageFormat( + val storage = CatalogStorageFormat( locationUri = Some(relation.location.paths.map(_.toUri.toString).head), - provider = Some("hive"), properties = options) - serde.inputFormat.foreach(inFmt => storage = storage.withInputFormat(inFmt)) - serde.outputFormat.foreach(outFmt => storage = storage.withOutputFormat(outFmt)) - serde.serde.foreach(sd => storage = storage.withSerde(sd)) + .withInputFormat(serde.inputFormat) + .withOutputFormat(serde.outputFormat) + .withSerde(serde.serde) CatalogTable( identifier = tableIdent, tableType = tableType, storage = storage, + provider = Some("hive"), schema = relation.schema.map { f => CatalogColumn(f.name, f.dataType.catalogString) }, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 504b1f7f986a0..560cd27c46486 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -318,21 +318,18 @@ case class AlterTableSerDePropertiesCommand( "not supported for tables created with the datasource API") } if (partSpec.isEmpty) { - var newStorage = table.storage - serdeClassName.foreach(serde => newStorage = newStorage.withSerde(serde)) - serdeProperties.foreach { props => - newStorage = newStorage.copy(properties = newStorage.properties ++ props) + val newTable = table.mapStorage { storage => + storage + .copy(properties = storage.properties ++ serdeProperties.getOrElse(Map.empty)) + .withSerde(serdeClassName) } - val newTable = table.copy(storage = newStorage) catalog.alterTable(newTable) } else { val spec = partSpec.get val part = catalog.getPartition(tableName, 
spec) - var newStorage = part.storage - serdeClassName.foreach(serde => newStorage = newStorage.withSerde(serde)) - serdeProperties.foreach { props => - newStorage = newStorage.copy(properties = newStorage.properties ++ props) - } + val newStorage = part.storage.copy( + properties = part.storage.properties ++ serdeProperties.getOrElse(Map.empty) + ).withSerde(serdeClassName) val newPart = part.copy(storage = newStorage) catalog.alterPartitions(tableName, Seq(newPart)) } @@ -470,12 +467,13 @@ case class AlterTableSetLocationCommand( // No partition spec is specified, so we set the location for the table itself val newTable = if (DDLUtils.isDatasourceTable(table)) { - table.copy(storage = table.storage.copy( - locationUri = Some(location), - properties = table.storage.properties + ("path" -> location) - )) + table.mapStorage { storage => + storage.copy( + locationUri = Some(location), + properties = storage.properties + ("path" -> location)) + } } else { - table.copy(storage = table.storage.copy(locationUri = Some(location))) + table.mapStorage(_.copy(locationUri = Some(location))) } catalog.alterTable(newTable) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index cc71897f89050..960602b9ac49c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -43,7 +43,7 @@ case class CreateHiveTableAsSelectLogicalPlan( tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean) extends UnaryNode with Command { - assert(tableDesc.storage.provider == Some("hive")) + assert(tableDesc.provider == Some("hive")) override def output: Seq[Attribute] = Seq.empty[Attribute] @@ -81,13 +81,11 @@ case class CreateTableLikeCommand( s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'") } - val sourceTableMeta = catalog.getTableMetadata(sourceTable) - val tableToCreate = sourceTableMeta.copy( + val tableToCreate = catalog.getTableMetadata(sourceTable).copy( identifier = targetTable, tableType = CatalogTableType.MANAGED, createTime = System.currentTimeMillis, - lastAccessTime = -1, - storage = sourceTableMeta.storage.copy(locationUri = None)) + lastAccessTime = -1).mapStorage(_.copy(locationUri = None)) catalog.createTable(tableToCreate, ifNotExists) Seq.empty[Row] @@ -120,11 +118,11 @@ case class CreateTableLikeCommand( * }}} */ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { - assert(table.storage.provider == Some("hive")) + assert(table.provider == Some("hive")) override def run(sparkSession: SparkSession): Seq[Row] = { DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") - DDLUtils.verifyTableProperties(table.storage.properties.keys.toSeq, "CREATE TABLE") + DDLUtils.verifyTableProperties(table.storage.getProperties.keys.toSeq, "CREATE TABLE") sparkSession.sessionState.catalog.createTable(table, ifNotExists) Seq.empty[Row] } @@ -170,9 +168,9 @@ case class AlterTableRenameCommand( val table = catalog.getTableMetadata(oldName) if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) { val newPath = catalog.defaultTablePath(newName) - val newTable = table.copy( - storage = table.storage.copy( - properties = table.storage.properties + ("path" -> newPath))) + val newTable = table.mapStorage { storage => + storage.copy(properties = storage.properties + 
("path" -> newPath)) + } catalog.alterTable(newTable) } // Invalidate the table last, otherwise uncaching the table would load the logical plan @@ -448,7 +446,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF partCols.foreach(col => append(buffer, col, "", "")) } } else { - assert(table.storage.provider == Some("hive")) + assert(table.provider == Some("hive")) describeSchema(table.schema, buffer) if (table.partitionColumns.nonEmpty) { @@ -747,7 +745,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { showCreateDataSourceTable(tableMetadata) } else { - assert(tableMetadata.storage.provider == Some("hive")) + assert(tableMetadata.provider == Some("hive")) showCreateHiveTable(tableMetadata) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 8ffdc507db529..6bd11130902fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -214,7 +214,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table) - val options = table.storage.properties + val options = table.storage.getProperties val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d5a342a0aa44c..413e789077f12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -80,7 +80,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val storage = CatalogStorageFormat( locationUri = Some(catalog.defaultTablePath(name)), - provider = None, properties = Map.empty) CatalogTable( identifier = name, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 7ede725b6bb4c..eb486a14c4eae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -179,9 +179,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu try { client.createTable( - tableDefinition.copy(storage = tableDefinition.storage.copy( - locationUri = Some(tempPath.toString) - )), + tableDefinition.mapStorage(_.copy(locationUri = Some(tempPath.toString))), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1ddb541e8d662..02f0749d4c1dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -119,7 +119,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort")) } - val options = 
table.storage.properties + val options = table.storage.getProperties val dataSource = DataSource( sparkSession, @@ -443,8 +443,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => val desc = if (table.storage.getSerde.isEmpty) { // add default serde - table.copy(storage = - table.storage.withSerde("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + table.mapStorage(_.withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) } else { table } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 101eef5b3a1c9..8395120c97791 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -355,7 +355,7 @@ private[hive] class HiveClientImpl( val properties = h.getParameters.asScala.toMap - var storage = CatalogStorageFormat( + val storage = CatalogStorageFormat( locationUri = shim.getDataLocation(h).filterNot { _ => // SPARK-15269: Persisted data source tables always store the location URI as a SerDe // property named "path" instead of standard Hive `dataLocation`, because Hive only @@ -370,16 +370,10 @@ private[hive] class HiveClientImpl( // because only these Hive compatible tables set this field. h.getInputFormatClass == null }, - provider = Some("hive"), properties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap - ) - Option(h.getInputFormatClass).map(_.getName).foreach { inFmt => - storage = storage.withInputFormat(inFmt) - } - Option(h.getOutputFormatClass).map(_.getName).foreach { outFmt => - storage = storage.withOutputFormat(outFmt) - } - Option(h.getSerializationLib).foreach(serde => storage = storage.withSerde(serde)) + ).withInputFormat(Option(h.getInputFormatClass).map(_.getName)) + .withOutputFormat(Option(h.getOutputFormatClass).map(_.getName)) + .withSerde(Option(h.getSerializationLib)) CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), @@ -390,6 +384,7 @@ private[hive] class HiveClientImpl( case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW }, schema = schema, + provider = Some("hive"), partitionColumnNames = partCols.map(_.name), sortColumnNames = Seq(), // TODO: populate this bucketColumnNames = h.getBucketCols.asScala, @@ -816,19 +811,12 @@ private[hive] class HiveClientImpl( private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition - var storage = CatalogStorageFormat( + val storage = CatalogStorageFormat( locationUri = Option(apiPartition.getSd.getLocation), - provider = Some("hive"), - properties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap) - Option(apiPartition.getSd.getInputFormat).foreach { inFmt => - storage = storage.withInputFormat(inFmt) - } - Option(apiPartition.getSd.getOutputFormat).foreach { outFmt => - storage = storage.withOutputFormat(outFmt) - } - Option(apiPartition.getSd.getSerdeInfo.getSerializationLib).foreach { serde => - storage = storage.withSerde(serde) - } + properties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap + ).withInputFormat(Option(apiPartition.getSd.getInputFormat)) + .withOutputFormat(Option(apiPartition.getSd.getOutputFormat)) + .withSerde(Option(apiPartition.getSd.getSerdeInfo.getSerializationLib)) CatalogTablePartition( spec = 
Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 8027f898613bd..ffe6ac2d4ce37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -52,19 +52,13 @@ case class CreateHiveTableAsSelectCommand( import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.TextInputFormat - var newStorage = tableDesc.storage - if (newStorage.getInputFormat.isEmpty) { - newStorage = newStorage.withInputFormat(classOf[TextInputFormat].getName) + val withFormat = tableDesc.mapStorage { storage => + storage + .withInputFormat(storage.getInputFormat.orElse(Some(classOf[TextInputFormat].getName))) + .withOutputFormat(storage.getOutputFormat + .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName))) + .withSerde(storage.getSerde.orElse(Some(classOf[LazySimpleSerDe].getName))) } - if (newStorage.getOutputFormat.isEmpty) { - newStorage = newStorage.withOutputFormat( - classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName) - } - if (newStorage.getSerde.isEmpty) { - newStorage = newStorage.withSerde(classOf[LazySimpleSerDe].getName) - } - - val withFormat = tableDesc.copy(storage = newStorage) val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e94584168e888..9cace78c517fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -729,7 +729,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv schema = Seq.empty, storage = CatalogStorageFormat( locationUri = None, - provider = None, properties = Map( "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 57eae35a9bb22..bfe8568f722fd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -148,9 +148,9 @@ class VersionsSuite extends SparkFunSuite with Logging { tableType = CatalogTableType.MANAGED, schema = Seq(CatalogColumn("key", "int")), storage = CatalogStorageFormat.empty - .withInputFormat(classOf[TextInputFormat].getName) - .withOutputFormat(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName) - .withSerde(classOf[LazySimpleSerDe].getName)) + .withInputFormat(Some(classOf[TextInputFormat].getName)) + .withOutputFormat(Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName)) + .withSerde(Some(classOf[LazySimpleSerDe].getName))) } /////////////////////////////////////////////////////////////////////////// @@ -358,7 +358,7 @@ class VersionsSuite extends SparkFunSuite with Logging { val newLocation = Utils.createTempDir().getPath() val storage = storageFormat.copy(locationUri = Some(newLocation)) // needed for 0.12 alter 
partitions - .withSerde("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + .withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) val partition = CatalogTablePartition(spec, storage) client.alterPartitions("default", "src_part", Seq(partition)) assert(client.getPartition("default", "src_part", spec)