
[SPARK-40852][CONNECT][PYTHON] Introduce StatFunction in proto and implement DataFrame.summary #38318

Closed
wants to merge 2 commits into from

Conversation

zhengruifeng
Contributor

@zhengruifeng zhengruifeng commented Oct 20, 2022

What changes were proposed in this pull request?

Implement DataFrame.summary

there is a set of DataFrame APIs implemented in `StatFunctions`, `DataFrameStatFunctions` and `DataFrameNaFunctions` which I think cannot be implemented in the Connect client, because they:

1. depend on Catalyst's analysis (most of them);
2. ~~are implemented via RDD operations (like `summary`, `approxQuantile`);~~ (resolved by reimplementation)
3. ~~internally trigger jobs (like `summary`);~~ (resolved by reimplementation)

This PR introduces a new proto message, `StatFunction`, to support the `StatFunctions` methods.
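Roughly, the shape of the new message and its server-side dispatch can be sketched in plain Python (a toy illustration of the proto `oneof` pattern; these dataclasses are stand-ins, not the generated protobuf code):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Summary:
    # stand-in for the proto Summary message: the statistics to compute
    statistics: List[str] = field(default_factory=list)

@dataclass
class StatFunction:
    # stand-in for a proto `oneof function`: exactly one variant is set
    summary: Optional[Summary] = None

def dispatch(rel: StatFunction) -> str:
    # a server-side planner matches on whichever variant is set
    if rel.summary is not None:
        return "summary(" + ", ".join(rel.summary.statistics) + ")"
    raise ValueError("unknown StatFunction variant")
```

More variants (e.g. for other stat functions) would be added as further optional fields of the oneof, with one `case` per variant on the server.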

Why are the changes needed?

for Connect API coverage

Does this PR introduce any user-facing change?

yes, new API

How was this patch tested?

added UT

@zhengruifeng zhengruifeng changed the title [SPARK-40852][CONNECT][PYTHON][WIP] Implement DataFrame.summary [SPARK-40852][CONNECT][PYTHON][WIP] Introduce DataFrameFunction in proto and implement DataFrame.summary Oct 21, 2022
@zhengruifeng zhengruifeng changed the title [SPARK-40852][CONNECT][PYTHON][WIP] Introduce DataFrameFunction in proto and implement DataFrame.summary [SPARK-40852][CONNECT][PYTHON] Introduce DataFrameFunction in proto and implement DataFrame.summary Oct 21, 2022
@zhengruifeng zhengruifeng marked this pull request as ready for review October 21, 2022 06:56
@zhengruifeng
Contributor Author

rel.getFunctionCase match {
  case proto.DataFrameFunction.FunctionCase.SUMMARY =>
    StatFunctions
      .summary(Dataset.ofRows(session, child), rel.getSummary.getStatisticsList.asScala.toSeq)
Member

This is fine for now, but it's going to truncate the SQL plans, which disables further optimization. We should probably add dedicated plans for def summary in Dataset itself.

For now, LGTM

Contributor Author

Yes, then it will have more room for optimization. Let us add a new plan for it. Thanks!

Contributor

+1!

I don't know how to add a new plan. It would be very useful to have a PR as an example.

Contributor Author

cc @cloud-fan @HyukjinKwon
Since we reimplemented df.summary in 6a0713a, are there any differences in SQL optimization between this method (directly invoking df.summary) and adding a dedicated plan?

Contributor

Some rules may not work as they don't recognize the new plan.

Contributor

What do you mean by "truncate the SQL plans"? DataFrame transformations just accumulate the logical plan.

Contributor Author

@cloud-fan the old df.summary eagerly computes the statistics and always returns a LocalRelation
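The distinction under discussion can be sketched with a toy example in plain Python (an illustration only, not Spark internals): an eager API materializes the result immediately, discarding the upstream plan, while a lazy API only records the operation so the full chain stays available to an optimizer:

```python
from typing import Callable, List

class LazyPlan:
    """Toy lazy evaluator: accumulates transformations; nothing runs until collect()."""

    def __init__(self, data: List[int]) -> None:
        self.data = data
        self.ops: List[Callable[[List[int]], List[int]]] = []

    def transform(self, op: Callable[[List[int]], List[int]]) -> "LazyPlan":
        # just record the step, like appending a logical plan node
        self.ops.append(op)
        return self

    def collect(self) -> List[int]:
        out = self.data
        for op in self.ops:
            out = op(out)
        return out

# eager: the result is computed now; nothing upstream survives for later optimization
eager_result = sorted([3, 1, 2])

# lazy: the "plan" still carries every recorded operation until collect()
lazy = LazyPlan([3, 1, 2]).transform(sorted).transform(lambda xs: xs[:2])
```

In Spark terms, the eager version is like returning a precomputed LocalRelation, while the lazy one keeps a logical plan that rules can still rewrite.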

Contributor

Oh, that's an issue. Can it be solved by updating the df.summary implementation?

Contributor Author

yes, it has been resolved

Contributor Author

In the new implementation

@zhengruifeng zhengruifeng deleted the connect_df_summary branch October 22, 2022 01:52
@zhengruifeng zhengruifeng restored the connect_df_summary branch October 27, 2022 03:05
@zhengruifeng zhengruifeng reopened this Oct 27, 2022
@zhengruifeng zhengruifeng changed the title [SPARK-40852][CONNECT][PYTHON] Introduce DataFrameFunction in proto and implement DataFrame.summary [SPARK-40852][CONNECT][PYTHON] Introduce StatFunction in proto and implement DataFrame.summary Nov 7, 2022
@zhengruifeng
Contributor Author

@@ -323,6 +323,14 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) ->
def where(self, condition: Expression) -> "DataFrame":
    return self.filter(condition)

def summary(self, *statistics: str) -> "DataFrame":
    _statistics: List[str] = list(statistics)
Contributor Author

This is different from the legacy preprocessing:

if len(statistics) == 1 and isinstance(statistics[0], list):
    statistics = statistics[0]

since I think that preprocessing is weird.
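The two preprocessing styles can be contrasted in a standalone sketch (plain Python; `summary_legacy` mirrors the quoted legacy unpacking, `summary_new` the simpler conversion in this PR — both function names are illustrative):

```python
from typing import List

def summary_legacy(*statistics) -> List[str]:
    # legacy behavior: a single list argument is silently unpacked
    if len(statistics) == 1 and isinstance(statistics[0], list):
        statistics = statistics[0]
    return list(statistics)

def summary_new(*statistics: str) -> List[str]:
    # this PR's approach: every positional argument is one statistic name
    return list(statistics)
```

The legacy form accepts both `summary_legacy("count", "mean")` and `summary_legacy(["count", "mean"])`; the new form keeps the signature honest by accepting only string varargs.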

Contributor

@amaliujia amaliujia left a comment

LGTM with one comment

@@ -323,6 +323,14 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) ->
def where(self, condition: Expression) -> "DataFrame":
    return self.filter(condition)

def summary(self, *statistics: str) -> "DataFrame":
    _statistics: List[str] = list(statistics)
    assert all(isinstance(s, str) for s in _statistics)
Contributor

Given that def summary(self, *statistics: str) -> "DataFrame": is a public API, there could be misuse passing non-str parameters for statistics. Is it common practice to just assert without giving a message?

Contributor

@amaliujia amaliujia Nov 7, 2022

In contrast, I guess the assert in plan.py is OK because that is an internal API: we can implement it correctly and just assert on unexpected things (and a developer can fix it when it really happens).

    def __init__(self, child: Optional["LogicalPlan"], function: str, **kwargs: Any) -> None:
        super().__init__(child)
        assert function in ["summary"]
        self.function = function

Contributor Author

Agreed, it should give an error message; will update.
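A minimal sketch of the kind of messaged validation the review asks for on a public API (the helper name and message wording here are hypothetical, not the exact code that was merged):

```python
from typing import List, Tuple

def validate_statistics(statistics: Tuple) -> List[str]:
    """Check the varargs like a public summary() would, with an explicit error message."""
    _statistics = list(statistics)
    for s in _statistics:
        if not isinstance(s, str):
            # an explicit TypeError with context, instead of a bare assert
            raise TypeError(
                f"'statistics' must be strings, but got {type(s).__name__}: {s!r}"
            )
    return _statistics
```

Unlike `assert`, this check cannot be stripped by `python -O`, and the message tells the caller which argument was wrong.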

oneof function {
  Summary summary = 2;

  Unknown unknown = 999;
Contributor

why do we need this?

Contributor

I think that's for enums, but here it is an optional field... cc @amaliujia

Contributor

Question: will we add new functions under this oneof?

Contributor Author

Yes, such as crosstab, cov, corr, etc.

Contributor

ok then this makes sense

import org.apache.spark.sql.connect.dsl.commands._
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}

class SparkConnectStatFunctionSuite
Contributor

why do we need a new test suite?

Contributor Author

I encountered some problems adding tests to the existing suites, since summary requires a session and needs to analyze the plan.
This suite will also cover some eagerly computed stat functions (cov, corr) in the future.

Contributor Author

let me take another look

Contributor Author

Removed; added another test in the existing suites.

Contributor

Initially I thought a separate suite was OK because that suite actually just executed the proto plan to check the expected result.

Now it's changed to a plan-comparison-based test, which is OK.

Contributor Author

Tests have improved a lot since the time I sent this PR :)

xxx

add scala test

fix lint

resolve conflict

fix scala tests

add error msg
@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4f096db Nov 9, 2022
@zhengruifeng zhengruifeng deleted the connect_df_summary branch November 9, 2022 01:23
@zhengruifeng
Contributor Author

thanks @cloud-fan @HyukjinKwon @amaliujia for reviews!

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…implement `DataFrame.summary`


Closes apache#38318 from zhengruifeng/connect_df_summary.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>