
Commit

address comments
cloud-fan committed Aug 19, 2016
1 parent 96d57b6 commit 6ca8909
Showing 3 changed files with 73 additions and 76 deletions.
@@ -102,7 +102,9 @@ case class CreateDataSourceTableCommand(
bucketSpec = bucketSpec
)

sessionState.catalog.createTable(table, ignoreIfExists)
// We will return Nil or throw an exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(table, ignoreIfExists = false)
Seq.empty[Row]
}
}
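The new comment relies on CreateDataSourceTableCommand having already handled the "table exists" case before the createTable call is reached. Below is a minimal, self-contained sketch of that check-then-create pattern; SimpleCatalog, TableDef and run are hypothetical stand-ins, not Spark's SessionCatalog API.

object CheckThenCreateSketch {
  case class TableDef(name: String)

  class SimpleCatalog {
    private val tables = scala.collection.mutable.Map.empty[String, TableDef]
    def tableExists(name: String): Boolean = tables.contains(name)
    def createTable(table: TableDef, ignoreIfExists: Boolean): Unit = {
      if (tables.contains(table.name)) {
        // Only reachable if the caller skipped its own existence check.
        if (!ignoreIfExists) sys.error(s"Table ${table.name} already exists")
      } else {
        tables(table.name) = table
      }
    }
  }

  def run(catalog: SimpleCatalog, table: TableDef, ifNotExists: Boolean): Seq[Unit] = {
    // Handle the "already exists" case up front, mirroring the command's early return or error.
    if (catalog.tableExists(table.name)) {
      if (ifNotExists) return Seq.empty else sys.error(s"Table ${table.name} already exists")
    }
    // At this point the table is known not to exist, so ignoreIfExists can safely be false.
    catalog.createTable(table, ignoreIfExists = false)
    Seq.empty
  }
}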
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
@@ -164,26 +165,62 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
val db = tableDefinition.identifier.database.get
requireDbExists(db)
verifyTableProperties(tableDefinition)
// We can't create index tables currently.
assert(tableDefinition.tableType != INDEX)
// All tables except views must have a provider.
assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined)

// Views and Hive serde tables are guaranteed to be Hive compatible, so we save them to the Hive
// metastore directly. Otherwise, we need to put table metadata into table properties to work
// around some Hive metastore problems, e.g. it is not case preserving and has bad decimal type
// support.
if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) {
client.createTable(tableDefinition, ignoreIfExists)
} else {
// Before saving data source table metadata into Hive metastore, we should:
// 1. Put table schema, partition column names and bucket specification in table properties.
// 2. Check if this table is hive compatible:
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// and save table metadata to Hive.
// 2.2 If it's hive compatible, set serde information in table metadata and try to save
// it to Hive. If that fails, treat it as not hive compatible and go back to 2.1.

val tableProperties = tableMetadataToProperties(tableDefinition)
// Before saving data source table metadata into Hive metastore, we should:
// 1. Put table schema, partition column names and bucket specification in table properties.
// 2. Check if this table is hive compatible:
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// and save table metadata to Hive.
// 2.2 If it's hive compatible, set serde information in table metadata and try to save
// it to Hive. If that fails, treat it as not hive compatible and go back to 2.1.
if (DDLUtils.isDatasourceTable(tableDefinition)) {
// data source tables always have a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
val provider = tableDefinition.provider.get
val partitionColumns = tableDefinition.partitionColumnNames
val bucketSpec = tableDefinition.bucketSpec

val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
// TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`,
// however the current SQLConf is session isolated, which is not applicable to external
// catalog. We should re-enable this conf instead of hard-coding the value here, after we have
// global SQLConf.
val threshold = 4000
val schemaJsonString = tableDefinition.schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (partitionColumns.nonEmpty) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
}

if (sortColumnNames.nonEmpty) {
tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
}
}
}

// Converts the table metadata to Spark SQL specific format, i.e. sets the schema, partition column
// names and bucket specification to empty.
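As a standalone illustration of the schema-splitting scheme above, the sketch below chops a long JSON string into threshold-sized parts and stores them under indexed keys, plus a part count. The key names "schema.numParts" and "schema.part.N" are simplified placeholders for the DATASOURCE_SCHEMA_* constants, whose exact values are not shown in this diff.

object SchemaPropertiesSketch {
  // Mirrors the hard-coded 4000-character threshold used above.
  val threshold = 4000

  def schemaToProperties(schemaJson: String): Map[String, String] = {
    val parts = schemaJson.grouped(threshold).toSeq
    val indexed = parts.zipWithIndex.map { case (part, i) => s"schema.part.$i" -> part }
    (indexed :+ ("schema.numParts" -> parts.size.toString)).toMap
  }
}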
@@ -269,60 +306,14 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
logWarning(message)
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
}
} else {
client.createTable(tableDefinition, ignoreIfExists)
}
}

/**
* Converts table schema, partition column names and bucket specification to a (String, String)
* map, which will be put into table properties later.
*/
private def tableMetadataToProperties(table: CatalogTable): Map[String, String] = {
val properties = new scala.collection.mutable.HashMap[String, String]
properties.put(DATASOURCE_PROVIDER, table.provider.get)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
// TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`, however
// the current SQLConf is session isolated, which is not applicable to external catalog. We
// should re-enable this conf instead of hard-coding the value here, after we have global SQLConf.
val threshold = 4000
val schemaJsonString = table.schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (table.partitionColumnNames.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, table.partitionColumnNames.length.toString)
table.partitionColumnNames.zipWithIndex.foreach { case (partCol, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (table.bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = table.bucketSpec.get

properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
properties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
}

if (sortColumnNames.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
properties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
}
}
}

properties.toMap
}

private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
assert(DDLUtils.isDatasourceTable(tableDefinition),
"saveTableIntoHive only takes data source table.")
// If this is an external data source table...
if (tableDefinition.tableType == EXTERNAL &&
// ... that is not persisted as Hive compatible format (external tables in Hive compatible
@@ -384,20 +375,21 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
requireTableExists(db, tableDefinition.identifier.table)
verifyTableProperties(tableDefinition)

if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) {
client.alterTable(tableDefinition)
} else {
if (DDLUtils.isDatasourceTable(tableDefinition)) {
val oldDef = client.getTable(db, tableDefinition.identifier.table)
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is.
// Also add table meta properties to table properties, to retain the data source table format.
// to retain the Spark-specific format if the old table used it. Also add the old data source
// properties to the table properties, to retain the data source table format.
val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
val newDef = tableDefinition.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties)
properties = oldDataSourceProps ++ tableDefinition.properties)

client.alterTable(newDef)
} else {
client.alterTable(tableDefinition)
}
}
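A small self-contained sketch of the property-preserving merge performed in alterTable above: keys carrying the data source bookkeeping are copied from the old definition and combined with the caller's properties. The "spark.sql.sources." prefix is assumed here for illustration; the real DATASOURCE_PREFIX constant is private to HiveExternalCatalog.

object AlterTablePropsSketch {
  val dataSourcePrefix = "spark.sql.sources."

  def mergedProperties(
      oldProps: Map[String, String],
      newProps: Map[String, String]): Map[String, String] = {
    // Keep the old data source metadata keys; new properties win on any other conflict.
    val oldDataSourceProps = oldProps.filter { case (k, _) => k.startsWith(dataSourcePrefix) }
    oldDataSourceProps ++ newProps
  }
}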

@@ -421,6 +413,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
table
} else {
getProviderFromTableProperties(table).map { provider =>
assert(provider != "hive", "Hive serde table should not save provider in table properties.")
// SPARK-15269: Persisted data source tables always store the location URI as a storage
// property named "path" instead of standard Hive `dataLocation`, because Hive only
// allows directory paths as location URIs while Spark SQL data source tables also
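The truncated comment above refers to SPARK-15269: persisted data source tables store their location as a storage property named "path" rather than in Hive's standard dataLocation. Below is a minimal sketch of reading it back, with Storage as a simplified stand-in for Spark's storage format, not the actual CatalogStorageFormat API.

object PathPropertySketch {
  case class Storage(locationUri: Option[String], properties: Map[String, String])

  // Prefer the "path" storage property; fall back to the Hive location if it is absent.
  def tableLocation(storage: Storage): Option[String] =
    storage.properties.get("path").orElse(storage.locationUri)
}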
@@ -49,6 +49,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
}

// To test `HiveExternalCatalog`, we need to read the raw table metadata (schema, partition
// columns and bucket specification are still in table properties) from the Hive client.
private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive

test("persistent JSON table") {
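To complement the splitting logic in HiveExternalCatalog, a test that reads raw metadata through the Hive client could stitch the schema JSON back together from the numbered part properties, as in the sketch below. The key names match the simplified placeholders used earlier and are illustrative, not Spark's actual property constants.

object SchemaReassemblySketch {
  // Inverse of the splitting sketch: concatenate the numbered parts in order.
  def schemaFromProperties(props: Map[String, String]): Option[String] =
    props.get("schema.numParts").map { numParts =>
      (0 until numParts.toInt).map(i => props(s"schema.part.$i")).mkString
    }
}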
