[SPARK-27181][SQL]: Add public transform API #24117

Closed
wants to merge 6 commits into apache:master from rdblue:add-public-transform-api

Conversation

rdblue
Contributor

@rdblue rdblue commented Mar 17, 2019

What changes were proposed in this pull request?

This adds a public Expression API that can be used to pass partition transformations to data sources.

How was this patch tested?

Existing tests to validate no regressions. Added transform cases to DDL suite and v1 conversions suite.
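
As a rough illustration of the DDL this enables, here is a hedged sketch: testcat, the provider foo, and the table name are made up, and a spark-shell session with a v2 catalog that accepts transforms is assumed. The transform calls mirror those exercised by the new DDL suite cases.

// Hypothetical names throughout; assumes a spark-shell with a v2 catalog configured.
spark.sql("""
  CREATE TABLE testcat.db.events (id BIGINT, b STRING, ts TIMESTAMP)
  USING foo
  PARTITIONED BY (bucket(16, b), days(ts))
""")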

@rdblue rdblue force-pushed the add-public-transform-api branch 2 times, most recently from 80c3966 to 5d192b9 Compare March 25, 2019 22:30
@rdblue rdblue changed the title [SPARK-27181][SQL]: Add public transform API (WIP) [SPARK-27181][SQL]: Add public transform API Mar 25, 2019

;

transform
: qualifiedName #identityTransform
Contributor

It's used only in the CREATE TABLE statement, so do we really need qualifiedName? I think identifier is good enough here.

Contributor Author

I think it is better to use qualifiedName. This may be a logical name in the current use, but later Spark may need to resolve the transform using this name. For example, this could be set to builtin.bucket to tell Spark that it is the built-in bucket transform function. Using that information, Spark would know it can run a bucketed join.

Contributor

How about the transform arguments? Do they need to be qualifiedName as well?

Contributor Author

Yes, arguments need to be qualifiedName because they may reference nested fields.
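
For example (a hedged illustration; the catalog, table, and column names are made up), a transform argument that refers to a nested field needs a multi-part name, which is what qualifiedName allows:

// Hypothetical names; point.ts is a nested field reference used as a transform argument.
spark.sql("""
  CREATE TABLE testcat.db.t (id BIGINT, point STRUCT<ts: TIMESTAMP, x: DOUBLE>)
  USING foo
  PARTITIONED BY (days(point.ts))
""")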

package org.apache.spark.sql.catalyst.logical.expressions;

/**
* Helper methods to create logical transforms to pass into Spark.
Contributor

Who will call these helper methods? Spark or the data source?

Contributor Author

Data sources that need to pass transforms back to Spark through Table.partitioning. Spark uses the internal LogicalExpressions.
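
A minimal sketch of that flow, using locally defined stand-ins because the helper's exact class and method names may differ from what lands in Spark:

// Local stand-ins for the public types; the real interfaces live in Spark.
trait Transform { def name: String; def describe: String }

// Hypothetical factory helpers of the kind a data source would call.
object Transforms {
  def identity(col: String): Transform = new Transform {
    val name = "identity"
    def describe: String = col
  }
  def bucket(numBuckets: Int, col: String): Transform = new Transform {
    val name = "bucket"
    def describe: String = s"bucket($numBuckets, $col)"
  }
}

// A source's Table implementation would return something like this from partitioning().
val partitioning: Array[Transform] =
  Array(Transforms.identity("region"), Transforms.bucket(16, "id"))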

* @param <T> the Java type of a value held by the literal
*/
@Experimental
public interface Literal<T> extends Expression {
Contributor

I'm a little hesitant to add a type parameter to an expression interface. I'm not sure how useful it is. When I deal with expressions, my method parameters and return types are usually Expression. Because of type erasure, I won't get the type parameter of a literal unless the method deals only with literals.

Contributor Author

What is the downside to using this? We have a typed literal in Iceberg and it is useful for maintaining type safety.

Contributor

The downside is that we may need to add a cast to read the value from the literal, e.g.

def func(e: Expression) = e match {
  case lit: Literal[_] => lit.asInstanceOf[Literal[Any]].value
}

Contributor

Actually, it would be good to see some examples. In general, my feeling is that adding a type parameter to a subclass but not to the base class is not going to be very useful.

Contributor Author

The alternative is to cast the value instead, so you have to cast either way. You can't get around casting when the type is discarded. I don't think it is a good idea to throw away type information in all cases just because it isn't useful in some cases.

Here's an example of how it is used in Iceberg in expression evaluation:

    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
      Comparator<T> cmp = lit.comparator();
      return cmp.compare(ref.get(struct), lit.value()) < 0;
    }

In Iceberg, expression binding guarantees that the literal's type matches the reference's type. With that information, this code knows that the value returned by the reference's get method matches the type of the comparator and of the literal value.
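
The same idea as a self-contained Scala sketch, with local stand-ins for the Java Literal<T> and BoundReference<T> interfaces: because both sides carry the same T after binding, the comparison itself needs no cast.

// Local stand-ins for illustration; not the actual interfaces from this PR.
trait TypedLiteral[T] { def value: T; def comparator: Ordering[T] }
trait BoundRef[T] { def get(row: Map[String, Any]): T }

// No cast needed here: the compiler knows both sides share T.
def lt[T](ref: BoundRef[T], lit: TypedLiteral[T], row: Map[String, Any]): Boolean =
  lit.comparator.lt(ref.get(row), lit.value)

// Example instances for an integer column "a"; the cast happens once, at the untyped row boundary.
val tenLit = new TypedLiteral[Int] { val value = 10; val comparator = Ordering.Int }
val aRef   = new BoundRef[Int] { def get(row: Map[String, Any]): Int = row("a").asInstanceOf[Int] }

lt(aRef, tenLit, Map("a" -> 3))   // true: 3 < 10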

override def toString: String = describe
}

private[sql] final case class ApplyTransform(
Contributor

What are the semantics of this? An arbitrary function?

Contributor Author

Some unknown transform.
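
In other words (a hedged sketch with local stand-in classes), ApplyTransform carries a transform that Spark does not model explicitly: just a name plus arguments that the source is free to interpret. The foo(a, 'bar', 34) case from the new tests would arrive roughly as:

// Local stand-ins for illustration; the real classes are the catalyst ones in this PR.
sealed trait Expr
case class FieldReference(parts: Seq[String]) extends Expr
case class LiteralValue(value: Any) extends Expr
case class ApplyTransform(name: String, args: Seq[Expr]) extends Expr

// Roughly what a source would receive for foo(a, 'bar', 34).
val unknown = ApplyTransform("foo",
  Seq(FieldReference(Seq("a")), LiteralValue("bar"), LiteralValue(34)))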

@rxin
Contributor

rxin commented Mar 29, 2019 via email

@SparkQA

SparkQA commented Mar 30, 2019

Test build #104098 has finished for PR 24117 at commit 0aa3533.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Apr 1, 2019

@cloud-fan, I've updated the uses of lazy val to def. I think that's the last problem. Ready to commit?

@rdblue
Contributor Author

rdblue commented Apr 5, 2019

Retest this please.

@SparkQA

SparkQA commented Apr 5, 2019

Test build #104323 has finished for PR 24117 at commit 76a4067.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Apr 5, 2019

Retest this please.

@SparkQA

SparkQA commented Apr 6, 2019

Test build #104334 has finished for PR 24117 at commit 76a4067.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Apr 8, 2019

Retest this please.

@SparkQA

SparkQA commented Apr 8, 2019

Test build #104396 has finished for PR 24117 at commit 76a4067.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 8, 2019

Test build #104397 has finished for PR 24117 at commit a4a87ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor Author

rdblue commented Apr 8, 2019

@cloud-fan, tests passed. Is this ready to merge?

test("create table - partitioned by transforms") {
val transforms = Seq(
"bucket(16, b)", "years(ts)", "months(ts)", "days(ts)", "hours(ts)", "foo(a, 'bar', 34)",
"bucket(32, b), days(ts)")
Contributor

This reminds me of one thing: shall we resolve the references before passing the transforms to the table catalog? Right now we create transforms on the parser side, so we may pass years(ts) to the table catalog even if ts doesn't exist. Do we expect the table catalog itself to resolve references?

Contributor Author

@rdblue rdblue Apr 9, 2019

Spark will validate that the columns exist in the table schema using an analysis rule. That isn't going into this PR. This PR updates the parser and adds tests for that. We will add analysis rules later, when this PR makes it in.

The reason why we can't add them here is that we don't want to write validations against the parsed SQL plans. We want to write them against the v2 create table commands, which won't be added until after the table catalog makes it in (#24246).
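
To make the deferred step concrete, here is a hedged sketch with stand-in types (not the actual rule): such a rule would walk each transform's references and reject any column that the table schema does not contain.

// Stand-in shapes; a real rule would work on catalyst plans and Spark's schema types.
case class FieldReference(parts: Seq[String])
case class NamedTransform(name: String, refs: Seq[FieldReference])

// Fails if a transform references a top-level column missing from the schema.
def validateTransforms(transforms: Seq[NamedTransform], topLevelColumns: Set[String]): Unit =
  for (t <- transforms; ref <- t.refs if !topLevelColumns.contains(ref.parts.head)) {
    throw new IllegalArgumentException(
      s"Transform ${t.name} references a missing column: ${ref.parts.mkString(".")}")
  }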

Contributor

SGTM

@cloud-fan
Contributor

LGTM except one comment

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 58674d5 Apr 10, 2019
@rdblue
Contributor Author

rdblue commented Apr 10, 2019

Thank you @cloud-fan!

@rxin
Contributor

rxin commented Apr 15, 2019 via email

@rdblue
Contributor Author

rdblue commented Apr 16, 2019

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019

Closes apache#24117 from rdblue/add-public-transform-api.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@HyukjinKwon
Member

HyukjinKwon commented Dec 19, 2019

@rdblue, sorry that I came here late and am leaving a comment like this, but do you mind if I ask what this PR targets (or for any discussion link I can refer to)? I happened to come here while tracking the history and I am lost.

So, does this PR target supporting transform(col) in the partition clause, for instance:

CREATE TABLE table(col INT) USING parquet PARTITIONED BY transform(col)

For a transform, should the DSv2 source implement its logic? Like, are you planning to implement YearsTransform, MonthsTransform, and DaysTransform on the Spark side,
and then expose ApplyTransform to the data source implementation?

It looks like it's going to be super confusing.

  1. Users would expect to be able to use the same Spark expressions, but the behavior actually differs per data source implementation. For instance, I would expect hours(col + col), hours(current_timestamp()) or trunc(...) to work.

  2. If we have to define YearsTransform, MonthsTransform, DaysTransform, ... then it's going to be a copy of our expressions in Spark.

It seems like we should instead try to push the expression itself (or the subset of expressions that the source can handle) and let the data source implementation interpret it.

WDYT? Please let me know if I am getting something completely wrong somewhere.

@rdblue
Contributor Author

rdblue commented Dec 19, 2019

@HyukjinKwon, these transforms are passed to the data source as table configuration and it is up to the sources to implement them.

These are not expressions like other Spark SQL syntax. They are named transformations that accept a list of literals and columns.
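
A hedged sketch of what that means on the source side (stand-in types; the names are illustrative): each transform arrives as a name plus arguments that are only literals or column references, and the source decides what to do with it.

// Stand-in shapes for illustration only.
sealed trait Arg
case class ColumnRef(name: String) extends Arg
case class LitArg(value: Any) extends Arg
case class NamedTransform(name: String, args: Seq[Arg])

// PARTITIONED BY (bucket(16, b)) would reach the source roughly as:
val bucketByB = NamedTransform("bucket", Seq(LitArg(16), ColumnRef("b")))

// The source, not Spark, decides what the layout means.
def describe(t: NamedTransform): String = t match {
  case NamedTransform("bucket", Seq(LitArg(n: Int), ColumnRef(col))) =>
    s"hash column $col into $n buckets"
  case NamedTransform(other, _) =>
    s"transform $other is interpreted by the source"
}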

@HyukjinKwon
Member

HyukjinKwon commented Dec 20, 2019

Then, I think at the very least the way of calling it should be different from calling Spark functions.
The form ... PARTITIONED BY transform(col) looks exactly like a Spark expression.

This is the root cause of the confusion: users cannot distinguish transforms from Spark expressions, yet the two work differently.

@rdblue
Contributor Author

rdblue commented Dec 20, 2019

These are only allowed within a PARTITIONED BY clause, where previously no expressions were allowed (only identifiers).

@HyukjinKwon
Member

The problem is that, while this previously wasn't allowed, it now looks like it will be allowed (while it's actually not). I was confused about this myself, so I had to track the history.

@HyukjinKwon
Member

Can we have at least a slightly different syntax for this? This:

CREATE TABLE table(col INT) USING parquet PARTITIONED BY transform(col)

looks like it will allow arbitrary expressions in the PARTITIONED BY clause.

@HyukjinKwon
Member

@rdblue, shall we bring this topic up at the next DSv2 meeting if you don't currently have an idea of how to handle it and/or don't think it matters?

@rdblue
Contributor Author

rdblue commented Dec 23, 2019

@HyukjinKwon, we discussed this a while ago and I don't see much reason to reopen it. You're welcome to bring it up at the next sync, but I don't consider this a problem.

These look like expressions because they are limited expressions for transforming data to produce partition values. Expressions should look like expressions. If you want to improve some of the cases where more complex expressions aren't supported, then let's do that.

We also don't yet know how much expression syntax we will pass to sources. This is why we started a public expression API: so we can pass expressions to sources where they are needed. I expect that API to expand as we solve more complicated use cases.

@HyukjinKwon
Member

we discussed this a while ago and I don't see much reason to reopen it

Can you point me to a link or a summary on the mailing list? Maybe I have missed some discussions, so I wanted to read and follow them. This was my original intention, actually.

These look like expressions because they are limited expressions for transforming data to produce partition values. Expressions should look like expressions. If you want to improve some of the cases where more complex expressions aren't supported, then let's do that.

We also don't yet know how much expression syntax we will pass to sources. This is why we started a public expression API: so we can pass expressions to sources where they are needed. I expect that API to expand as we solve more complicated use cases.

If this is going to provide expression-like support, it should at least work like an expression (even though we don't do partial pushdown like the DSv1 filter APIs).

From a cursory look, the current transform API just looks like a half-baked one: it looks like we will have to keep a copy of the Spark expressions, and we are unable to extend it to support other expression-like cases such as transform(col + col).

I do see a problem here. My impression was that the point of DSv2 is to avoid half-baked or just-works-for-now APIs. How do we plan to extend this support? The current status looks definitely half-baked, and some changes against this API are already being proposed, for example #26929, which was why I had to follow the history.

* Base class of the public logical expression API.
*/
@Experimental
public interface Expression {
Member

I am confused about why we should expose a custom API that is currently only used by the transform API.
Maybe there was a discussion about this plan. Do we plan to switch the current Spark expressions to this expression API entirely in the future, and how is it different from using a UDF?

Contributor

It's better to give the data source the semantics instead of a concrete UDF. Data sources can implement the partitioning semantics efficiently if they don't need to call a Java UDF here and there.

@cloud-fan
Contributor

The partitioning expressions need to be public because they are used in DS v2; that's why we create a public Expression interface. It's kind of a copy of the internal catalyst expressions, but for now there are only a few public expressions, and we plan to add more in the future. Adding new public expressions is backward compatible.

But I do agree with the concern from @HyukjinKwon about how we are going to extend this in the future. For now the parser is pretty strict about the partitioning expression: it can only be a column name or a function call with column names. I think that's good enough; it looks weird to me to support "partitioned by a + b". However, I'm a little worried about ApplyTransform, which just passes arbitrary function names specified by end users to the data source, without a well-defined semantic. Imagine we add a new transform called Second, whose function name is "second". Then in the new version data sources would get Second, while in the old version they got ApplyTransform. This is not backward compatible.

@rdblue what do you think?
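
To make the compatibility concern concrete, here is a purely hypothetical sketch (Second/SecondTransform does not exist in this PR): a source written against a version where second(ts) arrives as ApplyTransform would stop matching once a dedicated class is introduced, unless it also matches by name.

// Hypothetical stand-ins to illustrate the versioning concern.
case class FieldReference(parts: Seq[String])
sealed trait Transform
case class ApplyTransform(name: String, args: Seq[Any]) extends Transform
case class SecondTransform(ref: FieldReference) extends Transform  // imagined future addition

// A source that only expects ApplyTransform("second", ...) would miss SecondTransform,
// so it has to handle both shapes to stay compatible across Spark versions.
def secondsColumn(t: Transform): Option[String] = t match {
  case ApplyTransform("second", Seq(FieldReference(parts))) => Some(parts.mkString("."))
  case SecondTransform(ref)                                 => Some(ref.parts.mkString("."))
  case _                                                    => None
}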

@HyukjinKwon
Member

@rdblue, do you plan to hold a DSv2 meeting this month? The code freeze is soon. I would like to take a step back here and revisit and rethink this. Let me send an email to dev to collect more feedback.
