[SPARK-16397][SQL] make CatalogTable more general and less hive specific #14071

Closed · wants to merge 3 commits
@@ -53,7 +53,7 @@ statement
         (PARTITIONED BY partitionColumnNames=identifierList)?
         bucketSpec? AS? query                                          #createTableUsing
     | createTableHeader ('(' columns=colTypeList ')')?
-        (COMMENT STRING)?
+        (COMMENT comment=STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
         bucketSpec? skewSpec?
         rowFormat? createFileFormat? locationSpec?
@@ -42,38 +42,54 @@ case class CatalogFunction(

 /**
  * Storage format, used to describe how a partition or a table is stored.
+ *
+ * @param locationUri the storage location of the table/partition, can be None if it has no storage,
+ *                    e.g. view, or it's not file-based, e.g. JDBC data source table.
+ * @param properties used to store some extra information for the storage.
  */
 case class CatalogStorageFormat(
     locationUri: Option[String],
-    inputFormat: Option[String],
-    outputFormat: Option[String],
-    serde: Option[String],
-    compressed: Boolean,
-    serdeProperties: Map[String, String]) {
+    properties: Map[String, String]) {

[Review comment, Contributor Author, on the removed `compressed` flag]: We never persist this flag into external catalog.

   override def toString: String = {
-    val serdePropsToString =
-      if (serdeProperties.nonEmpty) {
-        s"Properties: " + serdeProperties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+    val propertiesToString =
+      if (properties.nonEmpty) {
+        s"Properties: " + properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
       } else {
         ""
       }
-    val output =
-      Seq(locationUri.map("Location: " + _).getOrElse(""),
-        inputFormat.map("InputFormat: " + _).getOrElse(""),
-        outputFormat.map("OutputFormat: " + _).getOrElse(""),
-        if (compressed) "Compressed" else "",
-        serde.map("Serde: " + _).getOrElse(""),
-        serdePropsToString)
+    val output = Seq(
+      locationUri.map("Location: " + _).getOrElse(""),
+      propertiesToString)
     output.filter(_.nonEmpty).mkString("Storage(", ", ", ")")
   }
+
+  // TODO: remove these hive hacks
+
+  def withInputFormat(inFmt: Option[String]): CatalogStorageFormat = {
+    inFmt.map(i => copy(properties = properties + ("inputFormat" -> i))).getOrElse(this)
+  }
+
+  def withOutputFormat(outFmt: Option[String]): CatalogStorageFormat = {
+    outFmt.map(o => copy(properties = properties + ("outputFormat" -> o))).getOrElse(this)
+  }
+
+  def withSerde(serde: Option[String]): CatalogStorageFormat = {
+    serde.map(s => copy(properties = properties + ("serde" -> s))).getOrElse(this)
+  }
+
+  def getInputFormat: Option[String] = properties.get("inputFormat")
+
+  def getOutputFormat: Option[String] = properties.get("outputFormat")
+
+  def getSerde: Option[String] = properties.get("serde")
+
+  def getProperties: Map[String, String] = properties - "inputFormat" - "outputFormat" - "serde"
 }

 object CatalogStorageFormat {
   /** Empty storage format for default values and copies. */
-  val empty = CatalogStorageFormat(locationUri = None, inputFormat = None,
-    outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty)
+  val empty = CatalogStorageFormat(None, Map.empty)
 }
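A minimal sketch of the resulting round-trip behavior (editorial illustration, not part of the diff; it assumes this PR's version of `CatalogStorageFormat`): the Hive-specific fields now live under reserved keys of `properties`, a `withX(None)` call is a no-op, and `getProperties` filters the reserved keys back out.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

val fmt = CatalogStorageFormat.empty
  .withInputFormat(Some("org.apache.hadoop.mapred.TextInputFormat"))
  .withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
  .withOutputFormat(None) // no-op: None neither stores nor clears anything

fmt.getInputFormat    // Some("org.apache.hadoop.mapred.TextInputFormat")
fmt.getOutputFormat   // None, since withOutputFormat(None) stored nothing
fmt.getSerde          // Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
fmt.getProperties     // Map.empty, the reserved keys are filtered back out
fmt.properties.keySet // Set("inputFormat", "serde"), the raw map still has them
```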
Expand Down Expand Up @@ -115,6 +131,8 @@ case class CatalogTablePartition(
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*
* @param provider the name of the data source provider, e.g. parquet, json, etc.
* Note that `hive` is also one kind of provider.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
*/
@@ -123,6 +141,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     sortColumnNames: Seq[String] = Seq.empty,
     bucketColumnNames: Seq[String] = Seq.empty,
@@ -158,16 +177,9 @@ case class CatalogTable(
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString

-  /** Syntactic sugar to update a field in `storage`. */
-  def withNewStorage(
-      locationUri: Option[String] = storage.locationUri,
-      inputFormat: Option[String] = storage.inputFormat,
-      outputFormat: Option[String] = storage.outputFormat,
-      compressed: Boolean = false,
-      serde: Option[String] = storage.serde,
-      serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
-    copy(storage = CatalogStorageFormat(
-      locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+  /** Syntactic sugar to update the `storage`. */
+  def mapStorage(f: CatalogStorageFormat => CatalogStorageFormat): CatalogTable = {
+    copy(storage = f(storage))
   }

   override def toString: String = {
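For callers migrating off `withNewStorage`, a hedged before/after sketch (`table: CatalogTable` and `newPath: String` are placeholder values, not from the diff):

```scala
// Before this PR: one named argument per storage field.
// val relocated = table.withNewStorage(locationUri = Some(newPath))

// After this PR: pass a transformation over the whole storage format.
val relocated = table.mapStorage(_.copy(locationUri = Some(newPath)))

// Several updates compose in one pass through the catalog object.
val rewired = table.mapStorage { storage =>
  storage
    .copy(properties = storage.properties + ("path" -> newPath))
    .withSerde(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
}
```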
@@ -398,12 +398,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     assert(oldPart2.storage.locationUri != Some(newLocation))
     // alter other storage information
     catalog.alterPartitions("db2", "tbl2", Seq(
-      oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))),
-      oldPart2.copy(storage = storageFormat.copy(serdeProperties = newSerdeProps))))
+      oldPart1.copy(storage = storageFormat.withSerde(Some(newSerde))),
+      oldPart2.copy(storage = storageFormat.copy(properties = newSerdeProps))))
     val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec)
     val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec)
-    assert(newPart1b.storage.serde == Some(newSerde))
-    assert(newPart2b.storage.serdeProperties == newSerdeProps)
+    assert(newPart1b.storage.getSerde == Some(newSerde))
+    assert(newPart2b.storage.getProperties == newSerdeProps)
     // alter but change spec, should fail because new partition specs do not exist yet
     val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
     val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
@@ -550,7 +550,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val table = CatalogTable(
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
     )

@@ -569,7 +569,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
-        None, None, None, false, Map.empty),
+        Map.empty),
       schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
@@ -582,7 +582,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = Seq(
         CatalogColumn("col1", "int"),
         CatalogColumn("col2", "string"),
@@ -609,7 +609,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
       Map("a" -> "7", "b" -> "8"),
       CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
-        None, None, None, false, Map.empty)
+        Map.empty)
     )
     catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false)
     assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
@@ -628,13 +628,9 @@ abstract class CatalogTestUtils {
   def newEmptyCatalog(): ExternalCatalog

   // These fields must be lazy because they rely on fields that are not implemented yet
-  lazy val storageFormat = CatalogStorageFormat(
-    locationUri = None,
-    inputFormat = Some(tableInputFormat),
-    outputFormat = Some(tableOutputFormat),
-    serde = None,
-    compressed = false,
-    serdeProperties = Map.empty)
+  lazy val storageFormat = CatalogStorageFormat.empty
+    .withInputFormat(Some(tableInputFormat))
+    .withOutputFormat(Some(tableOutputFormat))
   lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
   lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
   lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
@@ -949,21 +949,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     // to include the partition columns here explicitly
     val schema = cols ++ partitionCols

-    // Storage format
-    val defaultStorage: CatalogStorageFormat = {
+    // default input/output format
+    val (defaultInputFormat, defaultOutputFormat) = {
       val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
       val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
-      CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        // Note: Keep this unspecified because we use the presence of the serde to decide
-        // whether to convert a table created by CTAS to a datasource table.
-        serde = None,
-        compressed = false,
-        serdeProperties = Map())
+      val inFmt = defaultHiveSerde.flatMap(_.inputFormat)
+        .getOrElse("org.apache.hadoop.mapred.TextInputFormat")
+      val outFmt = defaultHiveSerde.flatMap(_.outputFormat)
+        .getOrElse("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
+      inFmt -> outFmt
     }
     validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
@@ -977,11 +971,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val storage = CatalogStorageFormat(
       locationUri = location,
-      inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
-      outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
-      serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
-      compressed = false,
-      serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
+      properties = rowStorage.getProperties ++ fileStorage.getProperties
+    ).withInputFormat(Some(defaultInputFormat))
+      .withInputFormat(fileStorage.getInputFormat)
+      .withOutputFormat(Some(defaultOutputFormat))
+      .withOutputFormat(fileStorage.getOutputFormat)
+      // if both the file format and the row format specify a serde, the row format one wins
+      .withSerde(fileStorage.getSerde)
+      .withSerde(rowStorage.getSerde)
     // If location is defined, we'll assume this is an external table.
     // Otherwise, we may accidentally delete existing data.
     val tableType = if (external || location.isDefined) {
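The chain above leans on the `withX` semantics introduced earlier: a `Some` overwrites the stored key, a `None` is a no-op. Applying the default first and the explicit format second therefore makes the explicit one win exactly when it was specified. A small illustrative sketch (values are made up):

```scala
val resolved = CatalogStorageFormat.empty
  .withInputFormat(Some("default.InputFormat")) // default is always applied
  .withInputFormat(None)                        // format not specified: no-op
  .withSerde(Some("file.format.Serde"))         // e.g. from STORED AS
  .withSerde(Some("row.format.Serde"))          // ROW FORMAT SERDE overrides it

resolved.getInputFormat // Some("default.InputFormat"), the default survived
resolved.getSerde       // Some("row.format.Serde"), the last Some wins
```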
@@ -1085,9 +1082,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitTableFileFormat(
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
-    CatalogStorageFormat.empty.copy(
-      inputFormat = Option(string(ctx.inFmt)),
-      outputFormat = Option(string(ctx.outFmt)))
+    CatalogStorageFormat.empty
+      .withInputFormat(Option(ctx.inFmt).map(string))
+      .withOutputFormat(Option(ctx.outFmt).map(string))
   }
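A side note on the rewrite above: `Option(string(ctx.inFmt))` applies the `string` helper before any null check, while `Option(ctx.inFmt).map(string)` checks for a missing parse node first, which reads as the more defensive order. A plain-Scala illustration (the `string` below is a stand-in, not the parser's helper):

```scala
def string(node: Object): String = node.toString // dereferences its argument

val missing: Object = null
// Option(string(missing))    // would throw a NullPointerException inside string
Option(missing).map(string)   // None, the null check runs before string does
```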
@@ -1098,10 +1095,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val source = ctx.identifier.getText
     HiveSerDe.sourceToSerDe(source, conf) match {
       case Some(s) =>
-        CatalogStorageFormat.empty.copy(
-          inputFormat = s.inputFormat,
-          outputFormat = s.outputFormat,
-          serde = s.serde)
+        CatalogStorageFormat.empty
+          .withInputFormat(s.inputFormat)
+          .withOutputFormat(s.outputFormat)
+          .withSerde(s.serde)
       case None =>
         operationNotAllowed(s"STORED AS with file format '$source'", ctx)
     }
@@ -1138,9 +1135,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitRowFormatSerde(
       ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
     import ctx._
-    CatalogStorageFormat.empty.copy(
-      serde = Option(string(name)),
-      serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+    CatalogStorageFormat.empty
+      .copy(properties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+      .withSerde(Option(name).map(string))
   }
@@ -1168,7 +1165,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         ctx)
       "line.delim" -> value
     }
-    CatalogStorageFormat.empty.copy(serdeProperties = entries.toMap)
+    CatalogStorageFormat.empty.copy(properties = entries.toMap)
   }
@@ -1296,6 +1293,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
     ctx.colType.asScala.map { col =>
       CatalogColumn(
+        // TODO: it should be case preserving.
         col.identifier.getText.toLowerCase,
         // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
         // just convert the whole type string to lower case, otherwise the struct field names
@@ -1344,7 +1342,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

       case c: RowFormatSerdeContext =>
         // Use a serde format.
-        val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c)
+        val props = Option(c.props).map(visitPropertyKeyValues).getOrElse(Nil)
+        val name = string(c.name)

         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
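Context for the last hunk: the removed line pattern-matched all six fields of `CatalogStorageFormat`, which no longer compiles against the two-field case class, so the serde name and properties are now read off the parser context directly. An equivalent formulation through the new accessors would look roughly like this (a sketch, assuming `visitRowFormatSerde(c)` returns this PR's `CatalogStorageFormat` and that the grammar guarantees a serde class is present):

```scala
val fmt = visitRowFormatSerde(c)
val name: String = fmt.getSerde.get                    // serde class stored under "serde"
val props: Seq[(String, String)] = fmt.getProperties.toSeq
```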
@@ -388,11 +388,7 @@ object CreateDataSourceTableUtils extends Logging {
       schema = Nil,
       storage = CatalogStorageFormat(
         locationUri = None,
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        serdeProperties = options
+        properties = options
       ),
       properties = tableProperties.toMap)
   }
@@ -403,17 +399,18 @@ object CreateDataSourceTableUtils extends Logging {
     assert(partitionColumns.isEmpty)
     assert(relation.partitionSchema.isEmpty)

+    val storage = CatalogStorageFormat(
+      locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
+      properties = options)
+      .withInputFormat(serde.inputFormat)
+      .withOutputFormat(serde.outputFormat)
+      .withSerde(serde.serde)
+
     CatalogTable(
       identifier = tableIdent,
       tableType = tableType,
-      storage = CatalogStorageFormat(
-        locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
-        inputFormat = serde.inputFormat,
-        outputFormat = serde.outputFormat,
-        serde = serde.serde,
-        compressed = false,
-        serdeProperties = options
-      ),
+      storage = storage,
+      provider = Some("hive"),
       schema = relation.schema.map { f =>
         CatalogColumn(f.name, f.dataType.catalogString)
       },
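With `provider` now a first-class field, the Hive path above tags its tables with `Some("hive")`, while data source tables carry the source name instead. A hedged construction sketch (names and values are illustrative, not from the diff):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._

val dsTable = CatalogTable(
  identifier = TableIdentifier("events", Some("db")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat(locationUri = None, properties = Map("path" -> "/tmp/events")),
  schema = Seq(CatalogColumn("ts", "bigint")),
  provider = Some("parquet")) // Some("hive") would mark a Hive serde table
```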
@@ -318,16 +318,19 @@ case class AlterTableSerDePropertiesCommand(
         "not supported for tables created with the datasource API")
     }
     if (partSpec.isEmpty) {
-      val newTable = table.withNewStorage(
-        serde = serdeClassName.orElse(table.storage.serde),
-        serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+      val newTable = table.mapStorage { storage =>
+        storage
+          .copy(properties = storage.properties ++ serdeProperties.getOrElse(Map.empty))
+          .withSerde(serdeClassName)
+      }
       catalog.alterTable(newTable)
     } else {
       val spec = partSpec.get
       val part = catalog.getPartition(tableName, spec)
-      val newPart = part.copy(storage = part.storage.copy(
-        serde = serdeClassName.orElse(part.storage.serde),
-        serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map())))
+      val newStorage = part.storage.copy(
+        properties = part.storage.properties ++ serdeProperties.getOrElse(Map.empty)
+      ).withSerde(serdeClassName)
+      val newPart = part.copy(storage = newStorage)
       catalog.alterPartitions(tableName, Seq(newPart))
     }
     Seq.empty[Row]
@@ -464,11 +467,13 @@ case class AlterTableSetLocationCommand(
       // No partition spec is specified, so we set the location for the table itself
       val newTable =
         if (DDLUtils.isDatasourceTable(table)) {
-          table.withNewStorage(
-            locationUri = Some(location),
-            serdeProperties = table.storage.serdeProperties ++ Map("path" -> location))
+          table.mapStorage { storage =>
+            storage.copy(
+              locationUri = Some(location),
+              properties = storage.properties + ("path" -> location))
+          }
         } else {
-          table.withNewStorage(locationUri = Some(location))
+          table.mapStorage(_.copy(locationUri = Some(location)))
         }
       catalog.alterTable(newTable)
     }