[SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog

## What changes were proposed in this pull request?

Spark SQL doesn't have its own metastore yet and currently uses Hive's. However, the Hive metastore has some limitations (e.g. it caps the number of columns, is not case-preserving, has poor decimal type support, etc.), so we have some hacks to store data source table metadata in the Hive metastore successfully, i.e. we put all the information in table properties.
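For illustration, here is a minimal, self-contained sketch of that table-properties encoding, mirroring the `spark.sql.sources.*` keys and the JSON-splitting logic of the removed `CreateDataSourceTableUtils` further down in this diff. The object and method names (`TablePropertiesSketch`, `packSchema`) are hypothetical and not part of the PR.

```scala
import scala.collection.mutable

// Hypothetical helper, for illustration only: pack a provider name and a JSON schema
// string into Hive table properties, splitting the schema into chunks so that no
// single property value exceeds the metastore's length limit.
object TablePropertiesSketch {
  val DATASOURCE_PREFIX = "spark.sql.sources."
  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_PREFIX + "schema.numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_PREFIX + "schema.part."

  def packSchema(provider: String, schemaJson: String, threshold: Int): Map[String, String] = {
    val props = new mutable.HashMap[String, String]
    props.put(DATASOURCE_PROVIDER, provider)
    // Split the JSON string into pieces of at most `threshold` characters each.
    val parts = schemaJson.grouped(threshold).toSeq
    props.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      props.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    }
    props.toMap
  }
}
```

Reading the table back means reassembling `schema.part.0` through `schema.part.(numParts - 1)` and parsing the JSON; after this PR that decoding happens inside `HiveExternalCatalog` rather than at every call site.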

This PR moves these hacks into `HiveExternalCatalog` and tries to isolate the Hive-specific logic in one place.

Changes overview:

1. **Before this PR**: we need to put the metadata (schema, partition columns, etc.) of a data source table into table properties before saving it to the external catalog, even if the external catalog doesn't use the Hive metastore (e.g. `InMemoryCatalog`).
**After this PR**: the table-properties tricks live only in `HiveExternalCatalog`, so the caller side doesn't need to take care of them anymore.

2. **Before this PR**: because the table-properties tricks are done outside of the external catalog, we also need to revert them when we read table metadata back from the external catalog and use it, e.g. `DescribeTableCommand` reads the schema and partition columns from table properties.
**After this PR**: the table metadata read from the external catalog is exactly the same as what we saved to it.

Bonus: we can now create a data source table through `SessionCatalog`, as long as its schema is specified (see the sketch below).
Breaks: `schemaStringLengthThreshold` is no longer configurable. `hive.default.rcfile.serde` is no longer configurable.
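To make the bonus point concrete, here is a hedged sketch of what a caller can now do: build a `CatalogTable` (including its schema) and hand it straight to the internal `SessionCatalog`, without touching table properties. `CreateTableSketch` and `createParquetTable` are made-up names, and `SessionCatalog` is an internal API, so treat this as a sketch rather than user-facing usage.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.types.{LongType, StructType}

// Hypothetical caller: register a data source table through SessionCatalog.
// No spark.sql.sources.* properties are set here; if the underlying external
// catalog is HiveExternalCatalog, it applies the table-properties encoding itself.
object CreateTableSketch {
  def createParquetTable(catalog: SessionCatalog): Unit = {
    val table = CatalogTable(
      identifier = TableIdentifier("t"),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("id", LongType),
      provider = Some("parquet"))
    catalog.createTable(table, ignoreIfExists = false)
  }
}
```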

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14155 from cloud-fan/catalog-table.
cloud-fan authored and yhuai committed Aug 22, 2016
1 parent 91c2397 commit b2074b6
Showing 24 changed files with 536 additions and 653 deletions.
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -51,7 +50,7 @@ private[libsvm] class LibSVMOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
@@ -971,7 +971,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Storage format
val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
CatalogStorageFormat(
locationUri = None,
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
@@ -1115,7 +1115,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitGenericFileFormat(
ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
val source = ctx.identifier.getText
HiveSerDe.sourceToSerDe(source, conf) match {
HiveSerDe.sourceToSerDe(source) match {
case Some(s) =>
CatalogStorageFormat.empty.copy(
inputFormat = s.inputFormat,
@@ -17,18 +17,13 @@

package org.apache.spark.sql.execution.command

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._

@@ -97,16 +92,19 @@ case class CreateDataSourceTableCommand(
}
}

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
val table = CatalogTable(
identifier = tableIdent,
tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
schema = dataSource.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)

provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = bucketSpec
)

// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(table, ignoreIfExists = false)
Seq.empty[Row]
}
}
@@ -193,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
existingSchema = Some(l.schema)
case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
existingSchema = Some(s.metadata.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -233,226 +231,21 @@ case class CreateDataSourceTableAsSelectCommand(
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
schema = result.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)
val schema = result.schema
val table = CatalogTable(
identifier = tableIdent,
tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
schema = schema,
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = bucketSpec
)
sessionState.catalog.createTable(table, ignoreIfExists = false)
}

// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdent)
Seq.empty[Row]
}
}


object CreateDataSourceTableUtils extends Logging {

val DATASOURCE_PREFIX = "spark.sql.sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."

def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
schema: StructType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
}

if (sortColumnNames.nonEmpty) {
tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
}
}
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
} else {
tableProperties.put("EXTERNAL", "FALSE")
CatalogTableType.MANAGED
}

val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
options = options)

def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
CatalogTable(
identifier = tableIdent,
tableType = tableType,
schema = new StructType,
provider = Some(provider),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = options
),
properties = tableProperties.toMap)
}

def newHiveCompatibleMetastoreTable(
relation: HadoopFsRelation,
serde: HiveSerDe): CatalogTable = {
assert(partitionColumns.isEmpty)
assert(relation.partitionSchema.isEmpty)

CatalogTable(
identifier = tableIdent,
tableType = tableType,
storage = CatalogStorageFormat(
locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
inputFormat = serde.inputFormat,
outputFormat = serde.outputFormat,
serde = serde.serde,
compressed = false,
properties = options
),
schema = relation.schema,
provider = Some(provider),
properties = tableProperties.toMap,
viewText = None)
}

// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
case _ if skipHiveMetadata =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
(None, message)

case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
val message =
s"Persisting data source relation $qualifiedTableName with a single input path " +
s"into Hive metastore in Hive compatible format. Input path: " +
s"${relation.location.paths.head}."
(Some(hiveTable), message)

case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
"Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
val message =
s"Persisting bucketed data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
"Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), relation: HadoopFsRelation) =>
val message =
s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), _) =>
val message =
s"Data source relation $qualifiedTableName is not a " +
s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
"in Spark SQL specific format, which is NOT compatible with Hive."
(None, message)

case _ =>
val message =
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
s"Spark SQL specific format, which is NOT compatible with Hive."
(None, message)
}

(hiveCompatibleTable, logMessage) match {
case (Some(table), message) =>
// We first try to save the metadata of the table in a Hive compatible way.
// If Hive throws an error, we fall back to save its metadata in the Spark SQL
// specific way.
try {
logInfo(message)
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
s"it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
val table = newSparkSQLSpecificMetastoreTable()
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}

case (None, message) =>
logWarning(message)
val table = newSparkSQLSpecificMetastoreTable()
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}
}
}