[SPARK-27181][SQL]: Add public transform API #24117
    ;

    transform
        : qualifiedName    #identityTransform
It's used in the CREATE TABLE statement only; do we really need qualifiedName? I think identifier is good enough here.
I think it is better to use qualifiedName. This may be a logical name in the current use, but later Spark may need to resolve the transform using this name. For example, this could be set to builtin.bucket to tell Spark that it is the built-in bucket transform function. Using that information, Spark would know it can run a bucketed join.
How about the transform arguments? Do they need to be qualifiedName as well?
Yes, arguments need to be qualifiedName because they may reference nested fields.
    package org.apache.spark.sql.catalyst.logical.expressions;

    /**
     * Helper methods to create logical transforms to pass into Spark.
Who will call these helper methods? Spark or the data source?
Data sources that need to pass transforms back to Spark through Table.partitioning. Spark uses the internal LogicalExpressions.
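For a concrete picture of that split, a data source might build the transforms it returns from Table.partitioning with these helpers roughly as follows. This is a minimal sketch: the package name and the factory methods (Expressions.identity, Expressions.bucket) are assumed from this PR's helper class and may not match the final names or locations exactly.

    import org.apache.spark.sql.catalog.v2.expressions.{Expressions, Transform}

    object PartitioningSketch {
      // What a hypothetical Table implementation could return from partitioning():
      // identity partitioning on a date column plus a 16-bucket hash of an id column.
      val partitioning: Array[Transform] = Array(
        Expressions.identity("event_date"),
        Expressions.bucket(16, "user_id")
      )
    }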
     * @param <T> the Java type of a value held by the literal
     */
    @Experimental
    public interface Literal<T> extends Expression {
I'm a little hesitant to add a type parameter to an expression interface. I'm not sure how useful it is. When I deal with expressions, my method parameters and return types are usually Expression. Because of type erasure, I won't get the type parameter of the literal unless the method deals with literals only.
What is the downside to using this? We have a typed literal in Iceberg and it is useful for maintaining type safety.
The downside is that we may need to add a cast to read the value from this literal, e.g.

    def func(e: Expression) = e match {
      case lit: Literal[_] => lit.asInstanceOf[Literal[Any]].value
    }
Actually, it would be good to see some examples. In general, my feeling is that adding a type parameter to a sub-class but not the base class is not going to be very useful.
The alternative is to cast the value instead, so you have to cast either way. You can't get around casting when the type is discarded. I don't think it is a good idea to throw away type information in all cases just because it isn't useful in some cases.
Here's an example of how it is used in Iceberg in expression evaluation:
    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
      Comparator<T> cmp = lit.comparator();
      return cmp.compare(ref.get(struct), lit.value()) < 0;
    }
In Iceberg, expression binding guarantees that the literal's type matches the reference's type. With that information, this code knows that the value returned by the reference's get method matches the type of the comparator and of the literal value.
      override def toString: String = describe
    }

    private[sql] final case class ApplyTransform(
What are the semantics of this? An arbitrary function?
Some unknown transform.
It is an anti-pattern to have a lazy val that doesn't do any computation.
…On Fri, Mar 29, 2019 at 4:28 PM Wenchen Fan wrote:

In sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala (#24117 (comment)):

    +
    +  override lazy val references: Array[NamedReference] = {
    +    arguments
    +      .filter(_.isInstanceOf[NamedReference])
    +      .map(_.asInstanceOf[NamedReference])
    +  }
    +
    +  override lazy val describe: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
    +
    +  override def toString: String = describe
    +}
    +
    +private[sql] final case class IdentityTransform(
    +    ref: NamedReference) extends SingleColumnTransform(ref) {
    +  override lazy val name: String = "identity"
    +  override lazy val describe: String = ref.describe

I think it's more than a preference. PRs should keep the code style consistent with the code base.
Test build #104098 has finished for PR 24117 at commit

@cloud-fan, I've updated the uses of

Retest this please.

Test build #104323 has finished for PR 24117 at commit

Retest this please.

Test build #104334 has finished for PR 24117 at commit

Retest this please.
Test build #104396 has finished for PR 24117 at commit

Test build #104397 has finished for PR 24117 at commit

@cloud-fan, tests passed. Is this ready to merge?
test("create table - partitioned by transforms") { | ||
val transforms = Seq( | ||
"bucket(16, b)", "years(ts)", "months(ts)", "days(ts)", "hours(ts)", "foo(a, 'bar', 34)", | ||
"bucket(32, b), days(ts)") |
This reminds me of one thing: shall we resolve the references before passing the transforms to the table catalog? Right now we create transforms on the parser side, and we may pass years(ts) to the table catalog even if ts doesn't exist. Do we expect the table catalog itself to resolve references?
Spark will validate that the columns exist in the table schema using an analysis rule. That isn't going into this PR; this PR updates the parser and adds tests for it. We will add the analysis rules later, once this PR makes it in.

The reason why we can't add them here is that we don't want to write validations against the parsed SQL plans. We want to write them against the v2 create table commands, which won't be added until after the table catalog makes it in (#24246).
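The analysis rule isn't part of this PR, but the validation being described could look roughly like the sketch below. checkTransformReferences is a hypothetical helper: it only checks top-level field names against the schema, while a real rule would also resolve nested fields and would run against the v2 create table commands.

    import org.apache.spark.sql.catalog.v2.expressions.Transform
    import org.apache.spark.sql.types.StructType

    object TransformValidationSketch {
      // Hypothetical check: every column referenced by a partition transform must
      // exist in the table schema, otherwise fail analysis with an error.
      def checkTransformReferences(schema: StructType, partitioning: Seq[Transform]): Unit = {
        partitioning.foreach { transform =>
          transform.references.foreach { ref =>
            val fieldNames = ref.fieldNames
            if (!schema.fieldNames.contains(fieldNames.head)) {
              throw new IllegalArgumentException(
                s"Partition transform ${transform.describe} references a missing column: " +
                  fieldNames.mkString("."))
            }
          }
        }
      }
    }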
SGTM
LGTM except one comment

Thanks, merging to master!

Thank you @cloud-fan!
Can we create a JIRA ticket and mark it a blocker for 3.0? It'd be bad to move all classes after 3.0.
…On Mon, Apr 15, 2019 at 4:48 PM, Ryan Blue wrote:

In sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java (#24117 (comment)), where the quoted context was the new file's Apache license header and package declaration:
First, we have to decide on what all we need to move. I don't think that is clear yet. The next concern is getting #24246 in, because it is a blocker for a lot of work that can be done in parallel. After that, it would be nice to coordinate to avoid breaking lots of PRs, but that's less of a concern.

So to answer your question, I think we should do this after #24246 and after we've decided what needs to move and what the new organization should be. The actual changes should be simple and quick to review, but would cause too much delay combined into a PR with other changes.
## What changes were proposed in this pull request?

This adds a public Expression API that can be used to pass partition transformations to data sources.

## How was this patch tested?

Existing tests to validate no regressions. Added transform cases to DDL suite and v1 conversions suite.

Closes apache#24117 from rdblue/add-public-transform-api.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@rdblue, sorry that I came here this late to leave a comment like this, but do you mind if I ask what this PR targets (or for any discussion link I can refer to)? I happened to come here while tracking history and I am lost here. So, does this PR target supporting CREATE TABLE table(col INT) USING parquet PARTITIONED BY transform(col)? It looks like it's going to be super confusing.

It seems like we should try to push the expression itself (or the subset of expressions that the source can handle) and let the data source implementation interpret it. WDYT? Please let me know if I am getting something completely wrong somewhere.
@HyukjinKwon, these transforms are passed to the data source as table configuration and it is up to the sources to implement them. These are not expressions like other Spark SQL syntax. They are named transformations that accept a list of literals and columns.
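To make that concrete, on the source side each transform arrives as a (name, arguments) pair that the implementation either interprets or rejects. Here is a rough sketch assuming a source that only understands identity and bucket transforms; the interpret helper and its messages are hypothetical:

    import org.apache.spark.sql.catalog.v2.expressions.Transform

    object SourceSideSketch {
      // Hypothetical interpretation of Table.partitioning() by a data source:
      // accept identity and bucket transforms, reject anything it doesn't recognize.
      def interpret(partitioning: Array[Transform]): Unit = {
        partitioning.foreach { transform =>
          transform.name match {
            case "identity" =>
              val column = transform.references.head.fieldNames.mkString(".")
              println(s"partition directly by column $column")
            case "bucket" =>
              println(s"hash-bucket rows using ${transform.describe}")
            case other =>
              throw new UnsupportedOperationException(s"Unsupported transform: $other")
          }
        }
      }
    }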
Then I think, at the very least, the way of calling these should be different from calling Spark functions. This is the root cause of the confusion: users cannot distinguish which are transforms and which are Spark expressions, even though the two work differently.
These are only allowed within a PARTITIONED BY clause.
The problem is that previously this wasn't allowed, but now it apparently looks like it is going to be allowed (while it actually is not). I myself was confused about this, so I had to track down the history.
Can we at least have a slightly different syntax for this? This: CREATE TABLE table(col INT) USING parquet PARTITIONED BY transform(col) looks like it will allow arbitrary expressions in PARTITIONED BY.
@rdblue, shall we bring this topic up at the next DSv2 meeting if you don't currently have an idea of how to handle it and/or don't think it matters?
@HyukjinKwon, we discussed this a while ago and I don't see much reason to reopen it. You're welcome to bring it up at the next sync, but I don't consider this a problem. These look like expressions because they are limited expressions for transforming data to produce partition values. Expressions should look like expressions. If you want to improve some of the cases where more complex expressions aren't supported, then let's do that.

We also don't yet know how much expression syntax we will pass to sources. This is why we started a public expression API: so we can pass expressions to sources where they are needed. I expect that API to expand as we solve more complicated use cases.
Can you point out any link or summary in the mailing list? Maybe I have missed some discussions, so I wanted to read and follow. That was my original intention, actually.

If this is going to provide expression-like support, it should at least work like an expression (even though we don't partially push down like the DSv1 filter APIs). From a cursory look, the current transform API just looks like a half-baked one: it looks like we will have to keep a copy of the Spark expressions, and we will be unable to extend it to support other expression-like features. I do see a problem here; my impression was that the point of DSv2 is to avoid half-baked or just-works-for-now APIs. How do we plan to extend this support? The current status definitely looks half-baked, and some changes against this API are already being proposed, for example #26929, which is why I had to follow the history.
     * Base class of the public logical expression API.
     */
    @Experimental
    public interface Expression {
I am confused about why we should expose a custom API that is currently only used in the transform API. Maybe there was a discussion about this plan. Do we plan to switch the current Spark expressions to this expression entirely in the future, and how is it different from using a UDF?
It's better to give the semantics to the data source instead of a concrete UDF. Data sources can implement the partitioning semantics efficiently if they don't need to call a Java UDF here and there.

The partitioning expressions need to be public because they are used in DS v2; that's why we create a public Expression API. But I do agree with the concern from @HyukjinKwon about how we are going to extend it in the future. For now the parser is pretty strict about the partitioning expression: it can only be a column name or a function call with column names. I think that's good enough; it looks weird to me to support "partitioned by a + b". However, I'm a little worried about … @rdblue, what do you think?
@rdblue, do you plan to hold a DSv2 meeting this month? The code freeze is soon. I would like to take a step back here and revisit and rethink this. Let me send an email to dev to collect more feedback.
What changes were proposed in this pull request?
This adds a public Expression API that can be used to pass partition transformations to data sources.
How was this patch tested?
Existing tests to validate no regressions. Added transform cases to DDL suite and v1 conversions suite.