
[SPARK-31694][SQL] Add SupportsPartitions APIs on DataSourceV2 #28617

Closed
wants to merge 23 commits

Conversation

jackylee-ch
Contributor

@jackylee-ch jackylee-ch commented May 23, 2020

What changes were proposed in this pull request?

DataSourceV2 currently has no partition commands, such as AlterTableAddPartition, even though partitioning is widely used in MySQL, Hive, and other data sources. It is therefore necessary to define a partition API to support these commands.

We define the partition API as part of the Table API, since partition operations can also change table data. A partition is composed of an identifier and properties: the identifier is represented as an InternalRow and the properties as a Map.
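
Conceptually, a partition pairs a typed identifier with a metadata map. A minimal sketch of that shape (illustrative only, not the exact merged class):

import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;

// Illustrative sketch: a partition pairs a typed identifier row
// with a free-form metadata map.
public class TablePartition {
  private final InternalRow ident;              // partition values, in partition-schema order
  private final Map<String, String> properties; // partition-level metadata

  public TablePartition(InternalRow ident, Map<String, String> properties) {
    this.ident = ident;
    this.properties = properties;
  }

  public InternalRow ident() { return ident; }
  public Map<String, String> properties() { return properties; }
}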

Does this PR introduce any user-facing change?

Yes. This PR enables users to run partition commands.

How was this patch tested?

Ran all tests and added partition API tests.

@jackylee-ch jackylee-ch changed the title [SPARK-32694][SQL][WIP] Add SupportsPartitions Catalog APIs on DataSourceV2 [SPARK-31694][SQL][WIP] Add SupportsPartitions Catalog APIs on DataSourceV2 May 23, 2020

@jackylee-ch jackylee-ch changed the title [SPARK-31694][SQL][WIP] Add SupportsPartitions Catalog APIs on DataSourceV2 [SPARK-31694][SQL] Add SupportsPartitions Catalog APIs on DataSourceV2 Jun 9, 2020
@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Jul 6, 2020

Test build #124963 has finished for PR 28617 at commit 4a77db0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jackylee-ch jackylee-ch reopened this Jul 15, 2020
*/
void dropPartitions(
    Identifier ident,
    Map<String, String>[] partitions,
Member

There are a few cases here where partitions are referred to as Map<String, String> and a few times where they use the TablePartition class. I think it would probably make more sense if they were all TablePartition (the class) unless there is a significant reason for them not to be.

Contributor Author

TablePartition contains the partition metadata, so it's too heavy for this. As for Transform, it might be a good choice if it could pass partition values.

Member

Then is this partitionSpec and not "partition"?

Contributor
@rdblue rdblue Jul 23, 2020

I think we need to decide how to pass the data that identifies a partition.

There have been a lot of problems over the years working with Hive partitions because values are coerced to and from String. Often, people get the conversions slightly wrong. I think a better approach is to use a row of values to pass partition data between Spark and a source. We already pass typed rows in the read and write APIs, so it would be reasonable to do so here as well.

One benefit of using a typed row to represent partition data is that we can directly use a listPartitions call for metadata queries.

This would also align more closely with how Spark handles partitions internally. From PartitioningUtils:

/**
 * Holds a directory in a partitioned collection of files as well as the partition values
 * in the form of a Row.  Before scanning, the files at `path` need to be enumerated.
 */
case class PartitionPath(values: InternalRow, path: Path)

case class PartitionSpec(
    partitionColumns: StructType,
    partitions: Seq[PartitionPath])

A table that implements SupportsPartitions could return a partitionType(): StructType that describes the partition rows it accepts and produces.
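
As a rough sketch of that idea (hypothetical method names, not necessarily the merged API):

import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.types.StructType;

// Hypothetical table-level mixin where partitions are identified by typed
// InternalRow values rather than string maps.
public interface SupportsPartitions extends Table {
  // Describes the partition rows this table accepts and produces; the field
  // order defines the order of values in each identifier row.
  StructType partitionType();

  // Create a partition identified by a typed row of partition values.
  void createPartition(InternalRow ident, Map<String, String> properties);

  // Drop a partition; returns whether a partition was actually removed.
  boolean dropPartition(InternalRow ident);

  // List partition identifiers, optionally filtered by a prefix of values.
  InternalRow[] listPartitionIdentifiers(InternalRow prefix);
}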

Contributor

One more thing: using a row to pass a tuple would make it possible to also get the partition that an input split (e.g., file) belongs to. That would be useful for storage-partitioned joins.

Contributor Author
@jackylee-ch jackylee-ch Jul 24, 2020

Good point! The partition identifiers can be written like PartitionSpec.

Contributor

I was actually thinking that partitions would be identified more like PartitionPath. In this API, I'm not sure if the Path part of PartitionPath is needed, since sources may not need to expose it to Spark. (In Iceberg, for example, there is no partition path.)

I think just using an InternalRow to identify a partition is a good idea.

Contributor Author

Yep, I have changed it to InternalRow. The definition in PartitioningUtils is a very good idea, thanks.

Contributor Author

Thinking about this further: an InternalRow only contains the partition identifier values, not the partition column names. That means users must always supply partition data in order. Is that reasonable for users? @rdblue

Contributor

I think it is reasonable. While this is a public API, the user here is a connector developer and they are expected to be able to produce InternalRow in other places. I think this is actually a good thing because we don't need to pass the names every time.
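
For illustration, a connector developer would build an identifier row positionally, matching the partition schema's field order (hypothetical schema; a sketch):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class PartitionIdentExample {
  public static void main(String[] args) {
    // For a partition schema (region STRING, day INT), values must be
    // supplied in the same order as the schema's fields.
    InternalRow ident = new GenericInternalRow(new Object[] {
        UTF8String.fromString("us-east"), // region
        7                                 // day
    });
    System.out.println(ident.numFields()); // 2
  }
}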

import java.util.HashMap;
import java.util.Map;

public class TablePartition {
Member

Partitions are already referred to in other parts of the catalog with the Transform class; do we need this as well?

Contributor Author

Thanks for your CR.
Transform does not contain actual partition values or partition metadata. In many cases, we need to know the metadata of a partition.
Please correct me if there is something wrong.

Contributor

This class and its methods need documentation; it might also help clarify how this differs from Transform.

Contributor Author

Thanks for the reply, I will do it.

void createPartitions(
    Identifier ident,
    TablePartition[] partitions,
    Boolean ignoreIfExists);
Contributor

nit: should this be boolean? Also, please add this to the Javadoc.

Contributor Author
@jackylee-ch jackylee-ch Jul 25, 2020

This can be renamed to createPartition(identifier, properties)

TablePartition[] partitions);

/**
* Retrieve the metadata of a table partition, assuming it exists.
Contributor

What happens if it doesn't exist? Is an exception thrown? I don't know if this documentation style is consistent with how Spark does it, but I would expect something like:

Retrieve the metadata of a table partition.

@throws PartitionNotFoundException ...

Contributor Author

Sorry, I will rewrite the documentation. NoSuchPartitionException should be thrown.

* Retrieve the metadata of a table partition, assuming it exists.
*
* @param ident a table identifier
* @param partition a list of string map for existing partitions
Contributor

It isn't exactly clear what the keys and values of the map are here. It also does not appear to be a list, which I find confusing.

Contributor Author

Sorry, I will rewrite the documentation. Thanks for paying attention to this PR.


public class TablePartition {
private Map<String, String> partitionSpec;
private Map<String, String> parametes;
Contributor

nit: typo "parameters"?

Contributor Author

Ah, thanks

* Catalog methods for working with Partitions.
*/
@Experimental
public interface SupportsPartitions extends TableCatalog {
Contributor

What is the reason to extend TableCatalog instead of Table? I think it would be better to support partitions at a table level.

Doing it this way creates more complexity for implementations because they need to handle more cases. For example, if the table doesn't exist, this should throw NoSuchTableException just like loadTable. It would be simpler for the API if these methods were used to manipulate a table, not to load and manipulate a table. Loading should be orthogonal to partition operations.

Another issue is that this assumes a table catalog contains tables that support partitions, or tables that do not. But Spark's built-in catalog supports some sources that don't expose partitions and some that do. This would cause more work for many catalogs, which would need to detect whether a table has support and throw UnsupportedOperationException if it does not. That also makes integration more difficult for Spark because it can't check a table in the analyzer to determine whether it supports the operation or not. Instead, Spark would need to handle exceptions at runtime.

Contributor Author
@jackylee-ch jackylee-ch Jul 24, 2020

Sounds reasonable to me.
The reason I wanted to define it as a Catalog API is that I see the Catalog API as managing partition metadata, while the Table API handles the actual data operations.
However, as you said, some sources, such as MySQL or FileTable, will use the partition API to manage partition data. Thus making the partition API part of the Table API is a better way.
Thanks

* @param partitions a list of string map for existing partitions
* @param ignoreIfNotExists
*/
void dropPartitions(
Contributor

In other places in the v2 API, we leave ifNotExists to Spark so that we don't need to pass it. Instead of passing an extra parameter, the method returns a boolean to indicate whether the partition was dropped or if no action was taken. Either way, the partition should not exist after the method call so it is idempotent.

Then, Spark decides whether to throw an exception because the partition did not exist. We prefer that pattern of doing more in Spark to make behavior standard across sources and to make the requirements as simple as we can.
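
A sketch of that division of labor (hypothetical helper, assuming the InternalRow-based dropPartition sketched earlier):

import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical: the connector's dropPartition is idempotent and returns
// whether it removed anything; Spark layers the IF EXISTS decision on top.
final class DropPartitionHelper {
  static void drop(SupportsPartitions table, InternalRow ident, boolean ignoreIfNotExists) {
    boolean dropped = table.dropPartition(ident);
    if (!dropped && !ignoreIfNotExists) {
      // Spark, not the source, decides whether this is an error.
      throw new IllegalArgumentException("Partition does not exist: " + ident);
    }
  }
}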

Contributor Author

Good point, I will change it.

Contributor Author

This can be renamed to boolean dropPartition(identifier)

*/
String[] listPartitionNames(
    Identifier ident,
    Map<String, String> partition);
Contributor

It isn't clear what this does. What is a partition name? Are you referring to the "key" in Hive?

Contributor Author
@jackylee-ch jackylee-ch Jul 24, 2020

It returns the partition identifiers in this table, possibly filtered by a partial partition identifier. It is not just the key in Hive.

Contributor Author

This can be changed to Row[] listPartitionIdentifiers(identifier). The identifier parameter is used to find the partition identifiers that it is a part of.
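
For illustration (hypothetical, following the InternalRow-based sketch earlier), a partial identifier would act as a prefix filter:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

class ListPartitionsExample {
  // With partition schema (region STRING, day INT), a one-field prefix row
  // matches every partition whose first value (region) equals the argument.
  static InternalRow[] partitionsInRegion(SupportsPartitions table, String region) {
    InternalRow prefix = new GenericInternalRow(new Object[] {
        UTF8String.fromString(region)
    });
    return table.listPartitionIdentifiers(prefix);
  }
}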

@rdblue
Contributor

rdblue commented Jul 23, 2020

Thanks for working on this, @stczwd! I think it would be great to get this into 3.1, if possible, to support some of the existing SQL that doesn't work with v2 (like ADD/DROP PARTITION).

The main things I think should change are:

  • Partition support should be at the table level, not the catalog level
  • Partitions should be identified by a tuple of data, not strings that need to be parsed or encoded
  • Methods should work like the ones in other parts of the catalog. For example, drop should return a boolean and we may want alter to be more specific about the changes that are made. That, or maybe it should be replacePartitionMetadata to be clear what the behavior is (replace the map of properties, I think).

I like that your TablePartition is a partition identifier and a map of properties. That seems reasonably simple to me. Have you considered what to do for tables that don't support storing metadata at the partition level? (I don't think path-based tables have partition metadata, right?)

@jackylee-ch
Contributor Author

jackylee-ch commented Jul 24, 2020

@rdblue Thanks for your attention and advice. It would be great if we could have this in 3.1; a lot of things can be done once the partition API is defined.
Your points are all reasonable to me, thanks for your help. I will reconsider the partition methods.

BTW, the reason for using parameters to contain partition metadata is to suit other sources, like MySQL, which may have other kinds of partition metadata. Sources that do not have partition metadata can still use TablePartition with a partition identifier; the parameters can simply be an empty map.

@SparkQA

SparkQA commented Jul 26, 2020

Test build #126569 has finished for PR 28617 at commit 9ff1c6c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 26, 2020

Test build #126570 has finished for PR 28617 at commit f9288aa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec])

@jackylee-ch
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 10, 2020

Test build #127273 has finished for PR 28617 at commit b570496.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* These APIs are used to modify table partition or partition metadata,
* they will change the table data as well.
* ${@link #createPartitions}:
* add an array of partitions and any data that their location contains to the table
Contributor

nit: their location contains -> they contain, to be consistent with the doc of dropPartitions.

Contributor Author

ok

* @throws NoSuchPartitionsException If any partition identifier to alter doesn't exist
* @throws UnsupportedOperationException If partition property is not supported
*/
void replacePartitionMetadatas(
Contributor

Which command needs it?

Contributor Author

Currently, AlterTableSerDePropertiesCommand and AlterTableSetLocationCommand use this API.

Contributor

I checked the parser rules; they can't change multiple partitions at once:

    | ALTER TABLE multipartIdentifier (partitionSpec)?
        SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)?     #setTableSerDe
    | ALTER TABLE multipartIdentifier (partitionSpec)?
        SET SERDEPROPERTIES tablePropertyList                          #setTableSerDe

I think we don't need this batch API.

Contributor Author

I added this because replacePartitionMetadata also operates on partition data, so it should also be atomic if we support multiple partition operations. If we don't need it currently, it can be deleted.
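
If the batch form is dropped, a singular form (hypothetical) would be enough for the commands above:

import java.util.Map;
import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical singular form: sufficient for setTableSerDe and SET LOCATION,
// since the parser allows at most one partitionSpec per statement.
interface ReplacesPartitionMetadata {
  void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);
}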

* These APIs are used to modify table partition identifier or partition metadata.
* In some cases, they will change the table data as well.
* ${@link #createPartition}:
* add a partition and any data that its location contains to the table
Contributor

ditto: its location contains -> it contains

Contributor Author

ok

@SparkQA

SparkQA commented Aug 11, 2020

Test build #127325 has finished for PR 28617 at commit b3a6e2b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2020

Test build #127328 has finished for PR 28617 at commit 279cea6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2020

Test build #127334 has finished for PR 28617 at commit 4bf9711.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2020

Test build #127338 has finished for PR 28617 at commit c96e0fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* the operation of dropPartitions need to be safely rolled back.
*
* @param idents an array of partition identifiers
* @throws NoSuchPartitionsException If any partition identifier to drop doesn't exist
Contributor

Shall we be consistent with dropPartition, which doesn't require you to throw an exception for non-existing partitions?

Contributor Author

The partitions will be checked before dropPartitions in AlterTableDropPartitionExec, so NoSuchPartitionsException isn't needed here.
It's OK if we return a boolean.

/**
* @return the partition schema of table
*/
StructType partitionSchema();
Contributor

Shall we mention that this must be consistent with Table.partitioning?

Contributor Author

OK, sounds reasonable to me.
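
To illustrate the consistency requirement (hypothetical table): if Table.partitioning() is identity(region), identity(day), then partitionSchema() must expose the same columns in the same order:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class PartitionSchemaExample {
  // Table.partitioning(): identity transforms over region and day.
  static final Transform[] PARTITIONING = new Transform[] {
      Expressions.identity("region"),
      Expressions.identity("day")
  };
  // partitionSchema(): same columns, same order, with concrete types.
  static final StructType PARTITION_SCHEMA = new StructType()
      .add("region", DataTypes.StringType)
      .add("day", DataTypes.IntegerType);
}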

boolean dropPartition(InternalRow ident);

/**
* Test whether a partition exists using an {@link Identifier identifier} from the table.
Contributor

Regarding "an {@link Identifier identifier} from the table": this has nothing to do with Identifier. I think you mean a partition identifier?

Contributor Author

Ah, wrong comment; I'll change it.

Contributor
@cloud-fan cloud-fan left a comment

LGTM except several minor comments

@SparkQA

SparkQA commented Aug 12, 2020

Test build #127383 has finished for PR 28617 at commit 4f1bff3.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 12, 2020

Test build #127379 has finished for PR 28617 at commit 4686a24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 60fa8e3 Aug 12, 2020
@dongjoon-hyun
Member

Thank you all!

@SparkQA

SparkQA commented Aug 12, 2020

Test build #127384 has finished for PR 28617 at commit bfd17d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jackylee-ch
Contributor Author

Thanks for your help and support, @rdblue @cloud-fan @dongjoon-hyun

cloud-fan pushed a commit that referenced this pull request Nov 11, 2020
…asourcev2

### What changes were proposed in this pull request?
This patch is trying to add `AlterTableAddPartitionExec` and `AlterTableDropPartitionExec` with the new table partition API, defined in #28617.

### Does this PR introduce _any_ user-facing change?
Yes. User can use `alter table add partition` or `alter table drop partition` to create/drop partition in V2Table.

### How was this patch tested?
Run suites and fix old tests.

Closes #29339 from stczwd/SPARK-32512-new.

Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@163.com>
Co-authored-by: Jackey Lee <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>