[SPARK-31694][SQL] Add SupportsPartitions APIs on DataSourceV2 #28617
Conversation
  */
  void dropPartitions(
      Identifier ident,
      Map<String, String>[] partitions,
There are a few cases here where partitions are referred to as Map<String, String> and a few times where they use the TablePartition class. I think it would probably make more sense if they were all TablePartition (the class) unless there is a significant reason for them not to be.
TablePartition contains the partition metadata; it's too heavy for this. As for Transform, it might be a good choice if it could pass partition values.
Then is this partitionSpec and not "partition"?
I think we need to decide how to pass the data that identifies a partition.
There have been a lot of problems over the years working with Hive partitions because values are coerced to and from String. Often, people get the conversions slightly wrong. I think a better approach is to use a row of values to pass partition data between Spark and a source. We already pass typed rows in the read and write APIs, so it would be reasonable to do so here as well.
One benefit of using a typed row to represent partition data is that we can directly use a listPartitions call for metadata queries.
This would also align more closely with how Spark handles partitions internally. From PartitioningUtils:
/**
 * Holds a directory in a partitioned collection of files as well as the partition values
 * in the form of a Row. Before scanning, the files at `path` need to be enumerated.
 */
case class PartitionPath(values: InternalRow, path: Path)

case class PartitionSpec(
    partitionColumns: StructType,
    partitions: Seq[PartitionPath])
A table that implements SupportsPartitions could return a partitionType(): StructType that describes the partition rows it accepts and produces.
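A minimal sketch of what that row-based shape could look like; the names here (partitionType, listPartitions, and so on) are illustrative stand-ins suggested by this discussion, not a final API:

import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch of a table-level, row-based partition API.
interface SupportsPartitions extends Table {

  // Describes the partition rows this table accepts and produces.
  StructType partitionType();

  // Creates a partition identified by a typed row, with string metadata.
  void createPartition(InternalRow ident, Map<String, String> properties);

  // Idempotently drops a partition; returns whether one existed.
  boolean dropPartition(InternalRow ident);

  // Lists partition rows, optionally filtered by a partial identifier.
  InternalRow[] listPartitions(InternalRow filter);
}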
One more thing: using a row to pass a tuple would make it possible to also get the partition that an input split (e.g., file) belongs to. That would be useful for storage-partitioned joins.
Good point! The partition identifiers can be written like PartitionSpec.
I was actually thinking that partitions would be identified more like PartitionPath. In this API, I'm not sure if the Path part of PartitionPath is needed, since sources may not need to expose it to Spark. (In Iceberg, for example, there is no partition path.)
I think just using an InternalRow to identify a partition is a good idea.
Yep, I have changed it to InternalRow. The definition in PartitioningUtils is a very good idea, thanks.
Thinking about this further: InternalRow only contains the partition identifier values, not the partition column names. That means users must always provide partition data in schema order. Is that reasonable for users? @rdblue
I think it is reasonable. While this is a public API, the user here is a connector developer, and they are expected to be able to produce InternalRow in other places. I think this is actually a good thing because we don't need to pass the names every time.
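A short sketch of what that looks like for a connector developer, assuming a partition schema of (dt STRING, hr INT); the helper below is hypothetical, not code from this PR:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical helper: builds a partition identifier whose values follow
// the field order of the partition schema, here (dt STRING, hr INT).
final class PartitionIdents {
  static InternalRow ident(String dt, int hr) {
    // Field 0 is dt, field 1 is hr; the order carries the meaning, not names.
    return new GenericInternalRow(new Object[] {UTF8String.fromString(dt), hr});
  }
}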
import java.util.HashMap;
import java.util.Map;

public class TablePartition {
Partitions are already referred to in other parts of the catalog with the Transform class; do we need this as well?
Thanks for your CR.
Transform does not contain actual partition values or partition metadata. In many cases, we need to know the metadata of a partition.
Please correct me if there is something wrong.
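To illustrate the distinction (with hypothetical values, not code from this PR): a Transform describes how a table is laid out, while a partition instance carries concrete values and metadata, which a Transform cannot express.

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;

// Hypothetical contrast between table layout and a partition instance.
final class TransformVsPartition {
  // Table-level layout: partitioned by identity(dt). No values appear here.
  static final Transform LAYOUT = Expressions.identity("dt");

  // Partition-level instance: a concrete value such as dt='2020-05-01',
  // plus metadata like a location; this is what TablePartition carries.
  static final String EXAMPLE_PARTITION = "dt=2020-05-01";
}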
This class and its methods need documentation; it might also help clarify how this differs from Transform.
Thanks for the reply, I will do it.
void createPartitions(
    Identifier ident,
    TablePartition[] partitions,
    Boolean ignoreIfExists);
nit: should this be boolean? Also, please add this to the javadoc.
This can be renamed to createPartition(identifier, properties)
    TablePartition[] partitions);

/**
 * Retrieve the metadata of a table partition, assuming it exists.
What happens if it doesn't exist? Is an exception thrown? I don't know if this documentation style is consistent with how Spark does it, but I would expect something like:

Retrieve the metadata of a table partition.
@throws PartitionNotFoundException ...
Sorry, I will rewrite the documentation. A NoSuchPartitionException should be thrown.
 * Retrieve the metadata of a table partition, assuming it exists.
 *
 * @param ident a table identifier
 * @param partition a list of string map for existing partitions
It isn't exactly clear what the keys and values of the map are here. It also does not appear to be a list, which I find confusing.
Sorry, I will rewrite the documentation. Thanks for paying attention to this PR.
public class TablePartition {
  private Map<String, String> partitionSpec;
  private Map<String, String> parametes;
nit: typo "parameters"?
Ah, thanks
 * Catalog methods for working with Partitions.
 */
@Experimental
public interface SupportsPartitions extends TableCatalog {
What is the reason to extend TableCatalog instead of Table? I think it would be better to support partitions at a table level.

Doing it this way creates more complexity for implementations because they need to handle more cases. For example, if the table doesn't exist, this should throw NoSuchTableException just like loadTable. It would be simpler for the API if these methods were used to manipulate a table, not to load and manipulate a table. Loading should be orthogonal to partition operations.

Another issue is that this assumes a table catalog contains tables that support partitions, or tables that do not. But Spark's built-in catalog supports some sources that don't expose partitions and some that do. This would cause more work for many catalogs, which would need to detect whether a table has support and throw UnsupportedOperationException if it does not. That also makes integration more difficult for Spark because it can't check a table in the analyzer to determine whether it supports the operation or not. Instead, Spark would need to handle exceptions at runtime.
Sounds reasonable to me.
The reason I wanted to define it as a Catalog API is that I think the Catalog API is meant to manage partition metadata, while the Table API is meant for actual data operations.
However, as you said, some sources, such as MySQL or FileTable, will use the partition API to manage partition data. Thus making the partition API part of the Table API is a better way.
Thanks
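A small sketch of the analyzer-side benefit described above, with hypothetical names; planning code can test the capability on the resolved table instead of catching exceptions at runtime:

import org.apache.spark.sql.connector.catalog.Table;

// Hypothetical planning-time capability check for a table-level interface.
final class PartitionSupportCheck {
  static SupportsPartitions requirePartitionSupport(Table table) {
    if (table instanceof SupportsPartitions) {
      // Safe to plan ALTER TABLE ... ADD/DROP PARTITION against this table.
      return (SupportsPartitions) table;
    }
    throw new UnsupportedOperationException(
        "Table " + table.name() + " does not support partition management");
  }
}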
 * @param partitions a list of string map for existing partitions
 * @param ignoreIfNotExists
 */
void dropPartitions(
In other places in the v2 API, we leave ifNotExists to Spark so that we don't need to pass it. Instead of passing an extra parameter, the method returns a boolean to indicate whether the partition was dropped or if no action was taken. Either way, the partition should not exist after the method call, so it is idempotent.

Then, Spark decides whether to throw an exception because the partition did not exist. We prefer that pattern of doing more in Spark to make behavior standard across sources and to make the requirements as simple as we can.
Good point, I will change it.
This can be renamed to boolean dropPartition(identifier)
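A caller-side sketch of that pattern, with hypothetical names (dropPartitionOrFail is not in the PR); the connector call is idempotent, and Spark owns the error decision:

import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical Spark-side helper illustrating the boolean-return pattern.
final class DropPartitionHelper {
  static void dropPartitionOrFail(SupportsPartitions table,
                                  InternalRow partitionIdent,
                                  boolean ifExists) {
    // Idempotent: the partition is gone after this call either way.
    boolean dropped = table.dropPartition(partitionIdent);
    if (!dropped && !ifExists) {
      // In Spark this would be a NoSuchPartitionException; simplified here.
      throw new IllegalArgumentException(
          "Partition does not exist: " + partitionIdent);
    }
  }
}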
 */
String[] listPartitionNames(
    Identifier ident,
    Map<String, String> partition);
It isn't clear what this does. What is a partition name? Are you referring to the "key" in Hive?
It will return the partition identifiers in this table, possibly filtered by a partial partition identifier. Not just the key in Hive.
This can be changed to Row[] listPartitionIdentifiers(identifier). The identifier parameter is used to find the partition identifiers that it is part of.
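A sketch of those partial-match semantics over a hypothetical in-memory list of identifiers; none of these names come from the PR:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

// Hypothetical illustration: a partial identifier such as (dt='2020-05-01')
// matches every full identifier whose leading fields equal it.
final class ListPartitionsSketch {
  static InternalRow[] listPartitionIdentifiers(List<InternalRow> allIdents,
                                                StructType partitionSchema,
                                                InternalRow partialIdent,
                                                int numPartialFields) {
    List<InternalRow> matches = new ArrayList<>();
    for (InternalRow full : allIdents) {
      boolean matched = true;
      for (int i = 0; i < numPartialFields; i++) {
        Object expected = partialIdent.get(i, partitionSchema.fields()[i].dataType());
        Object actual = full.get(i, partitionSchema.fields()[i].dataType());
        if (expected == null ? actual != null : !expected.equals(actual)) {
          matched = false;
          break;
        }
      }
      if (matched) {
        matches.add(full);
      }
    }
    return matches.toArray(new InternalRow[0]);
  }
}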
Thanks for working on this, @stczwd! I think it would be great to get this into 3.1, if possible, to support some of the existing SQL that doesn't work with v2 (like ADD/DROP PARTITION). The main things I think should change are: …
I like that your …
@rdblue Thanks for your attention and advice. It would be great if we can have this in 3.1; a lot of things can be done after it. BTW, the reason for using …
 * These APIs are used to modify table partition or partition metadata,
 * they will change the table data as well.
 * ${@link #createPartitions}:
 *   add an array of partitions and any data that their location contains to the table
nit: "their location contains" -> "they contain", to be consistent with the doc of dropPartitions.
ok
 * @throws NoSuchPartitionsException If any partition identifier to alter doesn't exist
 * @throws UnsupportedOperationException If partition property is not supported
 */
void replacePartitionMetadatas(
which command needs it?
Currently, AlterTableSerDePropertiesCommand and AlterTableSetLocationCommand use this API.
I checked the parser rules; they can't change multiple partitions at once:

| ALTER TABLE multipartIdentifier (partitionSpec)?
    SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)?    #setTableSerDe
| ALTER TABLE multipartIdentifier (partitionSpec)?
    SET SERDEPROPERTIES tablePropertyList                         #setTableSerDe

I think we don't need this batch API.
I added this because replacePartitionMetadata also operates on partition data; it should also be atomic if multiple partition operations are supported. If we don't need it currently, it can be deleted.
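For reference, a sketch of the all-or-nothing behavior the atomic batch variants imply, reusing the hypothetical interface from earlier; a real implementation would usually get atomicity from its metastore transaction rather than manual rollback:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical illustration of atomic batch creation with manual rollback.
final class AtomicBatchSketch {
  static void createPartitionsAtomically(SupportsPartitions table,
                                         InternalRow[] idents,
                                         Map<String, String>[] properties) {
    List<InternalRow> created = new ArrayList<>();
    try {
      for (int i = 0; i < idents.length; i++) {
        table.createPartition(idents[i], properties[i]);
        created.add(idents[i]);
      }
    } catch (RuntimeException e) {
      // Roll back partial progress so the batch appears all-or-nothing.
      for (InternalRow ident : created) {
        table.dropPartition(ident);
      }
      throw e;
    }
  }
}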
 * These APIs are used to modify table partition identifier or partition metadata.
 * In some cases, they will change the table data as well.
 * ${@link #createPartition}:
 *   add a partition and any data that its location contains to the table
ditto: "its location contains" -> "it contains"
ok
 * the operation of dropPartitions need to be safely rolled back.
 *
 * @param idents an array of partition identifiers
 * @throws NoSuchPartitionsException If any partition identifier to drop doesn't exist
Shall we be consistent with dropPartition, which doesn't require you to throw an exception for non-existing partitions?
The partitions will be checked before dropPartitions in AlterTableDropPartitionExec, thus NoSuchPartitionsException isn't needed here.
It's OK to return boolean.
/**
 * @return the partition schema of table
 */
StructType partitionSchema();
Shall we mention that this must be consistent with Table.partitioning?
OK, sounds reasonable to me.
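A small sketch of that consistency requirement for a hypothetical table partitioned by day; the class and column names are illustrative:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical table: partitionSchema() must line up with partitioning().
final class DailyPartitionedTable /* implements Table, SupportsPartitions */ {
  public Transform[] partitioning() {
    return new Transform[] {Expressions.identity("dt")};
  }

  public StructType partitionSchema() {
    // One field per partition transform source column, in the same order.
    return new StructType().add("dt", DataTypes.StringType);
  }
}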
boolean dropPartition(InternalRow ident);

/**
 * Test whether a partition exists using an {@link Identifier identifier} from the table.
"an {@link Identifier identifier} from the table": this has nothing to do with Identifier. I think you mean "partition identifier"?
Ah, wrong comment, I will change it.
LGTM except several minor comments
thanks, merging to master!
Thank you all!
Thanks for your help and support, @rdblue @cloud-fan @dongjoon-hyun
…asourcev2

What changes were proposed in this pull request?
This patch is trying to add AlterTableAddPartitionExec and AlterTableDropPartitionExec with the new table partition API, defined in #28617.

Does this PR introduce any user-facing change?
Yes. Users can use alter table add partition or alter table drop partition to create/drop partitions in a V2 table.

How was this patch tested?
Run suites and fix old tests.

Closes #29339 from stczwd/SPARK-32512-new.

Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@163.com>
Co-authored-by: Jackey Lee <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
There are no partition commands, such as AlterTableAddPartition, supported in DataSourceV2, even though they are widely used with MySQL, Hive, and other data sources. Thus it is necessary to define partition APIs to support these commands.
We defined the partition API as part of the Table API, as it will sometimes change table data as well. A partition is composed of an identifier and properties: the identifier is defined as an InternalRow and the properties are defined as a Map.
Does this PR introduce any user-facing change?
Yes. This PR will enable users to use some partition commands.

How was this patch tested?
Ran all tests and added some partition API tests.
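As a closing illustration of the shape described above (hypothetical helper and property names; only the identifier/properties split comes from the PR description):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical end-to-end use: a partition is an identifier row (typed,
// in partition-schema field order) plus a string-to-string properties map.
final class PartitionApiExample {
  static void addDailyPartition(SupportsPartitions table) {
    InternalRow ident =
        new GenericInternalRow(new Object[] {UTF8String.fromString("2020-05-12")});
    Map<String, String> properties = new HashMap<>();
    properties.put("location", "/warehouse/t/dt=2020-05-12"); // illustrative metadata
    table.createPartition(ident, properties);
  }
}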