Commit 73e71b4
Address comments.
yhuai committed Feb 27, 2015
1 parent 143927a · commit 73e71b4
Showing 1 changed file with 16 additions and 14 deletions.
@@ -69,19 +69,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
-        case Some(numOfParts) =>
-          val parts = (0 until numOfParts.toInt).map { index =>
-            Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
-              .getOrElse("Could not read schema from the metastore because it is corrupted.")
+      val userSpecifiedSchema =
+        Option(table.getProperty("spark.sql.sources.schema.numParts")).flatMap { numParts =>
+          val parts = (0 until numParts.toInt).map { index =>
+            val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
+            if (part == null) {
+              throw new AnalysisException(
+                "Could not read schema from the metastore because it is corrupted.")
+            }
+
+            part
           }
-          // Stick all parts back to a single schema string in the JSON representation.
-          Some(parts.mkString)
-        case None => None // The schema was not defined.
+          // Stick all parts back to a single schema string in the JSON representation
+          // and convert it back to a StructType.
+          Some(DataType.fromJson(parts.mkString).asInstanceOf[StructType])
         }
 
-      val userSpecifiedSchema =
-        schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
       val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
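For context, the read path above reassembles the schema's JSON form from the numbered part properties and parses it back into a StructType in one pass. A minimal sketch of that logic against a plain Map (the Map harness and the readSchema name are illustrative, not part of this change; the property keys and DataType.fromJson mirror the diff, and sys.error stands in for the AnalysisException the real code throws):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Rebuild the schema recorded as spark.sql.sources.schema.numParts
    // plus spark.sql.sources.schema.part.0 .. part.(numParts - 1).
    def readSchema(props: Map[String, String]): Option[StructType] =
      props.get("spark.sql.sources.schema.numParts").map { numParts =>
        val json = (0 until numParts.toInt).map { index =>
          props.getOrElse(
            s"spark.sql.sources.schema.part.$index",
            // A missing part means the metastore entry is corrupted; fail loudly.
            sys.error(s"Could not read schema part $index from the metastore."))
        }.mkString
        // The concatenated parts form one JSON document describing a StructType.
        DataType.fromJson(json).asInstanceOf[StructType]
      }

Returning None when the numParts property is absent matches the old case None branch: no property means no user-specified schema, not corruption.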
@@ -129,10 +132,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val schemaJsonString = userSpecifiedSchema.get.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
-      tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
-      parts.zipWithIndex.foreach {
-        case (part, index) =>
-          tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+      tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
       }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
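The write path here is the mirror image: the schema JSON is chunked with grouped(threshold) so no single property value exceeds what the metastore can store (threshold is defined in surrounding code not shown in this hunk), and the part count is recorded under the renamed numParts key so readers know how many pieces to fetch. A sketch under the same illustrative Map harness (writeSchema is a made-up name):

    // Chunk the schema JSON so no single property value exceeds `threshold`,
    // then record the part count alongside the numbered parts.
    def writeSchema(schemaJson: String, threshold: Int): Map[String, String] = {
      val parts = schemaJson.grouped(threshold).toSeq
      val indexed = parts.zipWithIndex.map { case (part, index) =>
        s"spark.sql.sources.schema.part.$index" -> part
      }
      (("spark.sql.sources.schema.numParts" -> parts.size.toString) +: indexed).toMap
    }

Round-tripping writeSchema(schema.json, threshold) through readSchema recovers the original StructType; with this change, a missing part now raises an error instead of splicing an error-message string into the JSON, which is the bug the old getOrElse had.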
