[SPARK-24252][SQL] Add catalog registration and table catalog APIs. #21306
Conversation
@henryr, @cloud-fan, @marmbrus, here's a first pass at adding a catalog mix-in to the v2 API. Please have a look and leave comments on what you'd like to change. One thing that I don't think we need right away is the …
@cloud-fan, what needs to change to get this in? I'd like to start making more PRs based on these changes.
There are several things we need to discuss here: …
@cloud-fan, thanks for the thorough feedback!
This PR introduces create, drop, and alter. We can always add more later. These are the ones that we need to implement DataSourceV2 operations and DDL support.
These two are the easiest and least intrusive way to start, because the data source's catalog interaction is explicitly tied to a catalog. It also matches the behavior used by other systems for multiple catalogs. I think this is what we should start with and then tackle ideas like your second point.
For this and a couple of other questions, I don't think we need to decide right now. This PR is about getting the interface for other sources in Spark. We don't necessarily need to know all of the ways that users will call it or interact with it, like how … To your question here, I'm not sure whether the …
The SPIP proposes two catalog interfaces that return …
That sounds like a reasonable idea to me. Like the behavior of …
This is another example of something we don't need to decide yet. We have a couple of different options for the behavior and will want to think them through and discuss them on the dev list. But I don't think that the behavior necessarily needs to be decided before we add this API to sources.
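For orientation, here is a rough Scala sketch of the catalog surface this thread is discussing. The PR itself defines Java interfaces, and the trait name and signatures below are simplified assumptions rather than the PR's API:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StructType

// Placeholders standing in for the Table and TableChange interfaces added in this PR.
trait Table
trait TableChange

// Simplified sketch of the table catalog surface under discussion: lookup,
// create, alter, and drop. The real signatures in the diff differ (for
// example, createTable also takes partition expressions).
trait TableCatalogSketch {
  def loadTable(ident: TableIdentifier): Table
  def createTable(ident: TableIdentifier, schema: StructType, properties: Map[String, String]): Table
  def alterTable(ident: TableIdentifier, changes: TableChange*): Table
  def dropTable(ident: TableIdentifier): Boolean
}
```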
 * Data sources must implement this interface to support logical operations that combine writing
 * data with catalog tasks, like create-table-as-select.
 */
public interface CatalogSupport {
After thinking about it more, what we really need in the near future is all about tables: create/alter/lookup/drop tables, rather than how the tables are organized (like databases) or how other information is stored (like views and functions). How about we call it `TableSupport`?
Works for me.
/**
 * Represents table metadata from a {@link DataSourceCatalog}.
 */
public interface Table {
This is something we should decide now. IMO `schema` and `properties` are must-haves, but the others may not be. E.g. if a data source uses a path to look up a table, then there is no database/table name for it. And we don't have a story to deal with partitions yet.
For the table metadata, I think we do need partitions. Iceberg creates partitioned tables and I'd like to start getting the DDL operations working. This is why I proposed this metadata in the SPIP a few months ago. We seem to have lazy consensus around it.
You're right about the name. How about I change it to an identifier that could be a path or a name?
I updated the last comment because I thought this was referring to `CatalogSupport` at first. Sorry about the confusion.
I'll just remove `name` and `database`. We can add them later when we figure out how we want to handle them. We need partitioning right away, though.
 * Create a TableChange for renaming a field.
 * <p>
 * The name is used to find the field to rename. The new name will replace the name of the type.
 * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
It's great to have an example showing how to use this API. Can we add an example to all the methods here?
Are you looking for examples in Javadoc, or an example implementation?
I added an example to the Javadocs:
import TableChange._
val catalog = source.asInstanceOf[TableSupport].catalog()
catalog.alterTable(ident,
  addColumn("x", IntegerType),
  renameColumn("a", "b"),
  deleteColumn("c")
)
 * TableChange subclasses represent requested changes to a table. These are passed to
 * {@link DataSourceCatalog#alterTable}.
 */
public interface TableChange {
This is great!
  return new DeleteColumn(name);
}

final class AddColumn implements TableChange {
Just noticed that these aren't public, but they should be because they will be passed to implementations through `alterTable`. These should also implement `unapply` for Scala implementations.
Never mind, I forgot that these are in an interface, so they are automatically public.
And, I'm not sure it's possible to implement unapply in Java. Not even implementing Product works.
@cloud-fan, I've updated this to address your comments. Thanks for the reviews!
 */
Table createTable(TableIdentifier ident,
    StructType schema,
    List<Expression> partitions,
Why are these expressions? Current Spark only supports PARTITION BY with columns.
I recommend reading the SPIP's "Proposed Changes" section, which goes into more detail than this comment can. In short, you're thinking of partitions as columns like Hive tables do, but that is a narrow definition that prevents the underlying format from optimizing queries.
Partitions of a table are derived from the column data through some transform. For example, partitioning by day uses a day transform from a timestamp column: `day(ts)`. Hive doesn't keep track of that transform and requires queries to handle it by inserting both `ts` and `day` columns. This leads to a few problems, including:

- Hive has no ability to transform `ts > X` into the partition predicate `day >= day(X)`. Queries that don't account for the table's physical storage by adding partition predicates by hand will result in full table scans.
- Users can insert any data they choose into the `day` partition, and it is up to them to do it correctly.
Also, consider bucketing. Bucketing is also a transform that is effectively a partitioning of the table's files: `bucket = hash(col) % N`. The reason why bucketing is handled as a special case in Hive is that using it requires knowing the transform and the relationship between the bucket number and its column. If we think of partitioning as grouping data by common values of a set of transforms, then buckets are just another partition that we can use for purposes like bucketed joins or limiting scans when looking for specific values.
If the transform is identity -- just copy the value into partition data -- then you have the same functionality that Hive provides. But by building the transformations into the partitioning layer, we can do more to optimize queries, while hiding the physical layout of a table.
Using Expression allows Spark to pass `day(ts)` to the data source. It is up to the source which expressions are supported. The current FS tables would reject any expression that isn't just a column reference. Iceberg supports `identity`, `year`, `month`, `day`, `hour`, `truncate`, and `bucket` transforms.
I read the SPIP but I still can't figure it out. How can Spark pass these partition transforms to a data source? The current end-user API only allows users to specify partition columns.

And why does the partition transform belong to the table definition? I think it's reasonable to say that a table is partitioned by the column `timestamp` and supports pushing down partition predicates like `year(timestamp) > 2000`.
> The current end-user API only allows users to specify partition columns.
I think an example would help explain the use of expressions here. Right now, I can create a table partitioned by day like this:
CREATE TABLE t (ts timestamp, data string, day string) PARTITIONED BY (day)
Then it's up to queries to supply the right values for `day`. I'm proposing we change that to something like the following, which uses expressions in the PARTITIONED BY clause instead of only allowing column names:
CREATE TABLE t (ts timestamp, data string) PARTITIONED BY (date(ts));
This can handle all identity partitioning in Hive tables today and it can handle bucketing.
> And why does the "partition transform" belong to a table definition?
Transforms should be passed to the table so the source can use them for the physical layout. In DataSourceV2, the source could be anything so it needs to be the component that handles the physical layout. Because we want distributed data sources, we need some way of telling them how to distribute data.
For example, I could use a partitioning expression to tell a source how to shard across PostgreSQL instances. I could also use it to define the keys in an HBase connector. Those are uses of partitioning that Spark can't handle internally.
Like Hive, Spark has only supported a limited definition of partitioning up to now, but I'd like to be able to put tables using Hive's layout behind this API eventually. I think this way of configuring partitioning is a good way to do that, while supporting what Iceberg and other sources will need.
Another benefit: this would allow us to translate BUCKETED BY clauses into something we can actually pass to data sources.
I see, and this seems like a very cool feature. My only concern is that this new feature has not been discussed on the dev list yet, and no JIRA ticket is tracking it. It feels a little odd to support a non-existent feature in the data source v2 API. Shall we start a thread on the dev list for this new feature and see if we can make it before 2.4?
I wouldn't say this way of passing partitioning is a new feature. It's just a generalization of the existing partitioning that allows us to pass any type of partition, whether it is bucketing or column-based.
As for open discussion, this was proposed in the SPIP that was fairly widely read and commented on. That SPIP was posted to the dev list a few times, too. I do appreciate you wanting to make sure there's been a chance for the community to discuss it, but there has been plenty of opportunity to comment. At this point, I think it's reasonable to move forward with the implementation.
@marmbrus, @cloud-fan, @gatorsmile, I've updated this PR to use reflection to instantiate catalogs. This allows implementations to provide named catalogs (and reuse implementations) and configure those catalogs with Spark configuration properties. FYI @bersprockets, @felixcheung, @jzhuge
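As a quick illustration of the configuration approach described above, here is a hedged sketch of how a named catalog might be registered; the class name `com.example.MyTableCatalog` and the `warehouse` option are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: register a catalog named "testcat" backed by a hypothetical
// implementation class, plus one catalog-scoped option handed to it at initialization.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.testcat", "com.example.MyTableCatalog")
  .config("spark.sql.catalog.testcat.warehouse", "/tmp/warehouse")
  .getOrCreate()
```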
@@ -609,6 +611,12 @@ class SparkSession private(
   */
  @transient lazy val catalog: Catalog = new CatalogImpl(self)

  @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]()

  private[sql] def catalog(name: String): CatalogProvider = synchronized {
Note that this is `private[sql]`. This allows us to use the named `TableCatalog` instances without solving how multiple catalogs should be exposed to users through a public API just yet.
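A hedged sketch of what that internal lookup enables; the helper below is hypothetical and, because `catalog(name)` is `private[sql]`, it could only live inside the `org.apache.spark.sql` package. `TableCatalog` is the interface from this PR:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical internal helper: resolve a named catalog and check that it
// supports table operations before using it in a plan.
def resolveTableCatalog(session: SparkSession, name: String): TableCatalog =
  session.catalog(name) match {
    case tables: TableCatalog => tables
    case other => throw new IllegalArgumentException(
      s"Catalog '$name' does not support tables: ${other.getClass.getName}")
  }
```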
Can we support column range partition predicates please?
 * )
 * </pre>
 */
public interface TableChange {
Can we support adding a comment to a column or to a table?
Yeah, we can add that. What do you suggest changing?
 * )
 * </pre>
 */
public interface TableChange {
Is adding or dropping table partitions a table change?
No, this API covers table configuration, not data modification. If you're interested in dropping partitions, you should look at the DeleteSupport API proposal.
Would it be a valid operation to change the partitioning of the table without dropping the entire table and re-creating it? E.g. change the bucket size for such and such column. Seems pretty difficult to do in practice though since the underlying data layout would have to change as part of the modification.
@mccheah, our table format supports updating the partitioning of a table, so I think it should be supported. But this is intended to be an initial API, so I didn't want to block it on agreeing on how to repartition a table.
This has an "apply" transform for passing other functions directly through, so that may help if you have additional transforms that aren't committed to Spark yet.

As for range partitioning, can you be more specific about what you mean? What does that transform function look like? Part of the rationale for the existing proposal is that these are all widely used and understood. I want to make sure that as we expand the set of validated transforms, we aren't introducing confusion.

Also, could you share the use case you intend for this? It would be great to hear about uses other than just Iceberg tables.
Sure,
I am looking at the point of view of supporting Kudu. Check out https://kudu.apache.org/docs/schema_design.html#partitioning for some of the details. In particular https://kudu.apache.org/2016/08/23/new-range-partitioning-features.html.
As Kudu is a column store, each column also has attributes associated with it, such as encoding and compression codecs.
I really think that partitions should be considered part of the table schema. They have an existence above and beyond the definition of a filter that matches a record. Adding an empty partition changes the state of many underlying systems. Many systems that support partitions also have APIs for adding and removing partition definitions, some systems require partition information to be specified during table creation. Those systems that support changing partitions after creation usually have specific APIs for adding and removing partitions.
Dale,
So Kudu range partitions support arbitrarily sized partition intervals, like the example below, where the first and last range partitions are six months in size, but the middle partition is one year in size.

-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event string, primary key (id, when_exactly))
range (when_exactly)
(
  partition '2015-06-01' <= values < '2016-01-01',
  partition '2016-01-01' <= values < '2017-01-01',
  partition '2017-01-01' <= values < '2017-06-01'
)
stored as kudu;
@tigerquoll, the proposal isn't to make partitions part of the table configuration. It is to make the partitioning scheme part of the table configuration. How sources choose to handle individual partitions is up to the source. How those partitions are exposed through Spark is a different API, because the current v2 data source design covers tables that appear to be unpartitioned.

We could support range partitioning with the strategy that was discussed on the dev list, where the configuration is a function application with column references and literals. So your partitioning could be expressed like this:

create table t (id bigint, ts timestamp, data string)
partitioned by (range(ts, '2016-01-01', '2017-01-01', '2017-06-01')) using kudu
Where are we on this?
@felixcheung, we're waiting on more reviews and a community decision about how to pass partition transforms. For passing transforms, I think the most reasonable compromise is to go with a generic function application, so each transform would be passed as a function/transform name with one or more arguments, where each argument is either a column reference (by name) or a literal value. That's a fairly small public API addition, but it allows a lot of different partitioning schemes to be expressed, including the one above for Kudu. We already have all of this implemented based on the current PR, but I can update this in the next week or so.
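Since the compromise above is described only in prose, here is a hedged Scala sketch of what a generic function-application transform could look like; all of the type and field names are assumptions for illustration, not the API in this PR:

```scala
// Rough sketch (not the PR's API) of the "generic function application" shape
// described above: a transform is a name applied to arguments, where each
// argument is either a column reference (by name) or a literal value.
sealed trait TransformArg
case class ColumnRef(name: String) extends TransformArg
case class LiteralValue(value: Any) extends TransformArg

case class Transform(name: String, args: Seq[TransformArg])

// The Kudu-style range partitioning above could then be expressed as:
val kuduRange = Transform("range", Seq(
  ColumnRef("ts"),
  LiteralValue("2016-01-01"), LiteralValue("2017-01-01"), LiteralValue("2017-06-01")))

// Hive-style column partitioning and bucketing fit the same shape:
val byDay    = Transform("identity", Seq(ColumnRef("day")))
val byBucket = Transform("bucket", Seq(LiteralValue(128), ColumnRef("user_id")))
```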
/**
 * Represents table metadata from a {@link TableCatalog} or other table sources.
 */
public interface Table {
The nomenclature here appears to conflict with @cloud-fan's refactor in https://github.com/apache/spark/pull/22547/files#diff-45399ef5eed5c873d5f12bf0f1671b8fR40. Maybe we can call this `TableMetadata` or `TableDescription`? Or perhaps we rename the other construct?
I think the two `Table` classes are trying to be the same thing. This is one of the reasons why I brought it up in the sync. @cloud-fan's current PR isn't yet based on this work, so it doesn't get the abstraction right.

What you linked to uses `Table` to expose `newScanConfigBuilder`, basically requiring that all tables are readable. Instead, the implementation classes in #22547 should be interfaces that extend this `Table` to make it readable.
Looking at this patch in comparison to the other again (updated to #23086), it looks like this work should be rebased on top of the batch read refactor's PR in order to not have two `Table` classes that do the same thing - is this the right assessment?
@rdblue Have you considered the stream table API? There may be some differences between batch table DDL and stream table DDL.
@stczwd, thanks for taking a look at this. What are the differences between batch and stream DDL that you think will come up?
import java.util.Map;

/**
 * Represents table metadata from a {@link TableCatalog} or other table sources.
Does it include a view, like what we are doing in `CatalogTable`?
No, this interface carries the minimal set of operations needed to implement the v2 logical plans. We can expand it later when we need to.
The goal here is to build a replacement catalog API incrementally and to avoid requiring all catalogs to implement all possible catalog features. This API is focused on table operations, not view or function operations that we have yet to define.
A general question: how would this catalog API be used to implement the Hive metastore? Is it doable?
 * {@code name}, is also added to the options and will contain the catalog's name; in this case,
 * "catalog-name".
 */
public interface CatalogProvider {
As we discussed, will these APIs now live in the `sql-api` package? Also, at what point are we going to introduce this new Maven module and package?
@cloud-fan, do you want me to create the `sql-api` package in this PR, or do you want to add a separate PR to move the current v2 API?
import java.util.List;
import java.util.Map;

public interface TableCatalog extends CatalogProvider {
The semantics here aren't clear, at least to me. Typically a <something>Provider is a class that can instantiate a something. Here it appears to provide itself? The abstraction I would imagine would be to have either:

- `CatalogProvider` has a method called `get(options, sqlConf)` which returns a `TableCatalog` configured with the given options and `SQLConf`, or
- Remove `CatalogProvider` entirely and put `initialize` in this interface. Every `TableCatalog` instance must be initialized before calling any other methods like `loadTable`.
The intent is to use some interface to load all catalogs, whether they implement `TableCatalog`, `FunctionCatalog`, or both (or other catalog API parts). So you load a catalog, then check whether it is a `TableCatalog` when you want to use it for tables.

Sounds like the name `CatalogProvider` is the confusing part. You're right that a provider usually implements a get method to provide something. I could change that to `CatalogImpl` or something. Would that work?
Perhaps something as simple as `Catalog` as the top level then, which is sub-interfaced by `TableCatalog`, `FunctionCatalog`, and other "kinds" of catalogs. They can all share the same `initialize` method, which is declared by `Catalog`. That sounds like the simplest idea, perhaps?
What about `CatalogPlugin`? I'm hesitant to go with just `Catalog` because it isn't very specific. I think it might cause confusion because the interface has only the `initialize` method.
I'm ok with that!
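For readers following along, a rough Scala sketch of the shape just agreed on; the PR defines these as Java interfaces, and the signatures below are simplified assumptions:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Placeholder for the Table interface added elsewhere in this PR.
trait Table

// Simplified sketch of the agreed naming: a top-level plugin with only
// initialize, and capability interfaces such as TableCatalog extending it.
trait CatalogPlugin {
  // Called once after the plugin is instantiated via reflection. Per the diff
  // earlier in this thread, the catalog's name is also included in the options.
  def initialize(options: Map[String, String]): Unit
}

// A FunctionCatalog or other catalog capability could extend CatalogPlugin the same way.
trait TableCatalog extends CatalogPlugin {
  def loadTable(ident: TableIdentifier): Table
}
```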
}

lazy val options: Map[String, String] = {
  v1Table.storage.locationUri match {
Consider `.map(...).getOrElse`, but we haven't been consistent about using or not using `match` on `Option` types throughout Spark in general anyway, so it's fine to leave as-is.
Also, is there any particular reason this and the following variables have to be `lazy`?
How would the `getOrElse` pattern work here? If the URI is undefined, what tuple should be added to the table properties?
On a second read over this, I don't think we necessarily have to use the `Option` lambdas here; in fact it may be less legible, varying from developer to developer. But if one were to do so, it'd be something like this:

v1Table.storage.properties ++ v1Table.storage.locationUri.map(uri => Map("path" -> CatalogUtils.URIToString(uri))).getOrElse(Map.empty)
 * @return an Apply transform for the column
 */
public static PartitionTransform apply(String transform, String colName) {
  if ("identity".equals(transform)) {
Is it necessary to support all `PartitionTransform` types for a first pass? Though, I would imagine we'd have to for the v2-to-v1 catalog adapter. If it weren't for that, I would suggest supporting only a simple set of `PartitionTransform`s, such as only `identity`, to keep this PR focused on the catalog API and not the partitions API.
What I wanted to discuss on Wednesday was how to pass these transforms. @rxin and I had some discussions about it on the dev list, but we didn't come to a decision. I think the solution will probably be to add a way to pass a generic function application and a list of arguments that are either columns or constant literals.
I should note that the generic function application will probably look like the `Apply` case.
Is it possible to defer partition support, or is it fundamentally important enough to get correct now because we will be building on it in, e.g., the very next evolution of this API and its uses? I'm thinking about how to minimize the amount of API we're proposing per change, particularly if the choices aren't obvious.
I think we should get this done now. Partition transforms are a generalization of Hive partitioning (which uses some columns directly) and bucketing (which is one specific transform). If we add transformation functions now, we will support both of those with a simple API instead of building in special cases for identity and bucket transforms.
I also have a data source that allows users to configure partitioning using more transforms than just identity and bucketing, so I'd like to get this in so that DDL for those tables works.
 * Return the table properties.
 * @return this table's map of string properties
 */
Map<String, String> properties();
Should we `default` this to an empty Map? I don't think all tables will support custom properties.
Yeah, that works.
 * Return the table partitioning transforms.
 * @return this table's partitioning transforms
 */
List<PartitionTransform> partitioning();
Should we default this to no partitioning?
Sure
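A hedged sketch of the two defaults suggested above; the PR's `Table` is a Java interface, where these would be `default` methods, and the Scala trait below is only an illustration:

```scala
import java.util.Collections

// Placeholder for the PartitionTransform type added by this PR.
trait PartitionTransform

// Sketch of the "default to empty" suggestions: a table that declares no
// properties and no partitioning without having to override anything.
trait TableDefaults {
  def properties: java.util.Map[String, String] = Collections.emptyMap()
  def partitioning: java.util.List[PartitionTransform] = Collections.emptyList()
}
```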
@stczwd my understanding here is that a table isn't a streaming table or a batch table, but rather that a table points to data that can be scanned either in stream or in batch, and that the table is responsible for returning either streaming scanners or batch scanners when the logical plan calls for it. The reason why I believe this is the case is https://github.com/apache/spark/pull/23086/files#diff-d111d7e2179b55465840c9a81ea004f2R65 and its eventual analogous streaming variant. In the new abstractions we propose here and in our proposal, the catalog gets a reference to a …

In other words, the crucial overarching theme in all of the following matters is that a Table isn't inherently a streaming or a batch table, but rather a Table supports returning streaming and/or batch scans. The table returned by the catalog is a pointer to the data, and the Scan defines how one reads that data.

The catalog returns an instance of …

When one gets back a …

Probably this would be done from the SQL code side. But I'm not as certain about this; can you elaborate?

The new abstraction handles this at the …

This I don't think is as clear given what has been proposed so far. Will let others offer comment here. Others should feel free to offer more commentary or correct anything from above.
@mccheah you mean the tables users create do not distinguish between stream and batch, and the distinction only matters when they are actually read?
}

lazy val options: Map[String, String] = {
  v1Table.storage.locationUri match {
I use lazy for a couple reasons. First, to avoid building maps or other data values that are never used. Second, to avoid a required ordering for fields. If fields depend on one another, then they have to be reordered when those dependencies change. Lazy values never require reordering.
@stczwd that's my understanding, yeah. Others can feel free to correct me otherwise.
@stczwd, I agree with @mccheah. Tables are basically named data sets. Whether they support batch, micro-batch streaming, or continuous streaming is determined by checking whether they implement SupportsBatchScan or similar interfaces. Matt's referenced docs are the right place to go for more context.

The purpose here is to make catalogs and reads orthogonal. A catalog can return both batch-compatible and stream-compatible source "tables". A "table" may be a Kafka topic or may be a file-based data source, and note that both of those can support batch and streaming execution. A Kafka topic could be a CDC stream that represents a table, and a file-based source could be streamed by periodically checking for new committed files.

This PR is based on an SPIP. That has some background on why I chose the set of table attributes here (schema, partitioning, properties), but a short summary is that those are the core set of attributes that are used in comparable SQL variants and already used in Spark.
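A hedged sketch of the capability-check pattern described above; `SupportsBatchScan` is the name used in the comment, while the other names below are placeholders used only to illustrate the idea:

```scala
// Placeholder traits standing in for the table and capability interfaces.
trait Table
trait SupportsBatchScan extends Table

// The read path would pick an execution mode based on what the table returned
// by a catalog actually implements, not on the catalog it came from.
def supportedModes(table: Table): Seq[String] = {
  val modes = Seq.newBuilder[String]
  if (table.isInstanceOf[SupportsBatchScan]) modes += "batch"
  // streaming support would be detected through similar marker interfaces
  modes.result()
}
```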
I can +1 the features brought by this PR, even if what I'm interested in is more of a "side" feature: having the ability to use the table catalog to inject an object into the Spark sources using a custom table.
## What changes were proposed in this pull request?

This adds a v2 API for adding new catalog plugins to Spark.

* Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources
* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize`

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name.

This replaces #21306, which will be implemented in multiple parts: the catalog plugin system (this commit) and specific catalog APIs, like `TableCatalog`.

## How was this patch tested?

Added test suites for `CaseInsensitiveStringMap` and for catalog loading.

Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Closing this because it is replaced by #24246.
What changes were proposed in this pull request?

This adds a v2 API for storage implementations to provide catalog instances to Spark. There are two parts:

* `CatalogProvider` is used to instantiate and initialize catalogs via reflection, similar to data sources
* `TableCatalog` provides table operations proposed in the Table Metadata API SPIP

This also adds helper classes:

* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogProvider` via `initialize`
* `PartitionTransform` is used to pass table partitioning without using `Expression`

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogProvider`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization.

How was this patch tested?

This includes a suite for `CaseInsensitiveStringMap` and one that tests loading catalog providers.