Skip to content

Commit

Permalink
Do the same for functions and partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Mar 16, 2016
1 parent 1d12578 commit 5bf695c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst

/**
* Identifies a `table` in `database`. If `database` is not defined, the current database is used.
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
*/
private[sql] case class TableIdentifier(table: String, database: Option[String]) {
def this(table: String) = this(table, None)
Expand All @@ -33,3 +34,22 @@ private[sql] case class TableIdentifier(table: String, database: Option[String])
/** Companion offering a one-argument factory for [[TableIdentifier]]. */
private[sql] object TableIdentifier {
  /** Builds an identifier for `tableName` with no database specified. */
  def apply(tableName: String): TableIdentifier = TableIdentifier(tableName, None)
}

/**
 * Identifies a function in a database.
 * If `database` is not defined, the current database is used.
 *
 * @param name name of the function
 * @param database name of the database the function belongs to, if one was specified
 */
// TODO: reuse some code with TableIdentifier.
private[sql] case class FunctionIdentifier(name: String, database: Option[String]) {
  /** Creates an identifier for `name` with no database specified. */
  def this(name: String) = this(name, None)

  /** Same as [[quotedString]]. */
  override def toString: String = quotedString

  /**
   * Backtick-quoted form: `db`.`name` when a database is defined, otherwise `name`.
   * Backticks embedded in either part are doubled so the result remains a
   * well-formed quoted identifier.
   */
  def quotedString: String = {
    val quotedName = quoteIdentifier(name)
    database.fold(quotedName)(db => s"${quoteIdentifier(db)}.$quotedName")
  }

  /** Plain form without quoting: db.name when a database is defined, otherwise name. */
  def unquotedString: String = database.fold(name)(db => s"$db.$name")

  /** Wraps `part` in backticks, escaping any embedded backtick by doubling it. */
  private def quoteIdentifier(part: String): String = "`" + part.replace("`", "``") + "`"
}

/** Companion offering a one-argument factory for [[FunctionIdentifier]]. */
private[sql] object FunctionIdentifier {
  /** Builds an identifier for `funcName` with no database specified. */
  def apply(funcName: String): FunctionIdentifier = FunctionIdentifier(funcName, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


Expand Down Expand Up @@ -64,6 +64,8 @@ abstract class SessionCatalog(catalog: ExternalCatalog) {
// sessions as their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------

// Methods that interact with metastore tables only.

/**
* Create a metastore table in the database specified in `tableDefinition`.
* If no such database is specified, create it in the current database.
Expand All @@ -84,7 +86,9 @@ abstract class SessionCatalog(catalog: ExternalCatalog) {
/**
* Retrieve the metadata of an existing metastore table.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable
def getTable(name: TableIdentifier): CatalogTable

// Methods that interact with temporary tables and metastore tables.

/**
* Create a temporary table.
Expand Down Expand Up @@ -140,24 +144,27 @@ abstract class SessionCatalog(catalog: ExternalCatalog) {
*/
def listTables(currentDb: String, pattern: String): Seq[TableIdentifier]

// --------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// Partitions
// ----------------------------------------------------------------------------
// All methods in this category interact directly with the underlying catalog.
// --------------------------------------------------------------------------
// TODO: We need to figure out how these methods interact with our data source tables.
// For data source tables, we do not store values of partitioning columns in the metastore.
// For now, partition values of a data source table will be automatically discovered
// when we load the table.
// These methods are concerned with only metastore tables.
// ----------------------------------------------------------------------------

// TODO: We need to figure out how these methods interact with our data source
// tables. For such tables, we do not store values of partitioning columns in
// the metastore. For now, partition values of a data source table will be
// automatically discovered when we load the table.

def createPartitions(
db: String,
table: String,
currentDb: String,
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit

def dropPartitions(
db: String,
table: String,
currentDb: String,
tableName: TableIdentifier,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit

Expand All @@ -166,8 +173,8 @@ abstract class SessionCatalog(catalog: ExternalCatalog) {
* This assumes index i of `specs` corresponds to index i of `newSpecs`.
*/
def renamePartitions(
db: String,
table: String,
currentDb: String,
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit

Expand All @@ -179,69 +186,59 @@ abstract class SessionCatalog(catalog: ExternalCatalog) {
* this becomes a no-op.
*/
def alterPartitions(
db: String,
table: String,
currentDb: String,
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition]): Unit

def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
def getPartition(
currentDb: String,
tableName: TableIdentifier,
spec: TablePartitionSpec): CatalogTablePartition

def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
def listPartitions(
currentDb: String,
tableName: TableIdentifier): Seq[CatalogTablePartition]

// --------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// There are two kinds of functions, temporary functions and metastore
// functions (permanent UDFs). Temporary functions are isolated across
// sessions. Metastore functions can be used across multiple sessions as
// their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------

// Methods that interact with metastore functions only.

// --------------------------------------------------------------------------
// Functions: Methods for metastore functions (permanent UDFs).
// --------------------------------------------------------------------------
def createFunction(currentDb: String, funcDefinition: CatalogFunction): Unit

def createFunction(db: String, funcDefinition: CatalogFunction): Unit
def dropFunction(currentDb: String, funcName: FunctionIdentifier): Unit

/**
* Drops a permanent function with the given name from the given database.
* Alter a function whose name matches the one specified in `funcDefinition`,
* assuming the function exists.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def dropFunction(db: String, funcName: String): Unit
def alterFunction(currentDb: String, funcDefinition: CatalogFunction): Unit

// --------------------------------------------------------------------------
// Functions: Methods for metastore functions (permanent UDFs) or temp functions.
// --------------------------------------------------------------------------
// Methods that interact with temporary functions and metastore functions.

def createTempFunction(funcDefinition: CatalogFunction): Unit

/**
* Drops a temporary function with the given name.
*/
// TODO: The reason that we distinguish dropFunction and dropTempFunction is that
// Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
// dropFunction and dropTempFunction.
def dropTempFunction(funcName: String): Unit

def renameFunction(
specifiedDB: Option[String],
currentDB: String,
oldName: String,
newName: String): Unit
currentDb: String,
oldName: FunctionIdentifier,
newName: FunctionIdentifier): Unit

/**
* Alter a function whose name matches the one specified in `funcDefinition`,
* assuming the function exists.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterFunction(
specifiedDB: Option[String],
currentDB: String,
funcDefinition: CatalogFunction): Unit

def getFunction(
specifiedDB: Option[String],
currentDB: String,
funcName: String): CatalogFunction

def listFunctions(
specifiedDB: Option[String],
currentDB: String,
pattern: String): Seq[String]
def getFunction(currentDb: String, funcIdentifier: FunctionIdentifier): CatalogFunction

def listFunctions(currentDb: String, pattern: String): Seq[FunctionIdentifier]

}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ abstract class ExternalCatalog {
* @param name name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
// TODO: use FunctionIdentifier here instead of a plain String for `name`.
case class CatalogFunction(name: String, className: String)


Expand Down

0 comments on commit 5bf695c

Please sign in to comment.