Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31694][SQL] Add SupportsPartitions APIs on DataSourceV2 #28617

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;

/**
* An atomic partition interface of {@link Table} to operate multiple partitions atomically.
* <p>
* These APIs are used to modify table partition or partition metadata,
* they will change the table data as well.
* ${@link #createPartitions}:
* add an array of partitions and any data they contain to the table
* ${@link #dropPartitions}:
* remove an array of partitions and any data they contain from the table
*
* @since 3.1.0
*/
@Experimental
public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement {

cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
@Override
default void createPartition(
InternalRow ident,
Map<String, String> properties)
throws PartitionAlreadyExistsException, UnsupportedOperationException {
try {
createPartitions(new InternalRow[]{ident}, new Map[]{properties});
} catch (PartitionsAlreadyExistException e) {
throw new PartitionAlreadyExistsException(e.getMessage());
}
}

@Override
default boolean dropPartition(InternalRow ident) {
return dropPartitions(new InternalRow[]{ident});
}

/**
* Create an array of partitions atomically in table.
* <p>
* If any partition already exists,
* the operation of createPartitions need to be safely rolled back.
*
* @param idents an array of new partition identifiers
* @param properties the metadata of the partitions
* @throws PartitionsAlreadyExistException If any partition already exists for the identifier
* @throws UnsupportedOperationException If partition property is not supported
*/
void createPartitions(
InternalRow[] idents,
Map<String, String>[] properties)
throws PartitionsAlreadyExistException, UnsupportedOperationException;

/**
* Drop an array of partitions atomically from table.
* <p>
* If any partition doesn't exists,
* the operation of dropPartitions need to be safely rolled back.
*
* @param idents an array of partition identifiers
* @return true if partitions were deleted, false if any partition not exists
*/
boolean dropPartitions(InternalRow[] idents);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.types.StructType;

/**
* A partition interface of {@link Table}.
* A partition is composed of identifier and properties,
* and properties contains metadata information of the partition.
* <p>
* These APIs are used to modify table partition identifier or partition metadata.
* In some cases, they will change the table data as well.
* ${@link #createPartition}:
* add a partition and any data it contains to the table
* ${@link #dropPartition}:
* remove a partition and any data it contains from the table
* ${@link #replacePartitionMetadata}:
* point a partition to a new location, which will swap one location's data for the other
*
* @since 3.1.0
*/
@Experimental
public interface SupportsPartitionManagement extends Table {

/**
* Get the partition schema of table,
* this must be consistent with ${@link Table#partitioning()}.
* @return the partition schema of table
*/
StructType partitionSchema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we mention that, this must be consistent with Table.partitioning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sound reasonable to me.


/**
* Create a partition in table.
*
* @param ident a new partition identifier
* @param properties the metadata of a partition
* @throws PartitionAlreadyExistsException If a partition already exists for the identifier
* @throws UnsupportedOperationException If partition property is not supported
*/
void createPartition(
InternalRow ident,
Map<String, String> properties)
throws PartitionAlreadyExistsException, UnsupportedOperationException;

/**
* Drop a partition from table.
*
* @param ident a partition identifier
* @return true if a partition was deleted, false if no partition exists for the identifier
*/
boolean dropPartition(InternalRow ident);

/**
* Test whether a partition exists using an {@link InternalRow ident} from the table.
*
* @param ident a partition identifier
* @return true if the partition exists, false otherwise
*/
default boolean partitionExists(InternalRow ident) {
return listPartitionIdentifiers(ident).length > 0;
}

/**
* Replace the partition metadata of the existing partition.
*
* @param ident the partition identifier of the existing partition
* @param properties the new metadata of the partition
* @throws NoSuchPartitionException If the partition identifier to alter doesn't exist
* @throws UnsupportedOperationException If partition property is not supported
*/
void replacePartitionMetadata(
InternalRow ident,
Map<String, String> properties)
throws NoSuchPartitionException, UnsupportedOperationException;

/**
* Retrieve the partition metadata of the existing partition.
*
* @param ident a partition identifier
* @return the metadata of the partition
* @throws UnsupportedOperationException If partition property is not supported
*/
Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that contains the ident in a table.
*
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType

/**
* Thrown by a catalog when an item already exists. The analyzer will rethrow the exception
Expand Down Expand Up @@ -48,14 +50,30 @@ class TableAlreadyExistsException(message: String) extends AnalysisException(mes
class TempTableAlreadyExistsException(table: String)
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")

class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec)
extends AnalysisException(
s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
  def this(db: String, table: String, spec: TablePartitionSpec) = {
    this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
  }

  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
    // Render the partition as "name -> value" pairs (field names first, then their values).
    this(s"Partition already exists in table $tableName: " +
      partitionSchema.map(_.name).zip(partitionIdent.toSeq(partitionSchema))
        .map { case (name, value) => s"$name -> $value" }.mkString(","))
  }
}

class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])
extends AnalysisException(
s"The following partitions already exists in table '$table' database '$db':\n"
class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) {
  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
    this(s"The following partitions already exists in table '$table' database '$db':\n"
      + specs.mkString("\n===\n"))
  }

  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
    // Render each partition as "name -> value" pairs (field names first, then their values),
    // separating partitions with "===" delimiters.
    this(s"The following partitions already exists in table $tableName: " +
      partitionIdents.map(partitionSchema.map(_.name).zip(_)
        .map { case (name, value) => s"$name -> $value" }.mkString(",")).mkString("\n===\n"))
  }
}

class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType


/**
Expand All @@ -46,12 +48,17 @@ class NoSuchTableException(message: String) extends AnalysisException(message) {
}
}

class NoSuchPartitionException(
db: String,
table: String,
spec: TablePartitionSpec)
extends AnalysisException(
s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
class NoSuchPartitionException(message: String) extends AnalysisException(message) {
  def this(db: String, table: String, spec: TablePartitionSpec) = {
    this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
  }

  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
    // Render the partition as "name -> value" pairs (field names first, then their values).
    this(s"Partition not found in table $tableName: "
      + partitionSchema.map(_.name).zip(partitionIdent.toSeq(partitionSchema))
        .map { case (name, value) => s"$name -> $value" }.mkString(","))
  }
}

class NoSuchPermanentFunctionException(db: String, func: String)
extends AnalysisException(s"Function '$func' not found in database '$db'")
Expand All @@ -61,10 +68,18 @@ class NoSuchFunctionException(db: String, func: String, cause: Option[Throwable]
s"Undefined function: '$func'. This function is neither a registered temporary function nor " +
s"a permanent function registered in the database '$db'.", cause = cause)

class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec])
extends AnalysisException(
s"The following partitions not found in table '$table' database '$db':\n"
class NoSuchPartitionsException(message: String) extends AnalysisException(message) {
  def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
    this(s"The following partitions not found in table '$table' database '$db':\n"
      + specs.mkString("\n===\n"))
  }

  def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
    // Render each partition as "name -> value" pairs (field names first, then their values),
    // separating partitions with "===" delimiters.
    this(s"The following partitions not found in table $tableName: "
      + partitionIdents.map(partitionSchema.map(_.name).zip(_)
        .map { case (name, value) => s"$name -> $value" }.mkString(",")).mkString("\n===\n"))
  }
}

class NoSuchTempFunctionException(func: String)
extends AnalysisException(s"Temporary function '$func' not found")
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

/**
 * An in-memory table used to test the SupportsAtomicPartitionManagement API.
 */
class InMemoryAtomicPartitionTable(
    name: String,
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String])
  extends InMemoryPartitionTable(name, schema, partitioning, properties)
  with SupportsAtomicPartitionManagement {

  override def createPartition(
      ident: InternalRow,
      properties: util.Map[String, String]): Unit = {
    if (memoryTablePartitions.containsKey(ident)) {
      throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
    }
    memoryTablePartitions.put(ident, properties)
  }

  override def dropPartition(ident: InternalRow): Boolean = {
    if (memoryTablePartitions.containsKey(ident)) {
      memoryTablePartitions.remove(ident)
      true
    } else {
      false
    }
  }

  override def createPartitions(
      idents: Array[InternalRow],
      properties: Array[util.Map[String, String]]): Unit = {
    // Atomicity: fail up front if any partition already exists, before creating anything.
    val existing = idents.filter(partitionExists)
    if (existing.nonEmpty) {
      throw new PartitionsAlreadyExistException(name, existing, partitionSchema)
    }
    idents.zip(properties).foreach { case (ident, props) =>
      createPartition(ident, props)
    }
  }

  override def dropPartitions(idents: Array[InternalRow]): Boolean = {
    // Atomicity: only drop when every requested partition exists; otherwise drop nothing.
    idents.forall(partitionExists) && idents.forall(dropPartition)
  }
}
Loading