Skip to content

Commit

Permalink
rename partition API
Browse files Browse the repository at this point in the history
Change-Id: Ifa7655acf23f3ae6cfd70c41c91ee190ae78d4b8
  • Loading branch information
jackylee-ch committed Jul 26, 2020
1 parent 4a77db0 commit 9ff1c6c
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 459 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,87 +19,78 @@
import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;

/**
* Catalog methods for working with Partitions.
* A partition interface of {@link Table} to indicate partition APIs.
* A partition is composed of identifier and properties,
* and properties contains metadata information of the partition.
* <p>
* These APIs are used to modify table partition identifier or partition metadata,
* in some cases, they will change actual value of table data as well.
*
* @since 3.0.0
*/
@Experimental
public interface SupportsPartitions extends TableCatalog {

/**
* Create partitions in an existing table, assuming it exists.
*
* @param ident a table identifier
* @param partitions transforms to use for partitioning data in the table
* @param ignoreIfExists
*/
void createPartitions(
Identifier ident,
TablePartition[] partitions,
Boolean ignoreIfExists);
public interface SupportsPartitions extends Table {

/**
* Drop partitions from a table, assuming they exist.
* Create a partition in table.
*
* @param ident a table identifier
* @param partitions a list of string map for existing partitions
* @param ignoreIfNotExists
* @param ident a new partition identifier
* @param properties the metadata of a partition
* @throws PartitionAlreadyExistsException If a partition already exists for the identifier
*/
void dropPartitions(
Identifier ident,
Map<String, String>[] partitions,
Boolean ignoreIfNotExists);
void createPartition(
InternalRow ident,
Map<String, String> properties) throws PartitionAlreadyExistsException;

/**
* Override the specs of one or many existing table partitions, assuming they exist.
* Drop a partition from table.
*
* @param ident a table identifier
* @param oldpartitions a list of string map for existing partitions to be renamed
* @param newPartitions a list of string map for new partitions
* @param ident a partition identifier
* @return true if a partition was deleted, false if no partition exists for the identifier
*/
void renamePartitions(
Identifier ident,
Map<String, String>[] oldpartitions,
Map<String, String>[] newPartitions);
Boolean dropPartition(InternalRow ident);

/**
* Alter one or many table partitions whose specs that match those specified in `parts`,
* assuming the partitions exist.
* Rename a Partition from old identifier to new identifier with no metadata changed.
*
* @param ident a table identifier
* @param partitions transforms to use for partitioning data in the table
* @param oldIdent the partition identifier of the existing partition
* @param newIdent the new partition identifier of the partition
* @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
* @throws PartitionAlreadyExistsException If the new partition identifier already exists
*/
void alterPartitions(
Identifier ident,
TablePartition[] partitions);
void renamePartition(
InternalRow oldIdent,
InternalRow newIdent) throws NoSuchPartitionException, PartitionAlreadyExistsException;

/**
* Retrieve the metadata of a table partition, assuming it exists.
* Replace the partition metadata of the existing partition.
*
* @param ident a table identifier
* @param partition a list of string map for existing partitions
* @param ident the partition identifier of the existing partition
* @param properties the new metadata of the partition
* @throws NoSuchPartitionException If the partition identifier to rename doesn't exist
*/
TablePartition getPartition(
Identifier ident,
Map<String, String> partition);
void replacePartitionMetadata(
InternalRow ident,
Map<String, String> properties) throws NoSuchPartitionException;

/**
* List the names of all partitions that belong to the specified table, assuming it exists.
* Retrieve the partition metadata of the existing partition.
*
* @param ident a table identifier
* @param partition a list of string map for existing partitions
* @param ident a partition identifier
* @return the metadata of the partition
*/
String[] listPartitionNames(
Identifier ident,
Map<String, String> partition);
Map<String, String> getPartitionMetadata(InternalRow ident);

/**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
* List the identifiers of all partitions that contains the ident in a table.
*
* @param ident a table identifier
* @param partition a list of string map for existing partitions
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
TablePartition[] listPartitions(
Identifier ident,
Map<String, String> partition);
InternalRow[] listPartitionIdentifiers(InternalRow ident);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType

/**
* Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
Expand Down Expand Up @@ -48,14 +50,22 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
class TempTableAlreadyExistsException(table: String)
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")

class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
extends AnalysisException(
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
def this(db: String, table: String, spec: TablePartitionSpec) = {
this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
}

class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
extends AnalysisException(
s"The following partitions already exists in table '$table' database '$db':\n"
def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
this(s"The following partitions already exists in table '$table' database '$db':\n"
+ specs.mkString("\n===\n"))
}

def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
this(s"Partition " +
s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")} in $tableName exists")
}
}

class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType


/**
Expand All @@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
}
}

class NoSuchPartitionException(
db: String,
table: String,
spec: TablePartitionSpec)
extends AnalysisException(
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
class NoSuchPartitionException(message: String) extends AnalysisException(message) {
def this(db: String, table: String, spec: TablePartitionSpec) = {
this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
}

def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
this(s"Partition not found in table $tableName: " +
s"${partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")}")
}
}

class NoSuchPermanentFunctionException(db: String, func: String)
extends AnalysisException(s"Function '$func' not found in database '$db'")
Expand Down
Loading

0 comments on commit 9ff1c6c

Please sign in to comment.