
Commit

refine
cloud-fan committed Jul 13, 2016
1 parent 87fbff0 commit d18ac59
Showing 14 changed files with 183 additions and 186 deletions.
@@ -45,13 +45,10 @@ case class CatalogFunction(
  *
  * @param locationUri the storage location of the table/partition, can be None if it has no storage,
  *                    e.g. view, or it's not file-based, e.g. JDBC data source table.
- * @param provider the name of the data source provider, e.g. parquet, json, etc. Can be None if it
- *                 has no storage. Note that `hive` is also one kind of provider.
  * @param properties used to store some extra information for the storage.
  */
 case class CatalogStorageFormat(
     locationUri: Option[String],
-    provider: Option[String],
     properties: Map[String, String]) {
 
   override def toString: String = {
@@ -63,23 +60,22 @@ case class CatalogStorageFormat(
     }
     val output = Seq(
       locationUri.map("Location: " + _).getOrElse(""),
-      provider.map("Provider: " + _).getOrElse(""),
       propertiesToString)
     output.filter(_.nonEmpty).mkString("Storage(", ", ", ")")
   }
 
   // TODO: remove these hive hacks
 
-  def withInputFormat(inFmt: String): CatalogStorageFormat = {
-    this.copy(properties = properties + ("inputFormat" -> inFmt))
+  def withInputFormat(inFmt: Option[String]): CatalogStorageFormat = {
+    inFmt.map(i => copy(properties = properties + ("inputFormat" -> i))).getOrElse(this)
   }
 
-  def withOutputFormat(outFmt: String): CatalogStorageFormat = {
-    this.copy(properties = properties + ("outputFormat" -> outFmt))
+  def withOutputFormat(outFmt: Option[String]): CatalogStorageFormat = {
+    outFmt.map(o => copy(properties = properties + ("outputFormat" -> o))).getOrElse(this)
   }
 
-  def withSerde(serde: String): CatalogStorageFormat = {
-    this.copy(properties = properties + ("serde" -> serde))
+  def withSerde(serde: Option[String]): CatalogStorageFormat = {
+    serde.map(s => copy(properties = properties + ("serde" -> s))).getOrElse(this)
   }
 
   def getInputFormat: Option[String] = properties.get("inputFormat")
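
The setters now take Option[String] so a None argument is simply a no-op: the map applies the copy only when a value is present, and getOrElse(this) returns the receiver untouched otherwise. A minimal standalone sketch of the pattern (the class and values below are illustrative, not part of this commit):

    // Mirrors the Option-based setter pattern above in a self-contained class.
    case class StorageFormatSketch(properties: Map[String, String] = Map.empty) {
      // Some(s) adds the entry; None falls through to the unmodified instance.
      def withSerde(serde: Option[String]): StorageFormatSketch =
        serde.map(s => copy(properties = properties + ("serde" -> s))).getOrElse(this)
    }

    object OptionSetterDemo extends App {
      val base = StorageFormatSketch()
      println(base.withSerde(Some("LazySimpleSerDe")).properties) // Map(serde -> LazySimpleSerDe)
      println(base.withSerde(None).properties)                    // Map()
    }

This lets call sites chain all three setters unconditionally instead of pattern-matching on each Option.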
@@ -93,7 +89,7 @@ case class CatalogStorageFormat(
 
 object CatalogStorageFormat {
   /** Empty storage format for default values and copies. */
-  val empty = CatalogStorageFormat(None, None, Map.empty)
+  val empty = CatalogStorageFormat(None, Map.empty)
 }
 
 /**
@@ -135,6 +131,8 @@ case class CatalogTablePartition(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider, e.g. parquet, json, etc.
+ *                 Note that `hive` is also one kind of provider.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
  */
@@ -143,6 +141,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     sortColumnNames: Seq[String] = Seq.empty,
     bucketColumnNames: Seq[String] = Seq.empty,
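
With this change the provider is specified when the CatalogTable itself is constructed rather than buried in the storage format. A hedged sketch of what that looks like after this commit (the identifier, schema, and provider values are made up for illustration):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog._

    // Hypothetical table definition: `provider` now sits on CatalogTable,
    // and CatalogStorageFormat no longer carries a provider field.
    val table = CatalogTable(
      identifier = TableIdentifier("t", Some("db")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = Seq(CatalogColumn("a", "int")),
      provider = Some("parquet")) // e.g. parquet, json; `hive` is also a provider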
@@ -178,6 +177,11 @@ case class CatalogTable(
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString
 
+  /** Syntactic sugar to update the `storage`. */
+  def mapStorage(f: CatalogStorageFormat => CatalogStorageFormat): CatalogTable = {
+    copy(storage = f(storage))
+  }
+
   override def toString: String = {
     val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
     val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
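
mapStorage threads a CatalogStorageFormat => CatalogStorageFormat function through copy, which pairs naturally with the Option-based setters above. A hedged usage sketch (the helper name and its inputs are assumptions, not code from this commit):

    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}

    // When `serde` is None, withSerde is a no-op and the table's storage
    // comes back unchanged; when Some, the serde property is added.
    def setSerde(table: CatalogTable, serde: Option[String]): CatalogTable =
      table.mapStorage(_.withSerde(serde))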
@@ -398,7 +398,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(oldPart2.storage.locationUri != Some(newLocation))
     // alter other storage information
     catalog.alterPartitions("db2", "tbl2", Seq(
-      oldPart1.copy(storage = storageFormat.withSerde(newSerde)),
+      oldPart1.copy(storage = storageFormat.withSerde(Some(newSerde))),
       oldPart2.copy(storage = storageFormat.copy(properties = newSerdeProps))))
     val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec)
     val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec)
@@ -569,7 +569,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
-        None, Map.empty),
+        Map.empty),
       schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
@@ -609,7 +609,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       Map("a" -> "7", "b" -> "8"),
       CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
-        None, Map.empty)
+        Map.empty)
     )
     catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false)
     assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
@@ -629,8 +629,8 @@ abstract class CatalogTestUtils {
 
   // These fields must be lazy because they rely on fields that are not implemented yet
   lazy val storageFormat = CatalogStorageFormat.empty
-    .withInputFormat(tableInputFormat)
-    .withOutputFormat(tableOutputFormat)
+    .withInputFormat(Some(tableInputFormat))
+    .withOutputFormat(Some(tableOutputFormat))
   lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
   lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
   lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)